sync2any同步到clickhouse源码解析

1、初始化

配置相关的类全部放到了com.jte.sync2any.con包下。

1.1 配置初始化

放到 application.yml中的配置需要加载到应用中,我们通过Sync2any类来映射yml文件中sync2any部分的配置。

1
2
3
4
5
6
@Component
@ConfigurationProperties("sync2any")
@Data
public class Sync2any {
...
}

有了具体的配置之后,还需要对配置进行解析。这个部分在RuleConfigParser.initAllRules()实现。比如源字段和目标字段的映射关系,还有源表名和目标表名的映射关系等等。同时这里还会对yml的配置文件进行校验,对于一些必填项如果没填,则会直接报错。

1.2 数据库初始化

数据库分为两类:

  • 源数据库:支持tdsql
  • 目标数据库:支持tdsqlesck

源数据库的初始化在SourceMysqlDatasourceConfig,目标数据在TargetDatasourcesConfig。他们分别会被保存在一个叫“allSourceTemplate”,“allTargetDatasource”的spring bean中。方便需要的时候随时获取。

1.3 kafka初始化

kafka的初始化入口在KafkaConfig.initKafka(),该方法是会被spring实例化后自动调用。

1
2
3
4
5
6
7
8
9
10
11
@PostConstruct
public void initKafka() {
if (StringUtils.isBlank(kafkaMate.getAddress())) {
log.error("请填写kafka的相关配置。");
System.exit(500);
}
sync2any.getSyncConfigList().forEach(sdb -> {
KafkaMessageListenerContainer<String, byte[]> container = createContainer(sdb.getMq(),sdb.getSourceDbId());
KAFKA_SET.add(container);
});
}

从源码中可以看到,初始化好的所有kafka实例都被放到了KAFKA_SET中。注意此时只是初始化了kafka,但是并未开启监听。因为对于需要载入原始数据的数据库来说,需要将原始数据全部载入完毕之后才能开启队列的监听。不然同步数据和增量数据同时载入数据库就可能发生数据错误。

2.载入原始数据

原始数据是指在源数据库中的存量数据。在程序业务的入口StartListener.startRiver()中会判断dumpOriginData的配置。如果确定需要载入原始数据,就会走以下流程。

  1. 使用mysqldump将数据转化成sql文件。
  2. 使用druid将每一行sql转化为CudRequest 对象。
  3. 调用目标数据库的批量新增方法。

3.载入增量数据

这里会考虑到一种数据源被同步到多种不同类型的数据源的情况。比如tdsql同步到es、clickhouse。因此我们必须拥有良好的扩展性,能够让我们使用少量的代码即可增加一个新类型的数据源。因此在这个基础上,我们设计了以下数据流。

  1. 接受到kafka中的同步信息解析成为TableRecords对象。
  2. 根据TableRecords,转化为CudRequest对象。
  3. 最后使用不同的AbstractLoadService实现类将CudRequest对象持久化。

在上面这个过程中,我们会把所有增量数据都转化为同一个类型的数据——CudRequest对象。因此持久化实现类无需关注增量数据的是怎么来的。只需要按照同样的规则到CudRequest里面去取相应的数据就可以了。