MatrixDB 4.1.0新增kafka无缝连接功能,可将kafka数据持续自动导入到MatrixDB表中,并支持图形化操作。

本节将以一个最简单的例子,接入单字符串行,来介绍如何用MatrixDB管理平台来接入Kafka数据。

1. 准备工作

1.1 创建Kafka Topic

假设我们在本地9092端口启动了Kafka服务,通过下面命令创建一个测试Topic:

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092

然后写入几个测试数据:

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
>test1
>test2
>test3
>^C

通过上面命令向新创建的test-topic写入了3条数据,分别为:test1,test2,test3。

1.2 创建接入目标表

创建接入数据用的测试表,方便起见,仅创建包含一个字符串字段的表: 连接数据库mxadmin

[mxadmin@mdw ~]$ psql mxadmin
psql (12)
Type "help" for help.

mxadmin=#

创建测试表

CREATE TABLE dest_test (c1 TEXT);

2. 创建接入过程

在浏览器里输入MatrixGate所在机器的ip(默认是mdw的ip)、端口和datastream后缀:

http://<IP>:8240/datastream

看到登录页面,输入密码,点登录: 登录管理平台

随后,进入管理平台主界面。点击右上角创建数据流: 管理平台主界面

默认会选择Apache Kafka,点下一步即可: 创建数据流

进入第一步,连接Kafka。在输入框里输入Kafka服务的地址。点下一步: 连接Kafka

第二步,选择主题和格式。在主题名中选择刚刚创建的test-topic,格式和分隔符默认即可,然后会在下面看到刚刚写入的几条测试数据。点下一步: 选择主题和格式

第三步,选择目标表。选择做数据接入的目标表,包括:数据库、模式和表。点下一步: 选择目标表

第四步,审核。确认刚才的选择,点提交: 审核

最后,会看到数据流列表里包含了刚刚创建的数据流的相关信息和状态: 状态页面

状态页面包含了非常多的信息,鼠标悬停在不同位置会显示出不同的信息,包括详细的统计信息、出错时间等。示意图如下: 状态页面

3. 查看接入数据

按照上面过程创建好了接入过程后,就完成了数据接入过程,之前写入到Topic的数据和后面新接入的数据都会流向dest_test表中,可以通过查询看到:

mxadmin=# select * from dest_test;
  c1
-------
 test1
 test2
 test3
(3 rows)