Kafka接入

MatrixDB支持kafka无缝连接功能,可将kafka数据持续自动导入到MatrixDB表中,并支持图形化操作。

接入的数据格式包括CSV和JSON。本节将以一个最简单的例子,来介绍如何用MatrixDB管理平台来接入Kafka数据。

1. 准备工作

1.1 创建Kafka Topic

假设我们在本地9092端口启动了Kafka服务,通过下面命令创建一个测试Topic:

bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092

1.2 写入测试数据

然后写入几个测试数据:

bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,1,1.0,abc
>2,2,2.0,bcd
>1,1,1.0,abc
>2,2,2.0,bcd
>^C

通过上面命令向新创建的csv_test写入了4条数据,分别为4条逗号分割的CSV行。

1.3 创建接入目标表

连接数据库test:

[mxadmin@mdw ~]$ psql test
psql (12)
Type "help" for help.

test=#

创建测试表:

CREATE TABLE dest (
  c1 INT,
  c2 FLOAT8,
  c3 TEXT
) DISTRIBUTED BY(c1);

2. 创建接入过程

准备工作完毕,下面演示如何使用图形化界面做接入。

在浏览器里输入MatrixGate所在机器的ip(默认是mdw的ip)、端口和datastream后缀:

http://<IP>:8240/datastream

进入到如下页面,然后点击“下一步”: Kafka

从页面上边的导航栏可以看到整个过程分6步:连接、选择、解析、配置、映射、提交。

2.1 连接

当前进行的是第一步:连接。在数据源里输入Kafka服务器地址,然后点击“下一步”。 Kafka

2.2 选择

选择界面选择Kafka的Topic,这里选择刚才创建的csv_test Topic。同时右边选择Topic对应的格式,CSV格式还需要指定分隔符。 Kafka

选择好Topic后,会看到Topic里的数据。默认会选前10条作为样例。因为我们只向Topic里写入4条,所以这里可以看到全部4条数据。选择其中一条来做后面的解析和映射。 Kafka

2.3 解析

解析页面会对刚才选择的样例数据进行解析,在右边会看到通过分隔符分割后的索引号和对应的值。确认无误后点击“下一步”: Kafka

2.4 配置

配置页面中,选择该Topic的数据接入到哪个目标表,自行选择数据库、模式和表后,会展示出表结构。 Kafka

2.5 映射

映射页面进行源数据与目标表的映射,即CSV列与数据库表列的映射。操作过程为从源数据中选择一列,然后在目标表中选择映射的列。这样,在右边的映射规则中会出现一条新的映射规则,确认后点击保存。 Kafka

按照这个步骤将所有列完成映射即可。 Kafka

这里需要注意两点:

一、列映射类型必须一致,否则会提示如下错误: Kafka

二、如果CSV列数与数据库表列数不一致,或者无法一一对应,则允许出现未被映射的源数据列和数据库表列。未被映射的源数据列会被丢弃;未被映射的目标表列则按照建表规则填充NULL或默认值。 Kafka

2.6 提交

完成映射后,进入到最后一个页面“提交”。该页面汇总了刚才选择的所有结果。确认无误后点“提交”。 Kafka

最后进入到完成页面: Kafka

JSON数据格式

目前Kafka数据接入支持两种数据格式:CSV和JSON。刚才演示了CSV格式的接入,下面对比一下JSON格式在接入时的差别。

选择页面中,选择格式里选择JSON。 Kafka

解析页面可以看出,JSON列的索引使用$符号开始,.加列名来索引。 Kafka

因为JSON支持多层级,所以映射页面里可选择不同层级下的列: Kafka

3. 查看接入数据

按照上面过程创建好了接入过程后,就完成了数据接入过程,之前写入到Topic的数据和后面新接入的数据都会流向目标表中。退回到主界面会看到刚才创建的数据流信息: Kafka

点击详情则会进入到详细信息页面: Kafka

下面登录数据库验证数据已正确接入:

test=# select * from dest;
 c1 | c2 | c3
----+----+-----
  1 |  1 | abc
  2 |  2 | bcd
  1 |  1 | abc
  2 |  2 | bcd
(4 rows)