启动流程
上一篇讲了debezium整个架构流程,这篇文件讲一下具体启动的细节。
先上一张流程图:
从MysqlConnector开始启动,再调MySqlConnectorTask的start方法,MySqlConnectorTask启动的时候,会先判断是否有从之前记录的offset启动,来决定是否需要开启snapshot阶段
- 如果从offset启动:
- 这时会先判断是否存在dbHistory,如果不存在话,会判断是否是恢复模式(schema_only_recovery),不是的话直接报错,是的话设置startWithSnapshot=true并且也会校验offset是否可用,不可用抛异常。如果dbHistory存在的话,就会从dbHistory恢复dbSchema信息到offset位置,之后在判断指定的binlog是否可用,可用的话设置startWithSnapshot=false,否则true
1
2// Before anything else, recover the database history to the specified binlog coordinates ...
taskContext.loadHistory(source);
- 这时会先判断是否存在dbHistory,如果不存在话,会判断是否是恢复模式(schema_only_recovery),不是的话直接报错,是的话设置startWithSnapshot=true并且也会校验offset是否可用,不可用抛异常。如果dbHistory存在的话,就会从dbHistory恢复dbSchema信息到offset位置,之后在判断指定的binlog是否可用,可用的话设置startWithSnapshot=false,否则true
这里怎么恢复到指定的offset呢?我们看 dbHistory.recover 方法,其实就是从头开始读history,在比较每个HistoryRecord是否 <= stopPoint。这里的比较方法,通过比较器HistoryRecordComparator来比较 :
1 | /** |
先比较source是否一致,在比较position是否<= ,注意这里比较position实际上是调用的重写了的 isPositionAtOrBefore 方法,如下:
1 | // Set up a history record comparator that uses the GTID filter ... |
SourceInfo.isPositionAtOrBefore 里面,其实是先判断是否有gtids属性,如果offset里没有,recorded里有,则会返回false不会加载这一条HistoryRecord,如果都存在,则会比较两个gtid(具体比较方法,后面分析),如果都没有gtid则会,比较二进制position。
- 如果offset不存在
先判断SnapshotMode是否是 never模式,是的话设置setBinlogStartPoint到最早(source.setBinlogStartPoint(“”, 0L);),否则 设置startWithSnapshot=true。
如果最后得到startWithSnapshot=true, 在判断启动模式判断是否initial_only,是 只启动snapshotReader,否则启动snapshotReader和binlogReader。
startWithSnapshot=false, 判断是否有新表配置,是的话 会启动parallelSnapshotReader,reconcilingBinlogReader,unifiedBinlogReader。否则,只启动BinlogReader