Debezium

1. 什么是Debezium

Debezium是基于Apache License 2.0 协议的开源项目,是一个基于数据库日志(PostgreSQL的WAL,Mysql的binlog等)的CDC(change data capture)工具,以非侵入式的方式做数据库之间的数据同步。

1.1 Debizum支持的数据库类型

数据库类型 支持的版本 备注
PostgreSQL Database: 9.6, 10, 11, 12
JDBC Driver: 42.2.14
MySQL Database: 5.7, 8.0.x
JDBC Driver: 8.0.21
MongoDB Database: 3.2, 3.4, 3.6, 4.0, 4.2
Driver: 4.1.1
Oracle Database: 12c, 19c
JDBC Driver: 12.2.0.1, 19.8.0.0, 21.1.0.0
SQL SERVER Database: 2017, 2019
JDBC Driver: 7.2.2.jre8
DB2 Database: 11.5
JDBC Driver: 11.5.0.0
Vitess Database: 8.0.x*, 9.0.x
JDBC Driver: 9.0.0
孵化中
Cassandra Database: 3.11.4
Driver: 3.11.0
孵化中

2. Debezium的架构与部署

下图展示了在用Debezium做数据同步的架构图,从图中可以看到除了源数据库和目标数据库外,还包括了Debezium Connector和Kafka。 debezium

3. 使用Debezium同步数据到MatrixDB

本节将演示如何使用Debezium从PG12同步数据到MatrixDB。

安装配置文档涉及的软件版本如下:

  • CentOS 7.9
  • JDK 11
  • PostgreSQL 12.7
  • matrixdb-4.1.0.community-1.el7.x86_64
  • kafka_2.12-2.7.1
  • debezium-connector-postgresql-1.6.0
  • confluentinc-kafka-connect-jdbc-10.2.1.zip

其他用到的工具

  • Postman
  • Offset Explorer

3.1 源库和目标库的准备

本例中源库使用的PostgreSQL版本为12.7,需要修改数据库配置:

vim postgresql.conf

listen_addresses = '*' # 如果需要远程访问,则需要配置
wal_level = logical    
max_replication_slots = 10
max_wal_senders = 10

本例的目标库是MatrxiDB,只要能够通过JDBC接口访问即可,其他不需要做任何特殊的配置。

3.2 Kafka Connect的部署

Kafka Connect是一个用于将数据流输入和输出Kafka的框架,它集成在Kafka的安装包里,在已经部署好Kafka的环境中,它并不需要单独部署。

Connector 是运行在Connect进程或框架上的具体任务,每个Connector 都定义了数据从哪里来,以及复制到哪里去。Connector将单个作业分解为多个task,运行在Kafka Connect的work进程上。

Connector 主要分为Source和Sink两类:

  • Source负责导入数据到Kafka,本例中Debezium-connector-postgresql就是属于Source,它负责将捕获PostgreSQL中的数据变更,并写入到Kafka;
  • Sink负责从Kafka导出数据,在本例中confluentinc-kafka-connect-jdbc 属于Sink,它把数据从Kafka Topic写到MatrxiDB.

Kafka Connect 启动命令位于目录 /opt/kafka_2.12-2.7.1/bin 下,connect-standalone.sh 为单独运行模式,通常用于开发和测试环境。connect-distributed.sh 为分布式运行模式,在生产环境中推荐使用分布式模式。

3.2.1 Standalone 模式

Standalone模式是最简单的模式,用单一进程负责执行所有connector和task。它的启动命令如下:

export CLASSPATH=/opt/debezium/plugins/*
bin/connect-standalone.sh config/connect-standalone.properties config/dbz-pg12.properties

standalone模式的启动需要两个 properties文件,第一个properties是connect 的属性,第二个properties是connector的属性定义。

a.) connect-standalone.properties

只需定义broker server和plugin.path,plugin.path是connector的解压路径,主要是*.jar 文件。

vim $KAFKA_HOME/config/connect-standalone.properties

bootstrap.servers=dbzsrv:9092   # kafka broker server的服务器名和端口号
plugin.path=/usr/local/share/kafka/plugins  # 指定plugins的目录

b.) connector.properties

​ 在standalone模式下,connector的配置也是用 properties文件(properties文件名可自定义),下面的例子是Debezium PostgreSQL Connector的定义:

vim dbz-pg12.properties

name=dbz-pg12-connect
tasks.max=1     # 实时上,目前版本的connector 并不会使用该参数,它总是只启动一个task,官方认为已经够用。
connector.class=io.debezium.connector.postgresql.PostgresConnector  # connector 类的名称
database.hostname=192.168.67.10  # 源库 postgres 数据库的IP
database.port=5432              # 源库 postgres 数据库的PORT
database.user=postgres           # 源库 postgres 数据库的用户名
database.password=postgres       # 源库 postgres 数据库的密码 
database.dbname=testdb           # 源库 postgres 数据库的密码
database.server.name=dbzsrv       #  //PostgreSQL server/cluster 的逻辑名称,作为kafka topic名称的组成 {server_name}.{schema_name}.{table_name} ,其中{server_name}就是该参数定义的
plugin.name=pgoutput            # 源库 postgres 数据库的逻辑解码器

注意:不同的数据库可用配置参数也不相同,例如MySQL 数据库就提供了参数 include.schema.changes 以记录ddl的变化,但PostgreSQL的逻辑解码器都不支持DDL的变化。

3.2.2 Distributed 模式

分布式模式为Kafka Connect提供了可扩展性和自动容错的能力,与Standalone模式不同,Connector的定义,是通过Rest API 提交的。

下面的例子中,以集群模式启动了两个connect进程,注意 export CLASSPATH 指向的是connector解压后的*.jar的文件路径。

export CLASSPATH=/opt/debezium/plugins/*
./bin/connect-distributed.sh config/conn1-dis-jdbc.properties
./bin/connect-distributed.sh config/conn2-dis-jdbc.properties

a.) 修改 connect-distributed.properties 配置文件

同一个集群的 properties 文件,除了 rest.port 不同,其余配置参数保持一致。

bootstrap.servers=dbzsrv:9092   # kafka broker server的服务器名和端口号
plugin.path=/usr/local/share/kafka/plugins  # 指定plugins的目录
rest.port=8083  # 指定 RestAPI的端口号

b.) 定义并创建 connector 配置参数(json)

connect启动 Dsitributed 模式后,就可以用使用 REST API 创建connector。

3.3 Debezium Source Connector

Debezium postgresql source coonnector 负责读取PostgreSQL 源库的WAL文件,捕获数据变更

3.3.1 下载 Debezium connect

有两个下载地址:debezium网站和confluent 网站,虽然文件的名称不同,但压缩包的内容其实是一样的。要选择 for postgresql,不同的数据库,debezium connect 安装包也不同。

a.) debezium 网站:(debezium-debezium-connector-postgresql-1.6.0.zip)

https://debezium.io/releases/1.6/

b.) confluent 网站:(debezium-connector-postgres-1.6.0.Final-plugin.tar.gz)

https://www.confluent.io/hub/debezium/debezium-connector-postgresql

事实上,下载的压缩包的内容都只是kafka connect 的plug-in,里面主要包含7个jar文件。

[root@dbzsrv lib]# ll
total 6508
-rw-r--r--. 1 root root   20743 Jul  8 00:04 debezium-api-1.6.0.Final.jar
-rw-r--r--. 1 root root  329171 Jul  8 00:06 debezium-connector-postgres-1.6.0.Final.jar
-rw-r--r--. 1 root root  866648 Jul  8 00:05 debezium-core-1.6.0.Final.jar
-rw-r--r--. 1 root root    4617 Feb 22  2020 failureaccess-1.0.1.jar
-rw-r--r--. 1 root root 2858426 Jan 21  2021 guava-30.0-jre.jar
-rw-r--r--. 1 root root  932808 Aug 13  2020 postgresql-42.2.14.jar
-rw-r--r--. 1 root root 1634485 Mar  6  2020 protobuf-java-3.8.0.jar

3.3.2 配置 connect properties 文件

主要是修改 properties 文件中的 plugin.path,它要指向 Debezium connect 解压 jar 包所在的路径。如果jar 路径不对,后续在使用Rest API创建Connector 时会报错。

在我的测试中,即便在properties文件中配置了plugin.path,在启动 connect 之前,仍然需要执行 export CLASSPATH=/opt/debezium/plugins/*,这里的CLASSPATH路径也是指向plugin.path的。

3.3.3 distributed模式下创建 connector (json)

Debezium postgresql connector 使用的json文件如下: vim dbz-conn.json

{
    "name": "dbz-pg12-connect-dst",  // 要注册的connector的名称
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",   //连接器类的名称
        "tasks.max": "1",       //事实上,目前版本的connector 并不会使用该参数,它总是只启动一个task,官方认为已经够用。
        "database.hostname": "192.168.67.10",  //PostgreSQL 服务器的地址
        "database.port": "5432",              //PostgreSQL 数据库端口号
        "database.dbname": "testdb",           //源库 PostgreSQL 数据库的名称
        "database.user": "postgres",           //PostgreSQL 用户名
        "database.password": "postgres",       //PostgreSQL 用户名对应的密码
        "database.server.name": "dbzsrv",      //PostgreSQL server/cluster 的逻辑名称,作为kafka topic名称的组成 {server_name}.{schema_name}.{table_name} ,其中{server_name}就是该参数定义的
        "plugin.name": "pgoutput"              //PostgreSQL 服务器上使用的 PostgreSQL逻辑解码插件的名称。
    }
}

当 Kafka connect 以 distributed 模式启动后,通过 Connect REST API 创建 Connector(POST):

curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" http://dbzsrv:8083/connectors -d @/opt/kafka_2.12-2.7.1/config/dbz-conn.json

3.3.4 更新 connector 参数(json)

更新 connector 参数,注意,在update.json中只需要包含"config"中的所有信息。

vim dbz-conn-update.json

{
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.67.10",
        "database.port": "5432",
        "database.dbname": "testdb",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.server.name": "dbzsrv",
        "plugin.name": "pgoutput"
}

通过Connect REST API 更新 Connector参数(PUT):

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.67.10:8083/connectors/dis1-connect/config/ -d @dbz-pg12-update.json

3.3.5 常用 REST API命令:

curl http://192.168.67.10:8083/connectors                # 列出当前已经创建的connectors名称
curl http://localhost:8083/connectors/dbz-pg12-connect   # 查看名为 dbz-pg12-connect 的配置信息
curl -i -X DELETE http://192.168.67.10:8083/connectors/dbz-pg-conn  # 删除名为 dbz-pg-conn 的connector

3.3.6 Connector 常用参数说明:

参数名称 默认值 参数描述
slot.name debezium Connector创建和使用的默认复制槽名称,由小写字母、数字和下划线字符组成。
publication.name dbz_publication Connector创建和使用的默认publication名称
schema.include.list 一个用逗号分隔的schema列表或正则表达式列表,只捕获schema名称符合该条件的表的数据变更。默认情况下,源库所有schema下表的数据变更事件都会被捕获并投递到Topic.
schema.exclude.list 一个用逗号分隔的schema列表或正则表达式列表,与schema.include.list 的作用相反,符合该条件的schema的表的数据变更都不会被捕获并投递。
table.include.list 一个用逗号分隔的schema.tablename列表或正则表达式列表,符合该条件的表的数据变更都会被捕获并投递到Topic.
table.exclude.list 作用与table.include.list作用相反。
column.include.list 一个用逗号分隔的schema.tablename.columnname列表或正则表达式列表,符合该条件的字段的数据变更会被捕获并投递到Kafka topic.
column.exclude.list 作用与column.include.list 相反
decimal.handling.mode precise 指定connector应如何处理NUMERIC 和 DECIMAL类型
precise:使用java.math.BigDecimal 类型来表示值。
double:使用double类型来表示值,这可能会导致精度损失,但更易于使用。
string:将值编码为格式化字符串,易于使用,但在topic中丢失了有关真实类型的语义信息。
message.key.columns Connector 默认使用主键作为消息的key,使用该关键字可以选择其他字段作为消息的key,或者当表没有主键时使用该关键字指定key。格式:schemaA.table_a:regex_1;schemaB.table_b
truncate.handling.mode skip 是否捕获并投递truncate事件,skip 表示忽略truncate 事件,include 表示捕获truncate事件。
snapshot.mode initial initial::connector 仅在表名(逻辑服务名)没有被记录偏移量时才执行快照,比如,connector 第一次连接到源库,或者该表名第一次加入到debezium白名单时
always:connector 在启动时总是执行快照,每次启动connector,都会重新生成一份数据快照。
never:connector从不执行快照。但是,如果 Kafka offsets 主题中存在先前存储的 LSN,则连接器会继续从该位置开始传输数据变更。如果未存储 LSN,则连接器从源库创建 PostgreSQL 逻辑复制槽的时间点开始传输数据变更。
initial_only:connector 仅执行数据库快照创建,完成快照后会自动停止。
incremental:快照和数据变更混合在一起,同时进行。
exported:已经废弃
custom:自定义方式,连接器根据snapshot.custom.class类的设置执行快照。
signal.data.collection 在增量快照(snapshot.mode :incremental)的模式下,指定信号表的名称。
skipped.operations 需要跳过的操作类型的逗号分隔列表。操作包括:c插入/创建、u更新和d删除。默认情况下,不跳过任何操作

更多内容,参考如下链接:

https://debezium.io/documentation/reference/1.6/connectors/postgresql.html

3.4 Kafka jdbc connect (sink)

Kafka jdbc connect 其实兼具source 和 sink的功能,它作为source时是侵入式的,在本例中只把它作为sink端,负责把数据从kafka topic 读取后写入到 MatrixDB.

3.4.1 下载kafka jdbc connect

JDBC Connector (Source and Sink)

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

3.4.2 安装配置kafka jdbc connect

解压到指定目录,在它的lib目录下,包含一些*.jar文件,从列表中可以看到它已经自带了postgresql jdbc驱动,使用该驱动可以直接把数据写入到MatrixDB.

unzip confluentinc-kafka-connect-jdbc-10.2.1.zip

[shidb@sdw7 lib]$ ll
total 20736
-rw-r--r-- 1 shidb root  214381 Jul 30 20:00 checker-qual-3.5.0.jar
-rw-r--r-- 1 shidb root   17263 Jul 30 20:00 common-utils-6.0.0.jar
-rw-r--r-- 1 shidb root  317816 Jul 30 20:00 jtds-1.3.1.jar
-rw-r--r-- 1 shidb root  254772 Jul 30 20:41 kafka-connect-jdbc-10.2.1.jar
-rw-r--r-- 1 shidb root 1300915 Jul 30 20:00 mssql-jdbc-8.4.1.jre8.jar
-rw-r--r-- 1 shidb root 4398602 Jul 30 20:00 ojdbc8-19.7.0.0.jar
-rw-r--r-- 1 shidb root    5140 Jul 30 20:00 ojdbc8-production-19.7.0.0.pom
-rw-r--r-- 1 shidb root  156242 Jul 30 20:00 ons-19.7.0.0.jar
-rw-r--r-- 1 shidb root  311000 Jul 30 20:00 oraclepki-19.7.0.0.jar
-rw-r--r-- 1 shidb root 1663954 Jul 30 20:00 orai18n-19.7.0.0.jar
-rw-r--r-- 1 shidb root  210337 Jul 30 20:00 osdt_cert-19.7.0.0.jar
-rw-r--r-- 1 shidb root  312200 Jul 30 20:00 osdt_core-19.7.0.0.jar
-rw-r--r-- 1 shidb root 1005078 Jul 30 20:00 postgresql-42.2.19.jar
-rw-r--r-- 1 shidb root   32168 Jul 30 20:00 simplefan-19.7.0.0.jar
-rw-r--r-- 1 shidb root   41472 Jul 30 20:00 slf4j-api-1.7.30.jar
-rw-r--r-- 1 shidb root 7064881 Jul 30 20:00 sqlite-jdbc-3.25.2.jar
-rw-r--r-- 1 shidb root 1684253 Jul 30 20:00 ucp-19.7.0.0.jar
-rw-r--r-- 1 shidb root  265130 Jul 30 20:00 xdb-19.7.0.0.jar
-rw-r--r-- 1 shidb root 1933746 Jul 30 20:00 xmlparserv2-19.7.0.0.jar

3.4.3 修改 connect properties文件

在pugin.path 添加解压后的lib目录:

plugin.path=/opt/confluentinc-kafka-connect-jdbc-10.2.1/lib

注意,即便在 preperties 文件中添加了 plugin.path,在启动connect 服务前,还是要执行 export,不然还是会遇到找不到class的情况。

export CLASSPATH=/opt/confluentinc-kafka-connect-jdbc-10.2.1/lib/*

在测试时,为了简化操作,有时会把debezium connect 和 jdbc connect 的 *.jar 解压后合并到同一个目录下,这样做可能会引起jar包的版本冲突。

3.4.4 distributed模式下创建 connector (json)

{
    "name": "pg1-items-pk-jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "3",
        "connection.url": "jdbc:postgresql://192.168.100.17:5432/tgt_zabbix?stringtype=unspecified",
        "connection.user": "dbzusr",
        "connection.password": "dbzusr",
        "topics": "zabbix.public.items",
        "table.name.format":"public.items",
        "db.timezone":"Asia/Shanghai",
        "auto.create":"false",
        "auto.evolve":"false",
        "insert.mode": "upsert",
        "pk.fields": "itemid",
        "pk.mode": "record_key",
        "batch.size": "800",
        "delete.enabled": "true",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
        }
}

所有 Kafka connector的方式都是一致的,当 Kafka connect 以 distributed 模式启动后,通过 Connect REST API 创建 Connector(POST):

curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" http://192.168.100.17:8083/connectors -d @pg1-items-pk-jdbc-sink.json

成功创建jdbc connector 之后,它就开始消费json配置文件中指定topic的数据,并批量写入到目标库中。

更多内容,请参考:

JDBC Connector (Source and Sink) for Confluent Platform https://docs.confluent.io/kafka-connect-jdbc/current/

JDBC Sink Connector Configuration Properties

https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html

3.5 服务启动顺序说明

1.启动源库;

2.启动目标库;

3.启动 zookeeper+kafka.

4.创建 kafka topic.

5.启动 connect distributed.

6.创建 jdbc sink connector.

7.创建 debezium postgresql source connector.