YMatrix 文档
关于 YMatrix
标准集群部署
数据写入
SQL 参考
- ABORT
- ALTER_DATABASE
- ALTER_EXTENSION
- ALTER_EXTERNAL_TABLE
- ALTER_FOREIGN_DATA_WRAPPER
- ALTER_FOREIGN_TABLE
- ALTER_FUNCTION
- ALTER_INDEX
- ALTER_RESOURCE_GROUP
- ALTER_RESOURCE_QUEUE
- ALTER_ROLE
- ALTER_RULE
- ALTER_SCHEMA
- ALTER_SEQUENCE
- ALTER_SERVER
- ALTER_TABLE
- ALTER_TABLESPACE
- ALTER_TYPE
- ALTER_USER_MAPPING
- ALTER_VIEW
- ANALYZE
- BEGIN
- CHECKPOINT
- COMMIT
- COPY
- CREATE_DATABASE
- CREATE_EXTENSION
- CREATE_EXTERNAL_TABLE
- CREATE_FOREIGN_DATA_WRAPPER
- CREATE_FOREIGN_TABLE
- CREATE_FUNCTION
- CREATE_INDEX
- CREATE_RESOURCE_GROUP
- CREATE_RESOURCE_QUEUE
- CREATE_ROLE
- CREATE_RULE
- CREATE_SCHEMA
- CREATE_SEGMENT_SET
- CREATE_SEQUENCE
- CREATE_SERVER
- CREATE_TABLE
- CREATE_TABLE_AS
- CREATE_TABLESPACE
- CREATE_TYPE
- CREATE_USER_MAPPING
- CREATE_VIEW
- DELETE
- DROP_DATABASE
- DROP_EXTENSION
- DROP_EXTERNAL_TABLE
- DROP_FOREIGN_DATA_WRAPPER
- DROP_FOREIGN_TABLE
- DROP_FUNCTION
- DROP_INDEX
- DROP_RESOURCE_GROUP
- DROP_RESOURCE_QUEUE
- DROP_ROLE
- DROP_RULE
- DROP_SCHEMA
- DROP_SEGMENT_SET
- DROP_SEQUENCE
- DROP_SERVER
- DROP_TABLE
- DROP_TABLESPACE
- DROP_TYPE
- DROP_USER_MAPPING
- DROP_VIEW
- END
- EXPLAIN
- GRANT
- INSERT
- LOAD
- LOCK
- REINDEX
- RELEASE_SAVEPOINT
- RESET
- REVOKE
- ROLLBACK_TO_SAVEPOINT
- ROLLBACK
- SAVEPOINT
- SELECT INTO
- SET ROLE
- SET TRANSACTION
- SET
- SHOW
- START TRANSACTION
- TRUNCATE
- UPDATE
- VACUUM
Kafka 接入
Apache Kafka 是一个开源分布式事件流平台,它可以被当作是一个消息系统,读写流式数据,帮助发布、订阅消息;也可以用于编写可扩展的流处理应用程序,来应付实时响应的场景;还可以与数据库对接,安全的将流式数据存储在一个分布式,有副本备份,可容错的集群中。可以说,它所生产的“事件流”是信息数据传输的“中枢神经”。
如果你正打算用 Kafka 将数据写入 YMatrix 集群,请不要错过这篇文档。YMatrix 数据库支持 Kafka 无缝连接功能,可将 Kafka 数据持续自动导入到 YMatrix 表中,并支持图形化界面操作。 目前接入的数据格式可以是 CSV 和 JSON。我们将以一个最简单的例子,来介绍如何用 YMatrix 管理平台接入 Kafka 数据。
1. 准备工作
1.1 搭建 Kafka 环境
首先你需要搭建一个健康的 Kafka 环境,参照官方指南 Kafka Quickstart 即可。
1.2 创建 Kafka 主题
现在,你已经在服务器上搭建好了 Kafka 环境,你可以通过以下命令或其他方式进入。
$ cd packages/kafka_2.13-3.2.0
然后启动 Kafka 服务并创建一个测试主题(Topic),端口号默认为 9092。
$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092
1.3 写入测试数据
然后分条写入几个测试数据,以 ctrl-c 结束:
$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C
通过上面命令向新创建的 csv_test 写入了4条数据,分别为 4 条逗号分割的 CSV 行。 准备工作完毕,下面将介绍如何创建 Kafka 数据流来导入数据。
2. 创建 Kafka 数据流
在浏览器里输入 MatrixGate 所在机器的 IP(默认是主节点的 IP)、端口号:
http://<IP>:8240
成功登录之后进入主界面,选择 Kafka 数据源。首先以 CSV 文件为例说明步骤。
2.1 CSV数据流接入
2.1.1 连接
在数据源里输入 Kafka broker 地址,显示连接成功之后,点击“下一步”按钮。如果有多个 URL,需要用逗号分割。
2.1.2 选择
主题选择刚才创建的 csv_test,选择完之后会看到主题中的数据列表。
选中一条数据作为解析与映射的样例数据,再选择 CSV 数据格式,此时会自动根据该样例数据解析出数据使用的分隔符,点击“下一步”按钮。
2.1.3 解析
根据上一步选中的样例数据以及数据格式进行解析。字段列显示CSV数据通过分隔符分隔之后每个字段的索引号,值列显示每个索引号对应的值。 确认解析无误后,点击“下一步”按钮。
2.1.4 配置
导入目标表会默认使用新建表,数据库默认使用 postgres,模式默认使用 public,表名默认与 Kafka 主题一致,这里默认为 csv_test。
点击目标表选择框,可以选择数据库、模式、表名,也可以新建模式和表名。
选择完导入模式并配置自动分区策略之后,点击“下一步”按钮。
注意!
使用图形化界面创建的表,默认使用 MARS2 存储引擎。更多相关信息请见 MARS2
2.1.5 映射
映射页面进行源数据字段与目标表字段的映射。首先选择源数据字段,再在映射规则中设置目标字段相关信息,设置好之后点击保存,即可建立一条映射规则。
特别的,当源字段值为 UNIX 时间戳且目标字段类型选择为 timestamptz 时,输出值右侧会显示时间戳格式转换复选框,默认为选中状态,输出值结果自动转换为数据库的 timestamptz 格式。
映射规则保存完之后,还需要选择“设备标识”以及“时间列”。“设备标识”选择目标表具有唯一约束的字段或者字段组合,“时间列”选择目标表类型为 timestamptz 的字段。选择完毕后,点击“下一步”按钮。
2.1.6 提交
页面展示了数据流所有的配置信息,目标表的唯一标识由“设备标识 + 时间列”组合而成。确认信息后点击“提交”按钮。
提交成功后进入到创建成功提示页面,到此 Kafka 数据流已成功创建。
2.2 JSON数据流接入
JSON 格式的数据流,创建过程与 CSV 格式数据流有一些小的差异。
- 首先,”选择“页面中,选择 JSON 数据格式。
-
其次,”解析“页面可以看出,JSON 列的索引使用
$
符号开始,.
加列名来索引,索引规则参照 JSONPath。 -
最后,因为 JSON 支持多层级,所以在“映射”页面里可选择不同层级的源数据字段创建映射规则。
3. 查看接入数据
Kafka 数据流列表显示在“数据流”菜单栏的首页,点击右侧的“数据流”菜单项即可回到数据流首页。
点击“详情”进入到数据流详细信息页。
详情页展示数据流的运行状态相关的详细信息。
如果想要验证数据流是否正确把数据导入到数据库中,可以通过命令行登录数据库,输入下面的 SQL 进行验证。
postgres=# SELECT * FROM csv_test ORDER BY id;
id | city | score | create_time
----+----------+--------+------------------------
1 | Beijing | 123.05 | 2022-04-27 07:13:03+00
2 | Shanghai | 112.95 | 2022-04-27 07:13:03+00
3 | Shenzhen | 100 | 2022-04-27 07:13:03+00
4 | Guangxi | 88.5 | 2022-04-27 07:13:03+00
(4 rows)
4. Kafka 数据流的暂停和恢复
当你使用 YMatrix 图形化界面创建 Kafka 数据流后,数据会持续导入 YMatrix。此时,如因服务调试等原因,你想要临时暂停 Kafka 数据的导入,也可以直接在图形化界面进行操作。
4.1 暂停 Kafka 数据流
对于运行中的 Kafka 数据流,可以从列表页操作入口暂停数据导入。
注意!
暂停操作可能需要 1-10 秒生效,期间不能对该数据流进行其他操作;暂停操作完成后,Kafka 数据将停止导入,同时状态切换至“已暂停”。
4.2 恢复Kafka数据流
对于处于“已暂停”状态的 Kafka 数据流,可以从列表页操作入口恢复数据导入。确认操作后,数据流会立即恢复导入;页面将在最多 5 秒后恢复显示最新的导入信息。