Kafka 接入

Apache Kafka 是一个开源分布式事件流平台,它可以被当作是一个消息系统,读写流式数据,帮助发布、订阅消息;也可以用于编写可扩展的流处理应用程序,来应付实时响应的场景;还可以与数据库对接,安全的将流式数据存储在一个分布式,有副本备份,可容错的集群中。可以说,它所生产的“事件流”是信息数据传输的“中枢神经”。

如果你正打算用 Kafka 将数据写入 YMatrix 集群,请不要错过这篇文档。YMatrix 数据库支持 Kafka 无缝连接功能,可将 Kafka 数据持续自动导入到 YMatrix 表中,并支持图形化界面操作。 目前接入的数据格式可以是 CSV 和 JSON。我们将以一个最简单的例子,来介绍如何用 YMatrix 管理平台接入 Kafka 数据。

1 准备工作

1.1 搭建 Kafka 环境

首先你需要搭建一个健康的 Kafka 环境,参照官方指南 Kafka Quickstart 即可。

1.2 创建 Kafka 主题

现在,你已经在服务器上搭建好了 Kafka 环境,你可以通过以下命令或其他方式进入。

$ cd packages/kafka_2.13-3.2.0

然后启动 Kafka 服务并创建一个测试主题(Topic),端口号默认为 9092。

$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092

1.3 写入测试数据

然后分条写入几个测试数据,以 ctrl-c 结束:

$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C

通过上面命令向新创建的 csv_test 写入了4条数据,分别为 4 条逗号分割的 CSV 行。 准备工作完毕,下面将介绍如何创建 Kafka 数据流来导入数据。

2 创建 Kafka 数据流

在浏览器里输入 MatrixGate 所在机器的 IP(默认是主节点的 IP)、端口号:

http://<IP>:8240

成功登录之后进入主界面,选择 Kafka 数据源。首先以 CSV 文件为例说明步骤。

2.1 CSV 数据流接入

2.1.1 选择或新建目标表

首先,你可以选择已有表或新建表来作为接收 Kafka 数据的目标表。

2.1.1.1 选择已有表

在目标表栏选择已有表与数据导入模式。这里假设选择的表为 postgres 数据库 public 模式下的 test

2.1.1.2 选择新建表

新建表,这里假设模式为 public,表名为 test1

建表成功,出现红色的新建标识。

为其选择数据导入模式及自动分区策略。此处分区策略默认采用 auto_partitioning

2.1.2 源数据

目标表选择完成后,第二步设置源数据的连接。你需要完成以下操作,才可以正确地进入下一步:

  • 首先,在左侧菜单栏按照图中格式在地址框中输入 Kafka broker 地址及端口。如果有多个 URL,需要用逗号分割。
  • 然后,连接 Kafka 所需验证方式,并验证 Kafka 中的用户名和密码。
  • 最后,在右侧菜单栏勾选或搜索需要连接的 Kafka 主题。

2.1.3 映射

第三步配置样例数据,并映射已有表或新建表以导入 Kafka 数据。

2.1.3.1 基本配置

选择 CSV 源数据格式,并指定或配置样例数据。你需要指定或手动配置样例数据格式,以保证映射的准确。

在配置样例数据时,你可以从主题中选择也可以直接编辑数据

  • 如果 Kafka 主题(Topic)中有完整的数据,则从主题中选择它作为样例数据即可。
  • 当主题中的数据不完整(如存在空值列)时,你可以选择直接编辑数据,按照目标表的字段结构给予我们一条样例数据。该样例数据只用于字段结构的解析与映射,不会被导入到目标表中。

注意!
使用图形化界面新创建的表,默认使用 MARS3 存储引擎。更多相关信息请见 MARS3

2.1.3.2 执行映射

简单地映射配置完成后,进行源数据字段与目标表字段的映射。此步骤已有表与新建表的操作有差异。

  • 映射已有表

    首先选择一条样例数据,再在映射关系中找到对应的目标字段,设置好之后点击保存,即可建立一条映射关系。

    也可以直接一键自动映射,便捷的同时也保持映射的正确性。

  • 映射新建表

    新建表除建立映射规则外,还需注意以下设置: 当源字段值为 UNIX 时间戳且目标字段类型选择为 timestamptz 时,输出值右侧会显示复选框存储为 timestamp,勾选此框,输出值结果会自动转换为数据库的 timestamp 格式。

    映射规则保存完之后,还需要选择“设备标识”以及“时间列”。“设备标识”选择目标表具有唯一约束的字段或者字段组合,“时间列”选择目标表类型为 timestamptz 的字段。

2.1.4 提交

最后,确认数据流所有的配置信息,有误返回修改,无误则提交。

2.1.4.1 已有表

页面展示了数据流所有的配置信息,确认信息后点击“提交”按钮。

2.1.4.2 新建表

比已有表更多地显示了由“设备标识 + 时间列”组合而成的唯一标识信息。确认信息后点击“提交”按钮。

不论是已有表还是新建表,提交成功后都会进入到创建成功提示页面,到此 Kafka 数据流已成功创建。

2.2 JSON 数据流接入

这里主要描述与 CSV 格式接入的差异,集中在映射步骤。

  • 首先,选择 JSON 数据格式并配置样例数据。

  • 其次,JSON 列的索引使用 $["列名"] 的语义来索引,索引规则参照 JSONPath

  • 最后,JSON 数据支持多层级嵌套,YMatrix 支持选择不同层级的源数据字段创建映射规则。点击样例数据栏中 展开嵌套层级。

3 Kafka 数据流的暂停和恢复

当你使用 YMatrix 图形化界面创建 Kafka 数据流后,数据会持续导入 YMatrix。此时,如因服务调试等原因,你想要临时暂停 Kafka 数据的导入,也可以直接在图形化界面进行操作。

3.1 暂停 Kafka 数据流

对于运行中的 Kafka 数据流,可以从列表页操作入口暂停数据导入。

注意!
暂停操作可能需要 1-10 秒生效,期间不能对该数据流进行其他操作;暂停操作完成后,Kafka 数据将停止导入,同时状态切换至“已暂停”。

3.2 恢复Kafka数据流

对于处于“已暂停”状态的 Kafka 数据流,可以从列表页操作入口恢复数据导入。确认操作后,数据流会立即恢复导入;页面将在最多 5 秒后恢复显示最新的导入信息。