Debezium
1. 什么是Debezium
Debezium是基于Apache License 2.0 协议的开源项目,是一个基于数据库日志(PostgreSQL的WAL,Mysql的binlog等)的CDC(change data capture)工具,以非侵入式的方式做数据库之间的数据同步。
1.1 Debizum支持的数据库类型
数据库类型 | 支持的版本 | 备注 |
---|---|---|
PostgreSQL | Database: 9.6, 10, 11, 12 JDBC Driver: 42.2.14 |
|
MySQL | Database: 5.7, 8.0.x JDBC Driver: 8.0.21 |
|
MongoDB | Database: 3.2, 3.4, 3.6, 4.0, 4.2 Driver: 4.1.1 |
|
Oracle | Database: 12c, 19c JDBC Driver: 12.2.0.1, 19.8.0.0, 21.1.0.0 |
|
SQL SERVER | Database: 2017, 2019 JDBC Driver: 7.2.2.jre8 |
|
DB2 | Database: 11.5 JDBC Driver: 11.5.0.0 |
|
Vitess | Database: 8.0.x*, 9.0.x JDBC Driver: 9.0.0 |
孵化中 |
Cassandra | Database: 3.11.4 Driver: 3.11.0 |
孵化中 |
2. Debezium的架构与部署
下图展示了在用Debezium做数据同步的架构图,从图中可以看到除了源数据库和目标数据库外,还包括了Debezium Connector和Kafka。
3. 使用Debezium同步数据到MatrixDB
本节将演示如何使用Debezium从PG12同步数据到MatrixDB。
安装配置文档涉及的软件版本如下:
- CentOS 7.9
- JDK 11
- PostgreSQL 12.7
- matrixdb-4.1.0.community-1.el7.x86_64
- kafka_2.12-2.7.1
- debezium-connector-postgresql-1.6.0
- confluentinc-kafka-connect-jdbc-10.2.1.zip
其他用到的工具
- Postman
- Offset Explorer
3.1 源库和目标库的准备
本例中源库使用的PostgreSQL版本为12.7,需要修改数据库配置:
vim postgresql.conf
listen_addresses = '*' # 如果需要远程访问,则需要配置
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
本例的目标库是MatrxiDB,只要能够通过JDBC接口访问即可,其他不需要做任何特殊的配置。
3.2 Kafka Connect的部署
Kafka Connect是一个用于将数据流输入和输出Kafka的框架,它集成在Kafka的安装包里,在已经部署好Kafka的环境中,它并不需要单独部署。
Connector 是运行在Connect进程或框架上的具体任务,每个Connector 都定义了数据从哪里来,以及复制到哪里去。Connector将单个作业分解为多个task,运行在Kafka Connect的work进程上。
Connector 主要分为Source和Sink两类:
- Source负责导入数据到Kafka,本例中Debezium-connector-postgresql就是属于Source,它负责将捕获PostgreSQL中的数据变更,并写入到Kafka;
- Sink负责从Kafka导出数据,在本例中confluentinc-kafka-connect-jdbc 属于Sink,它把数据从Kafka Topic写到MatrxiDB.
Kafka Connect 启动命令位于目录 /opt/kafka_2.12-2.7.1/bin 下,connect-standalone.sh 为单独运行模式,通常用于开发和测试环境。connect-distributed.sh 为分布式运行模式,在生产环境中推荐使用分布式模式。
3.2.1 Standalone 模式
Standalone模式是最简单的模式,用单一进程负责执行所有connector和task。它的启动命令如下:
export CLASSPATH=/opt/debezium/plugins/*
bin/connect-standalone.sh config/connect-standalone.properties config/dbz-pg12.properties
standalone模式的启动需要两个 properties文件,第一个properties是connect 的属性,第二个properties是connector的属性定义。
a.) connect-standalone.properties
只需定义broker server和plugin.path,plugin.path是connector的解压路径,主要是*.jar 文件。
vim $KAFKA_HOME/config/connect-standalone.properties
bootstrap.servers=dbzsrv:9092 # kafka broker server的服务器名和端口号
plugin.path=/usr/local/share/kafka/plugins # 指定plugins的目录
b.) connector.properties
在standalone模式下,connector的配置也是用 properties文件(properties文件名可自定义),下面的例子是Debezium PostgreSQL Connector的定义:
vim dbz-pg12.properties
name=dbz-pg12-connect
tasks.max=1 # 实时上,目前版本的connector 并不会使用该参数,它总是只启动一个task,官方认为已经够用。
connector.class=io.debezium.connector.postgresql.PostgresConnector # connector 类的名称
database.hostname=192.168.67.10 # 源库 postgres 数据库的IP
database.port=5432 # 源库 postgres 数据库的PORT
database.user=postgres # 源库 postgres 数据库的用户名
database.password=postgres # 源库 postgres 数据库的密码
database.dbname=testdb # 源库 postgres 数据库的密码
database.server.name=dbzsrv # //PostgreSQL server/cluster 的逻辑名称,作为kafka topic名称的组成 {server_name}.{schema_name}.{table_name} ,其中{server_name}就是该参数定义的
plugin.name=pgoutput # 源库 postgres 数据库的逻辑解码器
注意:不同的数据库可用配置参数也不相同,例如MySQL 数据库就提供了参数 include.schema.changes 以记录ddl的变化,但PostgreSQL的逻辑解码器都不支持DDL的变化。
3.2.2 Distributed 模式
分布式模式为Kafka Connect提供了可扩展性和自动容错的能力,与Standalone模式不同,Connector的定义,是通过Rest API 提交的。
下面的例子中,以集群模式启动了两个connect进程,注意 export CLASSPATH 指向的是connector解压后的*.jar的文件路径。
export CLASSPATH=/opt/debezium/plugins/*
./bin/connect-distributed.sh config/conn1-dis-jdbc.properties
./bin/connect-distributed.sh config/conn2-dis-jdbc.properties
a.) 修改 connect-distributed.properties 配置文件
同一个集群的 properties 文件,除了 rest.port 不同,其余配置参数保持一致。
bootstrap.servers=dbzsrv:9092 # kafka broker server的服务器名和端口号
plugin.path=/usr/local/share/kafka/plugins # 指定plugins的目录
rest.port=8083 # 指定 RestAPI的端口号
b.) 定义并创建 connector 配置参数(json)
connect启动 Dsitributed 模式后,就可以用使用 REST API 创建connector。
3.3 Debezium Source Connector
Debezium postgresql source coonnector 负责读取PostgreSQL 源库的WAL文件,捕获数据变更
3.3.1 下载 Debezium connect
有两个下载地址:debezium网站和confluent 网站,虽然文件的名称不同,但压缩包的内容其实是一样的。要选择 for postgresql,不同的数据库,debezium connect 安装包也不同。
a.) debezium 网站:(debezium-debezium-connector-postgresql-1.6.0.zip)
https://debezium.io/releases/1.6/
b.) confluent 网站:(debezium-connector-postgres-1.6.0.Final-plugin.tar.gz)
https://www.confluent.io/hub/debezium/debezium-connector-postgresql
事实上,下载的压缩包的内容都只是kafka connect 的plug-in,里面主要包含7个jar文件。
[root@dbzsrv lib]# ll
total 6508
-rw-r--r--. 1 root root 20743 Jul 8 00:04 debezium-api-1.6.0.Final.jar
-rw-r--r--. 1 root root 329171 Jul 8 00:06 debezium-connector-postgres-1.6.0.Final.jar
-rw-r--r--. 1 root root 866648 Jul 8 00:05 debezium-core-1.6.0.Final.jar
-rw-r--r--. 1 root root 4617 Feb 22 2020 failureaccess-1.0.1.jar
-rw-r--r--. 1 root root 2858426 Jan 21 2021 guava-30.0-jre.jar
-rw-r--r--. 1 root root 932808 Aug 13 2020 postgresql-42.2.14.jar
-rw-r--r--. 1 root root 1634485 Mar 6 2020 protobuf-java-3.8.0.jar
3.3.2 配置 connect properties 文件
主要是修改 properties 文件中的 plugin.path,它要指向 Debezium connect 解压 jar 包所在的路径。如果jar 路径不对,后续在使用Rest API创建Connector 时会报错。
在我的测试中,即便在properties文件中配置了plugin.path,在启动 connect 之前,仍然需要执行 export CLASSPATH=/opt/debezium/plugins/*,这里的CLASSPATH路径也是指向plugin.path的。
3.3.3 distributed模式下创建 connector (json)
Debezium postgresql connector 使用的json文件如下: vim dbz-conn.json
{
"name": "dbz-pg12-connect-dst", // 要注册的connector的名称
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", //连接器类的名称
"tasks.max": "1", //事实上,目前版本的connector 并不会使用该参数,它总是只启动一个task,官方认为已经够用。
"database.hostname": "192.168.67.10", //PostgreSQL 服务器的地址
"database.port": "5432", //PostgreSQL 数据库端口号
"database.dbname": "testdb", //源库 PostgreSQL 数据库的名称
"database.user": "postgres", //PostgreSQL 用户名
"database.password": "postgres", //PostgreSQL 用户名对应的密码
"database.server.name": "dbzsrv", //PostgreSQL server/cluster 的逻辑名称,作为kafka topic名称的组成 {server_name}.{schema_name}.{table_name} ,其中{server_name}就是该参数定义的
"plugin.name": "pgoutput" //PostgreSQL 服务器上使用的 PostgreSQL逻辑解码插件的名称。
}
}
当 Kafka connect 以 distributed 模式启动后,通过 Connect REST API 创建 Connector(POST):
curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" http://dbzsrv:8083/connectors -d @/opt/kafka_2.12-2.7.1/config/dbz-conn.json
3.3.4 更新 connector 参数(json)
更新 connector 参数,注意,在update.json中只需要包含"config"中的所有信息。
vim dbz-conn-update.json
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "192.168.67.10",
"database.port": "5432",
"database.dbname": "testdb",
"database.user": "postgres",
"database.password": "postgres",
"database.server.name": "dbzsrv",
"plugin.name": "pgoutput"
}
通过Connect REST API 更新 Connector参数(PUT):
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.67.10:8083/connectors/dis1-connect/config/ -d @dbz-pg12-update.json
3.3.5 常用 REST API命令:
curl http://192.168.67.10:8083/connectors # 列出当前已经创建的connectors名称
curl http://localhost:8083/connectors/dbz-pg12-connect # 查看名为 dbz-pg12-connect 的配置信息
curl -i -X DELETE http://192.168.67.10:8083/connectors/dbz-pg-conn # 删除名为 dbz-pg-conn 的connector
3.3.6 Connector 常用参数说明:
参数名称 | 默认值 | 参数描述 |
---|---|---|
slot.name | debezium | Connector创建和使用的默认复制槽名称,由小写字母、数字和下划线字符组成。 |
publication.name | dbz_publication | Connector创建和使用的默认publication名称 |
schema.include.list | 无 | 一个用逗号分隔的schema列表或正则表达式列表,只捕获schema名称符合该条件的表的数据变更。默认情况下,源库所有schema下表的数据变更事件都会被捕获并投递到Topic. |
schema.exclude.list | 无 | 一个用逗号分隔的schema列表或正则表达式列表,与schema.include.list 的作用相反,符合该条件的schema的表的数据变更都不会被捕获并投递。 |
table.include.list | 无 | 一个用逗号分隔的schema.tablename列表或正则表达式列表,符合该条件的表的数据变更都会被捕获并投递到Topic. |
table.exclude.list | 无 | 作用与table.include.list作用相反。 |
column.include.list | 无 | 一个用逗号分隔的schema.tablename.columnname列表或正则表达式列表,符合该条件的字段的数据变更会被捕获并投递到Kafka topic. |
column.exclude.list | 作用与column.include.list 相反 | |
decimal.handling.mode | precise | 指定connector应如何处理NUMERIC 和 DECIMAL类型 precise:使用java.math.BigDecimal 类型来表示值。 double:使用double类型来表示值,这可能会导致精度损失,但更易于使用。 string:将值编码为格式化字符串,易于使用,但在topic中丢失了有关真实类型的语义信息。 |
message.key.columns | 空 | Connector 默认使用主键作为消息的key,使用该关键字可以选择其他字段作为消息的key,或者当表没有主键时使用该关键字指定key。格式:schemaA.table_a:regex_1;schemaB.table_b |
truncate.handling.mode | skip | 是否捕获并投递truncate事件,skip 表示忽略truncate 事件,include 表示捕获truncate事件。 |
snapshot.mode | initial | initial::connector 仅在表名(逻辑服务名)没有被记录偏移量时才执行快照,比如,connector 第一次连接到源库,或者该表名第一次加入到debezium白名单时 always:connector 在启动时总是执行快照,每次启动connector,都会重新生成一份数据快照。 never:connector从不执行快照。但是,如果 Kafka offsets 主题中存在先前存储的 LSN,则连接器会继续从该位置开始传输数据变更。如果未存储 LSN,则连接器从源库创建 PostgreSQL 逻辑复制槽的时间点开始传输数据变更。 initial_only:connector 仅执行数据库快照创建,完成快照后会自动停止。 incremental:快照和数据变更混合在一起,同时进行。 exported:已经废弃 custom:自定义方式,连接器根据 snapshot.custom.class 类的设置执行快照。 |
signal.data.collection | 无 | 在增量快照(snapshot.mode :incremental)的模式下,指定信号表的名称。 |
skipped.operations | 需要跳过的操作类型的逗号分隔列表。操作包括:c 插入/创建、u 更新和d 删除。默认情况下,不跳过任何操作 |
更多内容,参考如下链接:
https://debezium.io/documentation/reference/1.6/connectors/postgresql.html
3.4 Kafka jdbc connect (sink)
Kafka jdbc connect 其实兼具source 和 sink的功能,它作为source时是侵入式的,在本例中只把它作为sink端,负责把数据从kafka topic 读取后写入到 MatrixDB.
3.4.1 下载kafka jdbc connect
JDBC Connector (Source and Sink)
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
3.4.2 安装配置kafka jdbc connect
解压到指定目录,在它的lib目录下,包含一些*.jar文件,从列表中可以看到它已经自带了postgresql jdbc驱动,使用该驱动可以直接把数据写入到MatrixDB.
unzip confluentinc-kafka-connect-jdbc-10.2.1.zip
[shidb@sdw7 lib]$ ll
total 20736
-rw-r--r-- 1 shidb root 214381 Jul 30 20:00 checker-qual-3.5.0.jar
-rw-r--r-- 1 shidb root 17263 Jul 30 20:00 common-utils-6.0.0.jar
-rw-r--r-- 1 shidb root 317816 Jul 30 20:00 jtds-1.3.1.jar
-rw-r--r-- 1 shidb root 254772 Jul 30 20:41 kafka-connect-jdbc-10.2.1.jar
-rw-r--r-- 1 shidb root 1300915 Jul 30 20:00 mssql-jdbc-8.4.1.jre8.jar
-rw-r--r-- 1 shidb root 4398602 Jul 30 20:00 ojdbc8-19.7.0.0.jar
-rw-r--r-- 1 shidb root 5140 Jul 30 20:00 ojdbc8-production-19.7.0.0.pom
-rw-r--r-- 1 shidb root 156242 Jul 30 20:00 ons-19.7.0.0.jar
-rw-r--r-- 1 shidb root 311000 Jul 30 20:00 oraclepki-19.7.0.0.jar
-rw-r--r-- 1 shidb root 1663954 Jul 30 20:00 orai18n-19.7.0.0.jar
-rw-r--r-- 1 shidb root 210337 Jul 30 20:00 osdt_cert-19.7.0.0.jar
-rw-r--r-- 1 shidb root 312200 Jul 30 20:00 osdt_core-19.7.0.0.jar
-rw-r--r-- 1 shidb root 1005078 Jul 30 20:00 postgresql-42.2.19.jar
-rw-r--r-- 1 shidb root 32168 Jul 30 20:00 simplefan-19.7.0.0.jar
-rw-r--r-- 1 shidb root 41472 Jul 30 20:00 slf4j-api-1.7.30.jar
-rw-r--r-- 1 shidb root 7064881 Jul 30 20:00 sqlite-jdbc-3.25.2.jar
-rw-r--r-- 1 shidb root 1684253 Jul 30 20:00 ucp-19.7.0.0.jar
-rw-r--r-- 1 shidb root 265130 Jul 30 20:00 xdb-19.7.0.0.jar
-rw-r--r-- 1 shidb root 1933746 Jul 30 20:00 xmlparserv2-19.7.0.0.jar
3.4.3 修改 connect properties文件
在pugin.path 添加解压后的lib目录:
plugin.path=/opt/confluentinc-kafka-connect-jdbc-10.2.1/lib
注意,即便在 preperties 文件中添加了 plugin.path,在启动connect 服务前,还是要执行 export,不然还是会遇到找不到class的情况。
export CLASSPATH=/opt/confluentinc-kafka-connect-jdbc-10.2.1/lib/*
在测试时,为了简化操作,有时会把debezium connect 和 jdbc connect 的 *.jar 解压后合并到同一个目录下,这样做可能会引起jar包的版本冲突。
3.4.4 distributed模式下创建 connector (json)
{
"name": "pg1-items-pk-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "3",
"connection.url": "jdbc:postgresql://192.168.100.17:5432/tgt_zabbix?stringtype=unspecified",
"connection.user": "dbzusr",
"connection.password": "dbzusr",
"topics": "zabbix.public.items",
"table.name.format":"public.items",
"db.timezone":"Asia/Shanghai",
"auto.create":"false",
"auto.evolve":"false",
"insert.mode": "upsert",
"pk.fields": "itemid",
"pk.mode": "record_key",
"batch.size": "800",
"delete.enabled": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
所有 Kafka connector的方式都是一致的,当 Kafka connect 以 distributed 模式启动后,通过 Connect REST API 创建 Connector(POST):
curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" http://192.168.100.17:8083/connectors -d @pg1-items-pk-jdbc-sink.json
成功创建jdbc connector 之后,它就开始消费json配置文件中指定topic的数据,并批量写入到目标库中。
更多内容,请参考:
JDBC Connector (Source and Sink) for Confluent Platform https://docs.confluent.io/kafka-connect-jdbc/current/
JDBC Sink Connector Configuration Properties
https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html
3.5 服务启动顺序说明
1.启动源库;
2.启动目标库;
3.启动 zookeeper+kafka.
4.创建 kafka topic.
5.启动 connect distributed.
6.创建 jdbc sink connector.
7.创建 debezium postgresql source connector.