高性能数据加载工具 MatrixGate
本课程教学视频请参考 YMatrix 数据接入
MatrixGate 简称 mxgate,是 YMatrix 自带的高性能数据加载工具。
使用 mxgate 进行数据加载性能要远远高于原生 INSERT 语句。因为 mxgate 可以直接与 Segment 进行通信,不存在 Master 单点瓶颈。
1 INSERT 与 MatrixGate 对比
写入方式 | 优势 | 劣势 | 适用场景 |
---|---|---|---|
直接 INSERT | 接口简单 | 吞吐量低 | 吞吐量低,几十万数据点/秒 |
MatrixGate | 吞吐量高 准实时 |
需要额外部署,有运维成本 | 吞吐量高,千万数据点/秒 |
经测试,使用 MatrixGate 进行数据导入与竞品相比,性能可以提升几十倍。具体数据可参考时序数据库插入性能评测:YMatrix 是 InfluxDB 的 78 倍
2 MatrixGate 的使用方法
MatrixGate 提供如下运行模式:
- 服务模式
- 命令行模式
- 迁移模式
下面演示如何使用这两种模式向数据表灌入数据,数据表 dest 的 Schema 如下:
CREATE TABLE dest(
time timestamp,
c1 int,
c2 text
)DISTRIBUTED BY(c1);
2.1 服务模式
服务模式会有后台进程常驻,提供 HTTP 接口给用户,用来提交时序数据,是生产环境中普通使用的方式。
2.1.1 生成配置文件
使用服务模式首先要生成配置文件,确定数据库连接信息、目标表等参数。
mxgate config --db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--target public.dest \
--time-format raw \
--delimiter ',' \
> mxgate.conf
如上命令中确定了如下信息:
参数名 | 描述 | 值 |
---|---|---|
--db-database | 数据库 | test |
--db-master-host | 数据库主机 | localhost |
--db-master-port | 数据库端口 | 5432 |
--db-user | 数据库用户名 | mxadmin |
--target | 目标表 | public.dest |
--time-format | 时间格式 | raw(明文) |
--delimiter | 分隔符 | , |
2.1.2 启动 MatrixGate
然后,启动 MatrixGate,在启动参数中指定刚才生成的配置文件:
mxgate start --config mxgate.conf
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
Launching MatrixGate daemon...
MatrixGate daemon started successfully
2.1.3 提交数据
启动成功后,下面使用 curl 工具发送 HTTP 请求提交数据。
注意!
在生产环境中则使用编程语言支持的 HTTP 库来提交数据
准备了测试数据文件 rows_header.csv,内容如下:
[mxadmin@sdw2 ~]$ cat rows_header.csv
public.dest
2021-01-01 00:00:00,1,a1
2021-01-01 00:00:00,2,a2
2021-01-01 00:00:00,3,a3
在提交数据的时候,首行要指定目标表名,因为 MatrixGate 服务可能有多个目标表。
提交数据:
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@rows_header.csv"
MatrixGate 默认绑定 8086 端口,可以通过配置文件修改。
查询灌入的数据:
test=# select * from dest;
time | c1 | c2
---------------------+----+-----
2021-01-01 00:00:00 | 11 | a11
2021-01-01 00:00:00 | 12 | a12
2021-01-01 00:00:00 | 13 | a13
(3 rows)
更详细的 API 参数请参考文档
2.1.4 运维管理
MatrixGate 还提供了其他运维命令来进行运维管理。
查看状态
mxgate status
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID 15146 alive
Launched At 2021-09-01 14:59:03
Up For 26 seconds
Binary /usr/local/matrixdb-4.2.0.community/bin/mxgated
Log /home/mxadmin/gpAdminLogs/matrixgate.2021-09-01_145904.log
Config /home/mxadmin/mxgate.conf
可以看到服务程序运行状态以及配置文件和日志路径,用来追查问题。
停止服务
mxgate stop
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID 15146 stopped
mxgate stop 则可以停止服务。
观测服务
可以使用 mxgate watch
子命令来实时观测服务
mxgate watch
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.5.0
Your Copy is Licensed to: yMatrix.cn; 2022-05-14; any
******************************************************
watch cmd will run forever until killed, you can use watch -T n to change the duration to n seconds;and you can use mxgate watch --info to get info of columns;
Time WCount ICount WSpeed/s ISpeed/s WBandWidth MB/S BlocakItems
2022-04-28 15:20:58 14478858 14527011 2598081 2627887 2395 0
2022-04-28 15:21:01 22231035 22633254 2584059 2702081 2222 0
2022-04-28 15:21:04 30494310 30500874 2754425 2622540 3551 0
2022-04-28 15:21:07 38004210 38032956 2503300 2510694 2862 0
2022-04-28 15:21:10 46188696 46298223 2728162 2755089 2227 0
...
或者使用 mxgate watch --history
来观测历史数据
mxgate watch --history
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.5.0
Your Copy is Licensed to: yMatrix.cn; 2022-05-14; any
******************************************************
TIME RANGE | SPEED/S | BANDWIDTH MB/S | BLOCK ITEMS
2022-04-28 16:00:00-2022-04-28 17:00:00 | 2208010 | 1254.48 | 0
2022-04-28 17:00:00-2022-04-28 18:00:00 | 1157920 | 1327.00 | 0
2022-04-28 18:00:00-2022-04-28 19:00:00 | 2228666 | 2162.32 | 0
2022-04-28 19:00:00-2022-04-28 20:00:00 | 1371092 | 2881.30 | 0
2022-04-28 20:00:00-2022-04-28 21:00:00 | 1575320 | 2608.20 | 0
2.2 命令行模式
命令行模式用来一次性灌入数据文件,结束后进程也随之退出。
还是刚才的数据文件,去掉第一行目标表,只保留数据行,执行如下命令:
cat rows.csv | mxgate --source stdin --db-database test --db-master-host localhost --db-master-port 5432 --db-user mxadmin --time-format raw --target public.dest --parallel 2 --delimiter ','
有关文件接入的更多方法请参考文档
2.3 迁移模式
迁移模式用来做高速数据迁移,支持将其他 Greenplum 5、Greenplum 6、YMatrix 集群的数据表迁移到当前 YMatrix 集群中。
用法如下:
mxgate --source transfer \
--src-host 172.31.41.7 \
--src-port 5432 \
--src-db postgres \
--src-user ec2-user \
--src-password abc \
--src-schema public \
--src-table trans_ao \
--compress "gzip" \
--port-base 9129 \
--local-ip 172.31.33.128 \
--db-database ttt \
--target public.trans_ao \
--format text \
--time-format raw \
--use-auto-increment=false
其中:
参数名 | 描述 |
---|---|
--source | 功能入口,必须指定'transfer' |
--src-host | 源库主节点(Master)的 IP 地址 |
--src-port | 源库 Master 的端口号 |
--src-user | 连接源库的用户名(建议使用 superuser) |
--src-password | 连接密码 |
--src-schema | 源表的模式(Schema)名 |
--src-table | 源表的表名 |
--compress | 源数据库数据节点(Segment)主机到本数据的传输方法: 空白字符串“”,代表不压缩,明文传输 gzip:使用 gzip 压缩,需要源数据库的 Segment 主机上必须安装有 gzip 这个 Linux命令 lz4:使用 lz4 压缩,需要源数据库的 Segment 主机上必须安装有 lz4 这个 Linux 命令 推荐 lz4 > gzip > 不压缩 |
--port-base | 传输中会占用一批端口,端口的范围为 9129 |
--local-ip | 必须用源库可以连接到本机的 IP 地址 |
--db-database | 迁移目标表所在的数据库(Database)名 |
--target | 迁移目标表名,可以是 \<schema>.\<table>形式,如果不写 Schema 名默认是 public |
--format | text 或 csv,仅当迁移的数据里有复杂的字符串(包含换行、引号、分隔符)时,必须 CSV。其他情况下 text/csv 均可时,优先用 text 模式 |
--time-format | transfer 模式下必须是 raw |
--use-auto-increment | 当目标表包括 serial 类型的自增字段时,mxgate 内默认会跳过该类型的字段,加入这个选项来关闭 mxgate 跳过的逻辑 |
迁移模式的另一个用法是将数据快速导出到文件:
mxgate --source transfer \
--src-host 172.31.41.7 \
--src-port 5432 \
--src-db postgres \
--src-user ec2-user \
--src-schema public \
--src-table trans_ao_1 \
--compress "lz4" \
--port-base 9129 \
--local-ip 172.31.33.128 \
--save-to-dir /tmp/receive/ \
--db-database ttt \
--transform nil \
--writer nil \
--target trans_ao
使用 --save-to-dir
参数来指定文件存储路径。
注意!
即使导出到文件,也需要给出 --db-database 和 --target 参数来指定目标库和表,并且目标库和表必须存在。
过滤迁移可以通过 --src-sql
参数指定 SQL 来过滤需要同步的数据,在表到表迁移和表到文件迁移中都可以使用:
mxgate --source transfer \
--src-host 172.31.41.7 \
--src-port 5432 \
--src-db postgres \
--src-user ec2-user \
--src-sql "select * from demo where c1 = 'xxxx'" \
--compress "lz4" \
--port-base 9129 \
--local-ip 172.31.33.128 \
--save-to-dir /tmp/receive/ \
--db-database ttt \
--transform nil \
--writer nil \
--target trans_ao
有关迁移模式的更多用法请参考文档
3 MatrixGate 的 UPSERT 语义支持
在接入时序数据的时候我们可能会面临这样的场景:
- 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
- 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入
MatrixGate 4.2 版本新增了 UPSERT 语义,用来解决如上问题。
3.1 使用 UPSERT 语义加载数据
3.1.1 创建数据表
CREATE TABLE upsert_demo (
ts timestamp
, tagid int
, c1 int
, c2 int
, UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);
注意!
为了数据库能够使用 UPSERT 功能,要在表的 设备 id + 时间戳 上创建 UNIQUE 约束。
3.1.2 准备数据文件
upsert_demo1.dat:
2020-11-11|1|10|
upsert_demo2.dat:
2020-11-11|1||20
2020-11-11|2||100
2020-11-11|2|200|
3.1.3 加载数据文件
加载 upsert_demo1.dat:
cat upsert_demo1.dat|mxgate --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
查询结果:
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+----+----
2020-11-11 00:00:00 | 1 | 10 |
(1 row)
加载 upsert_demo2.dat:
cat upsert_demo2.dat|mxgate --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
查询结果:
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+-----+-----
2020-11-11 00:00:00 | 1 | 10 | 20
2020-11-11 00:00:00 | 2 | 200 | 100
(2 rows)
从结果可以看到,ts 和 tagid 相同的行数据,进行了合并。
3.2 UPSERT 生成服务模式的配置文件
mxgate config \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--stream-prepared 0 \
--upsert-key ts \
--upsert-key tagid > mxgate.conf
注意!
MatrixGate 在服务模式下使用 UPSERT 特性需要将 stream-prepared 参数设置为 0,否则会触发死锁。
3.3 UPSERT 使用注意事项
- UNIQUE 约束与 --upsert-key 必须完全一致,参数传递顺序无要求
- --upsert-key 必须定义 UNIQUE 约束
3.4 UPSERT 去重
UPSERT 还有一个功能是去重。和 INSERT 不同的是,UPSERT 会用新值强制覆盖旧值;去重仅当旧值为空时才用新值填充,否则将新值丢弃。
去重通过 --deduplicate-key
参数配置使用,该参数与 --upsert-key
参数互斥,只能二选一。
4 MatrixGate 的容错机制
因为 MatrixGate 内部使用外部表机制,微批量将数据插入到目标表。所以,任何一条数据在入库时都是和其他提交的数据一起分批进入的。如果其中任何一条数据格式有错误,则整批数据都将入库失败。
从 4.3 开始,MatrixGate 增加了容错机制,单条数据格式错误不会影响其他数据入库,会返回错误数据信息并记录错误日志。
注意!
容错仅限格式错误,如果是违反约束规则(唯一索引)则还是会批量失败。
4.1 错误信息
与之前有错误数据就直接返回 HTTP 500 不同,容错后返回的 HTTP 码仍是 200。响应体中会包含错误行信息,例如:
At line: 2
missing data for column "c3"
4.2 阈值控制
当然,也不是无限制的容忍错误,容忍阈值与 GUC:gp_initial_bad_row_limit
相关。当错误数据行数超过 5 * gp_initial_bad_row_limit
时,批量写入失败。
更详细的使用方法请参考 MatrixGate。