高性能数据加载工具MatrixGate
本课程教学视频请参考MatrixDB数据接入
MatrixGate简称mxgate,是MatrixDB自带的高性能数据加载工具。
使用mxgate进行数据加载性能要远远高于原生INSERT语句。因为mxgate可以直接与segment进行通信,不存在master单点瓶颈。
1. INSERT与MatrixGate对比
写入方式 | 优势 | 劣势 | 适用场景 |
---|---|---|---|
直接INSERT | 接口简单 | 吞吐量低 | 吞吐量低,几十万数据点/秒 |
MatrixGate | 吞吐量高 准实时 |
需要额外部署,有运维成本 | 吞吐量高,千万数据点/秒 |
经测试,使用MatrixGate进行数据导入与竞品相比,性能可以提升几十倍。具体数据可参考时序数据库插入性能评测:MatrixDB是InfluxDB的78倍
2. MatrixGate的使用方法
MatrixGate提供如下运行模式:
- 服务模式
- 命令行模式
- 迁移模式
下面演示如何使用这两种模式向数据表灌入数据,数据表dest的schema如下:
CREATE TABLE dest(
time timestamp,
c1 int,
c2 text
)DISTRIBUTED BY(c1);
2.1 服务模式
服务模式会有后台进程常驻,提供HTTP接口给用户,用来提交时序数据,是生产环境中普通使用的方式。
2.1.1 生成配置文件
使用服务模式首先要生成配置文件,确定数据库连接信息、目标表等参数
mxgate config --db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--target public.dest \
--time-format raw \
--delimiter ',' \
> mxgate.conf
如上命令中确定了如下信息:
参数名 | 描述 | 值 |
---|---|---|
--db-database | 数据库 | test |
--db-master-host | 数据库主机 | localhost |
--db-master-port | 数据库端口 | 5432 |
--db-user | 数据库用户名 | mxadmin |
--target | 目标表 | public.dest |
--time-format | 时间格式 | raw(明文) |
--delimiter | 分隔符 | , |
2.1.2 启动MatrixGate
然后,启动MatrixGate,在启动参数中指定刚才生成的配置文件:
mxgate start --config mxgate.conf
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
Launching MatrixGate daemon...
MatrixGate daemon started successfully
2.1.3 提交数据
启动成功后,下面使用curl工具发送HTTP请求提交数据。
在生产环境中则使用编程语言支持的HTTP库来提交数据
准备了测试数据文件rows_header.csv,内容如下:
[mxadmin@sdw2 ~]$ cat rows_header.csv
public.dest
2021-01-01 00:00:00,1,a1
2021-01-01 00:00:00,2,a2
2021-01-01 00:00:00,3,a3
在提交数据的时候,首行要指定目标表名,因为MatrixGate服务可能有多个目标表。
提交数据:
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@rows_header.csv"
MatrixGate默认绑定8086端口,可以通过配置文件修改。
查询灌入的数据:
test=# select * from dest;
time | c1 | c2
---------------------+----+-----
2021-01-01 00:00:00 | 11 | a11
2021-01-01 00:00:00 | 12 | a12
2021-01-01 00:00:00 | 13 | a13
(3 rows)
更详细的API参数请参考文档
2.1.4 运维管理
MatrixGate还提供了其他运维命令来进行运维管理。
查看状态
mxgate status
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID 15146 alive
Launched At 2021-09-01 14:59:03
Up For 26 seconds
Binary /usr/local/matrixdb-4.2.0.community/bin/mxgated
Log /home/mxadmin/gpAdminLogs/matrixgate.2021-09-01_145904.log
Config /home/mxadmin/mxgate.conf
可以看到服务程序运行状态以及配置文件和日志路径,用来追查问题。
停止服务
mxgate stop
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID 15146 stopped
mxgate stop则可以停止服务。
观测服务
可以使用mxgate watch
子命令来实时观测服务
mxgate watch
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.5.0
Your Copy is Licensed to: yMatrix.cn; 2022-05-14; any
******************************************************
watch cmd will run forever until killed, you can use watch -T n to change the duration to n seconds;and you can use mxgate watch --info to get info of columns;
Time WCount ICount WSpeed/s ISpeed/s WBandWidth MB/S BlocakItems
2022-04-28 15:20:58 14478858 14527011 2598081 2627887 2395 0
2022-04-28 15:21:01 22231035 22633254 2584059 2702081 2222 0
2022-04-28 15:21:04 30494310 30500874 2754425 2622540 3551 0
2022-04-28 15:21:07 38004210 38032956 2503300 2510694 2862 0
2022-04-28 15:21:10 46188696 46298223 2728162 2755089 2227 0
...
或者使用mxgate watch --history
来观测历史数据
mxgate watch --history
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.5.0
Your Copy is Licensed to: yMatrix.cn; 2022-05-14; any
******************************************************
TIME RANGE | SPEED/S | BANDWIDTH MB/S | BLOCK ITEMS
2022-04-28 16:00:00-2022-04-28 17:00:00 | 2208010 | 1254.48 | 0
2022-04-28 17:00:00-2022-04-28 18:00:00 | 1157920 | 1327.00 | 0
2022-04-28 18:00:00-2022-04-28 19:00:00 | 2228666 | 2162.32 | 0
2022-04-28 19:00:00-2022-04-28 20:00:00 | 1371092 | 2881.30 | 0
2022-04-28 20:00:00-2022-04-28 21:00:00 | 1575320 | 2608.20 | 0
2.2 命令行模式
命令行模式用来一次性灌入数据文件,结束后进程也随之退出。
还是刚才的数据文件,去掉第一行目标表,只保留数据行,执行如下命令:
cat rows.csv | mxgate --source stdin --db-database test --db-master-host localhost --db-master-port 5432 --db-user mxadmin --time-format raw --target public.dest --parallel 2 --delimiter ','
有关文件接入的更多方法请参考文档
2.3 迁移模式
迁移模式用来做高速数据迁移,支持将其他Greenplum5、Greenplum6、MatrixDB集群的数据表迁移到当前MatrixDB集群中。
用法如下:
mxgate --source transfer \
--src-host 172.31.41.7 \
--src-port 5432 \
--src-db postgres \
--src-user ec2-user \
--src-password abc \
--src-schema public \
--src-table trans_ao \
--compress "gzip" \
--port-base 9129 \
--local-ip 172.31.33.128 \
--db-database ttt \
--target public.trans_ao \
--format text \
--time-format raw \
--use-auto-increment=false
其中:
参数名 | 描述 |
---|---|
--source | 功能入口,必须指定'transfer' |
--src-host | 源库master的ip地址 |
--src-port | 源库master的端口号 |
--src-user | 连接源库的用户名(建议使用superuser) |
--src-password | 连接密码 |
--src-schema | 源表的schema名 |
--src-table | 源表的表名 |
--compress | 源数据库segment主机到本数据的传输方法: 空白字符串“”,代表不压缩,明文传输 gzip:使用gzip压缩,需要源数据库的segment主机上必须安装有gzip这个linux命令 lz4:使用lz4压缩,需要源数据库的segment主机上必须安装有lz4这个linux命令 推荐 lz4 > gzip > 不压缩 |
--port-base | 传输中会占用一批端口,端口的范围为9129~ |
--local-ip | 必须用源库可以连接到本机的IP地址 |
--db-database | 迁移目标表所在的database名 |
--target | 迁移目标表名,可以是 \<schema>.\<table>形式,如果不写schema名默认是public |
--format | text或csv,仅当迁移的数据里有复杂的字符串(包含换行、引号、分隔符)时,必须CSV。其他情况下text/csv均可时,优先用text模式 |
--time-format | transfer模式下必须是raw |
--use-auto-increment | 当目标表包括serial类型的自增字段时,mxgate内默认会跳过该类型的字段,加入这个option来关闭mxgate跳过的逻辑 |
迁移模式的另一个用法是将数据快速导出到文件:
mxgate --source transfer \
--src-host 172.31.41.7 \
--src-port 5432 \
--src-db postgres \
--src-user ec2-user \
--src-schema public \
--src-table trans_ao_1 \
--compress "lz4" \
--port-base 9129 \
--local-ip 172.31.33.128 \
--save-to-dir /tmp/receive/ \
--db-database ttt \
--transform nil \
--writer nil \
--target trans_ao
使用--save-to-dir
参数来指定文件存储路径。
注意,即使导出到文件,也需要给出--db-database和--target参数来指定目标库和表,并且目标库和表必须存在
过滤迁移可以通过--src-sql
参数指定SQL来过滤需要同步的数据,在表到表迁移和表到文件迁移中都可以使用:
mxgate --source transfer \
--src-host 172.31.41.7 \
--src-port 5432 \
--src-db postgres \
--src-user ec2-user \
--src-sql "select * from demo where c1 = 'xxxx'" \
--compress "lz4" \
--port-base 9129 \
--local-ip 172.31.33.128 \
--save-to-dir /tmp/receive/ \
--db-database ttt \
--transform nil \
--writer nil \
--target trans_ao
有关迁移模式的更多用法请参考文档
3. MatrixGate的UPSERT语义支持
在接入时序数据的时候我们可能会面临这样的场景:
- 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
- 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入
MatrixGate 4.2版本新增了UPSERT语义,用来解决如上问题。
3.1 使用UPSERT语义加载数据
3.1.1 创建数据表
CREATE TABLE upsert_demo (
ts timestamp
, tagid int
, c1 int
, c2 int
, UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);
注意,为了数据库能够使用UPSERT功能,要在表的 设备id+时间戳 上创建UNIQUE约束。
3.1.2 准备数据文件
upsert_demo1.dat:
2020-11-11|1|10|
upsert_demo2.dat:
2020-11-11|1||20
2020-11-11|2||100
2020-11-11|2|200|
3.1.3 加载数据文件
加载upsert_demo1.dat:
cat upsert_demo1.dat|mxgate --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
查询结果:
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+----+----
2020-11-11 00:00:00 | 1 | 10 |
(1 row)
加载upsert_demo2.dat:
cat upsert_demo2.dat|mxgate --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
查询结果:
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+-----+-----
2020-11-11 00:00:00 | 1 | 10 | 20
2020-11-11 00:00:00 | 2 | 200 | 100
(2 rows)
从结果可以看到,ts和tagid相同的行数据,进行了合并。
3.2 UPSERT生成服务模式的配置文件
mxgate config \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--stream-prepared 0 \
--upsert-key ts \
--upsert-key tagid > mxgate.conf
注意:matrixgate在服务模式下使用UPSERT特性需要将stream-prepared参数设置为0,否则会触发死锁
3.3 UPSERT使用注意事项
- UNIQUE约束与--upsert-key必须完全一致,参数传递顺序无要求
- --upsert-key必须定义UNIQUE约束
3.4 UPSERT之去重
UPSERT还有一个功能是去重。和UPSERT不同的是,UPSERT会用新值强制覆盖旧值;去重仅当旧值为空时才用新值填充,否则将新值丢弃。
去重通过--deduplicate-key
参数配置使用,该参数与--upsert-key
参数互斥,只能二选一。
4. MatrixGate的容错机制
因为MatrixGate内部使用外部表机制,微批量将数据插入到目标表。所以,任何一条数据在入库时都是和其他提交的数据一起分批进入的。如果其中任何一条数据格式有错误,则整批数据都将入库失败。
从4.3开始,MatrixGate增加了容错机制,单条数据格式错误不会影响其他数据入库,会返回错误数据信息并记录错误日志。
注意:容错仅限格式错误,如果是违反约束规则(唯一索引)则还是会批量失败。
4.1 错误信息
与之前有错误数据就直接返回HTTP 500不同,容错后返回的HTTP码仍是200。响应体中会包含错误行信息,例如:
At line: 2
missing data for column "c3"
4.2 阈值控制
当然,也不是无限制的容忍错误,容忍阈值与GUC:gp_initial_bad_row_limit
相关。当错误数据行数超过5 * gp_initial_bad_row_limit
时,批量写入失败。
更详细的使用方法请参考MatrixGate。