Kafka接入

Apache Kafka是一个开源分布式事件流平台,它可以被当作是一个消息系统,读写流式数据,帮助发布、订阅消息;也可以用于编写可扩展的流处理应用程序,来应付实时响应的场景;还可以与数据库对接,安全的将流式数据存储在一个分布式,有副本备份,可容错的集群中。可以说,它所生产的“事件流”是信息数据传输的“中枢神经”。

如果你正打算用Kafka将数据写入MatrixDB集群,请不要错过这篇文档。MatrixDB数据库支持Kafka无缝连接功能,可将Kafka数据持续自动导入到MatrixDB表中,并支持图形化界面操作。 目前接入的数据格式可以是CSV和JSON。我们将以一个最简单的例子,来介绍如何用MatrixDB管理平台接入Kafka数据。

1. 准备工作

1.1 搭建Kafka环境

首先你需要搭建一个健康的Kafka环境,参照官方指南 Kafka Quickstart 即可。

1.2 创建Kafka主题

现在,你已经在服务器上搭建好了Kafka环境,你可以通过以下命令或其他方式进入。

$ cd packages/kafka_2.13-3.2.0

然后启动Kafka服务并创建一个测试主题(Topic),端口号默认为9092。

$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092

1.3 写入测试数据

然后分条写入几个测试数据,以ctrl-c结束:

$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C

通过上面命令向新创建的csv_test写入了4条数据,分别为4条逗号分割的CSV行。 准备工作完毕,下面将介绍如何创建Kafka数据流来导入数据。

2. 创建Kafka数据流

在浏览器里输入MatrixGate所在机器的IP(默认是主节点的IP)、端口号:

http://<IP>:8240

成功登录之后进入主界面,选择Kafka数据源。首先以CSV文件为例说明步骤。 Kafka

2.1 CSV数据流接入

2.1.1 连接

在数据源里输入Kafka broker地址,显示连接成功之后,点击“下一步”按钮。如果有多个URL,需要用逗号分割。 Kafka

2.1.2 选择

主题选择刚才创建的csv_test,选择完之后会看到主题中的数据列表。 Kafka

选中一条数据作为解析与映射的样例数据,再选择CSV数据格式,此时会自动根据该样例数据解析出数据使用的分隔符,点击“下一步”按钮。 Kafka

2.1.3 解析

根据上一步选中的样例数据以及数据格式进行解析。字段列显示CSV数据通过分隔符分隔之后每个字段的索引号,值列显示每个索引号对应的值。 确认解析无误后,点击“下一步”按钮。 Kafka

2.1.4 配置

导入目标表会默认使用新建表,数据库默认使用postgres,模式默认使用public,表名默认与Kafka主题一致,这里默认为csv_test。 Kafka

点击目标表选择框,可以选择数据库、模式、表名,也可以新建模式和表名。 Kafka

选择完导入模式并配置自动分区策略之后,点击“下一步”按钮。 Kafka

注意!
使用图形化界面创建的表,默认使用MARS2存储引擎。更多相关信息请见MARS2

2.1.5 映射

映射页面进行源数据字段与目标表字段的映射。首先选择源数据字段,再在映射规则中设置目标字段相关信息,设置好之后点击保存,即可建立一条映射规则。 Kafka

特别的,当源字段值为UNIX时间戳且目标字段类型选择为timestamptz时,输出值右侧会显示时间戳格式转换复选框,默认为选中状态,输出值结果自动转换为数据库的timestamptz格式。 Kafka

映射规则保存完之后,还需要选择“设备标识”以及“时间列”。“设备标识”选择目标表具有唯一约束的字段或者字段组合,“时间列”选择目标表类型为timestamptz的字段。选择完毕后,点击“下一步”按钮。 Kafka

2.1.6 提交

页面展示了数据流所有的配置信息,目标表的唯一标识由“设备标识 + 时间列”组合而成。确认信息后点击“提交”按钮。 Kafka

提交成功后进入到创建成功提示页面,到此Kafka数据流已成功创建。 Kafka

2.2 如果是JSON文件的话

JSON格式的数据流,创建过程与CSV格式数据流有一些小的差异。

  • 首先,”选择“页面中,选择JSON数据格式。

Kafka

  • 其次,”解析“页面可以看出,JSON列的索引使用$符号开始,.加列名来索引,索引规则参照JSONPathKafka

  • 最后,因为JSON支持多层级,所以在“映射”页面里可选择不同层级的源数据字段创建映射规则。 Kafka

3. 查看接入数据

Kafka数据流列表显示在“数据流”菜单栏的首页,点击右侧的“数据流”菜单项即可回到数据流首页。 Kafka

点击“详情”进入到数据流详细信息页。 Kafka

详情页展示数据流的运行状态相关的详细信息。 Kafka

如果想要验证数据流是否正确把数据导入到数据库中,可以通过命令行登录数据库,输入下面的SQL进行验证。

postgres=# select * from csv_test order by id;
 id |   city   | score  |      create_time
----+----------+--------+------------------------
  1 | Beijing  | 123.05 | 2022-04-27 07:13:03+00
  2 | Shanghai | 112.95 | 2022-04-27 07:13:03+00
  3 | Shenzhen |    100 | 2022-04-27 07:13:03+00
  4 | Guangxi  |   88.5 | 2022-04-27 07:13:03+00
(4 rows)

4. Kafka数据流的暂停和恢复

当你使用MatrixDB图形化界面创建Kafka数据流后,数据会持续导入MatrixDB。此时,如因服务调试等原因,你想要临时暂停Kafka数据的导入,也可以直接在图形化界面进行操作。

4.1 暂停Kafka数据流

对于运行中的Kafka数据流,可以从列表页操作入口暂停数据导入。

注意!
暂停操作可能需要1-10秒生效,期间不能对该数据流进行其他操作;暂停操作完成后,Kafka数据将停止导入,同时状态切换至“已暂停”。

Kafka Kafka

4.2 恢复Kafka数据流

对于处于“已暂停”状态的Kafka数据流,可以从列表页操作入口恢复数据导入。确认操作后,数据流会立即恢复导入;页面将在最多5秒后恢复显示最新的导入信息。 Kafka