什么是cdc
CDC 的全称是 Change Data Capture ,在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
- 数据同步,用于备份,容灾
- 数据分发,一个数据源分发给多个下游
- 数据采集(E),面向数据仓库/数据湖的 ETL 数据集成
常用的cdc技术实现
目前业界开源的实现技术主要有,canal , debezium , maxwell 这三种 ,具体比较这里不详细解读了(参考:https://zhuanlan.zhihu.com/p/75357900 )
Flink-cdc-connector源码分析
flink-cdc-connector 使用了debezium-embeded-engine来实现从mysql获取binlog日志事件。
1 | debezium本来是作为一个kafka connect plugin service 来实现的,后面 debezium社区把它抽象出来了,可以独立于kafka connect 来使用 |
我们先来看一下debezium嵌入式使用demo 代码如下:
1 | public static void main(String[] args) { |
使用 DebeziumEngine.create 方法构建 DebeziumEngine 对象。 用标准的**Properties**对象传入配置属性,其中重要的属性如下:
- connector.class: 连接器类,如MysqlConnector
- offset.storage:binlog position存储实现类
需要把binlog position保存起来,以便程序挂掉后重启还可以接着从保存的位点开始消费
- database.history:主库历史schemea存储实现类
这里需要记录mysql库表的schema信息,以便恢复数据和扩充信息(因为binlog里并不包含字段信息)
- database.* :mysql master 相关信息
最后使用ExecutorService来启动,当接受到binlog的insert,update,delete事件后 通过 notifying 回调通知,业务处理代码主要在这个地方完成。
flink-cdc-connector中使用
我们下来看一下在flink datastream 中用法,代码如下:
1 | public static void main(String[] args) throws Exception { |
通过MySQLSource.build 来构建一个flink SourceFunction * DebeziumSourceFunction* , 我们来看一下实现:
这个类的继承结构如下:
1 | public class DebeziumSourceFunction<T> extends RichSourceFunction<T> |
该算子实现了 CheckpointedFunction,CheckpointListener来做状态存储(主要是存储offsets,databasehistory)。
在看一下 run 方法,通过 DebeziumEngine.build来创建DebeziumEngine,并启动
1 | public void run(SourceContext<T> sourceContext) throws Exception { |
其中,offset.storage 使用了*FlinkOffsetBackingStore*** 类来实现 position的持久化。database.history 使用了*FlinkDatabaseHistory***类来存储历史schema信息。
我们先来看一下 FlinkOffsetBackingStore 这个类的主要作用是存储和获取offset,这里并不直接和flink的state storeage打交道,看一下类注释
1 | A implementation of OffsetBackingStore backed on Flink's state mechanism. |
注意其中的 OFFSET_STATE_VALUE 变量,这个在任务失败恢复的时候会通过 DebeziumEngine 配置来初始化值,这个值从flink state 里设置。
而真正做offset状态持久化存储的是在 DebeziumSourceFunction类的 snapshotState 方法里 ,这里会继续调用 snapshotOffsetState 方法,看一下代码,
1 | private void snapshotOffsetState(long checkpointId) throws Exception { |
里面用到里 offsetState 这个 ListState<byte[]> 来存储offset , 它的值通过 consumer.snapshotCurrentState(); 这个方法来获取 ,看一下方法实现
1 | public byte[] snapshotCurrentState() throws Exception { |
offset的值就在 debeziumOffset 变量里,这个变量的值回在DebeziumChangeConsumer.handleBatch 里进行更新 ,当每来一个crud 事件的时候,就回调用这个方法,这个方法最终回调用 emitRecordsUnderCheckpointLock 这个进行 event向下游发送并更新debeziumOffset 变量的值。
注意:
这里会使用同步checkpointLock(这个锁的作用见后面分析),emit the records, using the checkpoint lock to guarantee atomicity of record emission and offset state update
。
做完以上步骤后,offsetState就会通过checkpoint机制持久化了。最后一步是通过 notifyCheckpointComplete() 回调方法,来把已经checkpoint的offsetState值,写回 debezium FlinkOffsetBackingStore 存储里,实际调用:
1 | DebeziumOffset offset = |
最终回调用:FlinkOffsetBackingStore.set 方法。到此 offset 状态持久化存储流程就走完了。
再来看一下状态恢复流程,调用 DebeziumSourceFunction.initializeState 方法,context.isRestored() 返回true , 通过 offsetState 获取到状态值,赋值给 restoredOffsetState 变量,最后在 run 方法里会把这个变量值设置给
properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState); debezium 启动时,会初始化 FlinkOffsetBackingStore 调用它的 configure 方法,完成offset 初始化。
1 | @Override |
以上完成了,flink-cdc-connector 启动,offset存储和恢复源码分析,对于 FlinkDatabaseHistory 大致原理差不多,读者可以自行阅读源码。
checkpointLock 锁
在1.x版本里 如果在snapshot阶段的时候,是不能进行checkpoint的。因为,snapshot阶段并没有支持断点(增量)读取数据。
代码实现如下:
1 | if (isInDbSnapshotPhase) { |
最后总结一下flink-cdc-connector的一些特性,参考github文档里总结
Exactly-Once Processing (可以实现精确一致的处理)
The MySQL CDC connector is a Flink Source connector which will read database snapshot first and then continues to read binlogs with exactly-once processing even failures happen. Please read How the connector performs database snapshot.
Startup Reading Position (可以从指定位点消费binlog)
The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer.
Single Thread Reading (只能是一个并行度处理)
The MySQL CDC source can’t work in parallel reading, because there is only one task can receive binlog events.
Supports reading database snapshot and continues to read binlogs(支持做初始化快照,但初始化快照需要对数据库加锁)
目前社区正在进行flink cdc 2.0的开发,来解决单并行度和数据库加锁的问题
参考:https://github.com/ververica/flink-cdc-connectors/pull/233