MatrixDB 4.1.0新增kafka无缝连接功能,可将kafka数据持续自动导入到MatrixDB表中,并支持图形化操作。
本节将以一个最简单的例子,接入单字符串行,来介绍如何用MatrixDB管理平台来接入Kafka数据。
1. 准备工作
1.1 创建Kafka Topic
假设我们在本地9092端口启动了Kafka服务,通过下面命令创建一个测试Topic:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
然后写入几个测试数据:
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
>test1
>test2
>test3
>^C
通过上面命令向新创建的test-topic写入了3条数据,分别为:test1,test2,test3。
1.2 创建接入目标表
创建接入数据用的测试表,方便起见,仅创建包含一个字符串字段的表: 连接数据库mxadmin
[mxadmin@mdw ~]$ psql mxadmin
psql (12)
Type "help" for help.
mxadmin=#
创建测试表
CREATE TABLE dest_test (c1 TEXT);
2. 创建接入过程
在浏览器里输入MatrixGate所在机器的ip(默认是mdw的ip)、端口和datastream后缀:
http://<IP>:8240/datastream
看到登录页面,输入密码,点登录:
随后,进入管理平台主界面。点击右上角创建数据流:
默认会选择Apache Kafka,点下一步即可:
进入第一步,连接Kafka。在输入框里输入Kafka服务的地址。点下一步:
第二步,选择主题和格式。在主题名中选择刚刚创建的test-topic,格式和分隔符默认即可,然后会在下面看到刚刚写入的几条测试数据。点下一步:
第三步,选择目标表。选择做数据接入的目标表,包括:数据库、模式和表。点下一步:
第四步,审核。确认刚才的选择,点提交:
最后,会看到数据流列表里包含了刚刚创建的数据流的相关信息和状态:
3. 查看接入数据
按照上面过程创建好了接入过程后,就完成了数据接入过程,之前写入到Topic的数据和后面新接入的数据都会流向dest_test表中,可以通过查询看到:
mxadmin=# select * from dest_test;
c1
-------
test1
test2
test3
(3 rows)