高性能数据加载工具MatrixGate
本课程教学视频请参考MatrixDB数据接入
MatrixGate简称mxgate,是MatrixDB自带的高性能数据加载工具。
使用mxgate进行数据加载性能要远远高于原生INSERT语句。因为mxgate可以直接与segment进行通信,不存在master单点瓶颈。
1. INSERT与MatrixGate对比
写入方式 | 优势 | 劣势 | 适用场景 |
---|---|---|---|
直接INSERT | 接口简单 | 吞吐量低 | 吞吐量低,几十万数据点/秒 |
MatrixGate | 吞吐量高 准实时 |
需要额外部署,有运维成本 | 吞吐量高,千万数据点/秒 |
经测试,使用MatrixGate进行数据导入与竞品相比,性能可以提升几十倍。具体数据可参考时序数据库插入性能评测:MatrixDB是InfluxDB的78倍
2. MatrixGate的使用方法
MatrixGate提供如下运行模式:
- 服务模式
- 命令行模式
下面演示如何使用这两种模式向数据表灌入数据,数据表dest的schema如下:
CREATE TABLE dest(
time timestamp,
c1 int,
c2 text
)DISTRIBUTED BY(c1);
2.1 服务模式
服务模式会有后台进程常驻,提供HTTP接口给用户,用来提交时序数据,是生产环境中普通使用的方式。
2.1.1 生成配置文件
使用服务模式首先要生成配置文件,确定数据库连接信息、目标表等参数
mxgate config --db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--target public.dest \
--time-format raw \
--delimiter ',' \
> mxgate.conf
如上命令中确定了如下信息:
参数名 | 描述 | 值 |
---|---|---|
--db-database | 数据库 | test |
--db-master-host | 数据库主机 | localhost |
--db-master-port | 数据库端口 | 5432 |
--db-user | 数据库用户名 | mxadmin |
--target | 目标表 | public.dest |
--time-format | 时间格式 | raw(明文) |
--delimiter | 分隔符 | , |
2.1.2 启动MatrixGate
然后,启动MatrixGate,在启动参数中指定刚才生成的配置文件:
mxgate start --config mxgate.conf
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
Launching MatrixGate daemon...
MatrixGate daemon started successfully
2.1.3 提交数据
启动成功后,下面使用curl工具发送HTTP请求提交数据。
在生产环境中则使用编程语言支持的HTTP库来提交数据
准备了测试数据文件rows_header.csv,内容如下:
[mxadmin@sdw2 ~]$ cat rows_header.csv
public.dest
2021-01-01 00:00:00,1,a1
2021-01-01 00:00:00,2,a2
2021-01-01 00:00:00,3,a3
在提交数据的时候,首行要指定目标表名,因为MatrixGate服务可能有多个目标表。
提交数据:
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@rows_header.csv"
MatrixGate默认绑定8086端口,可以通过配置文件修改。
查询灌入的数据:
test=# select * from dest;
time | c1 | c2
---------------------+----+-----
2021-01-01 00:00:00 | 11 | a11
2021-01-01 00:00:00 | 12 | a12
2021-01-01 00:00:00 | 13 | a13
(3 rows)
2.1.4 运维管理
MatrixGate还提供了其他运维命令来进行运维管理。
查看状态
mxgate status
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID 15146 alive
Launched At 2021-09-01 14:59:03
Up For 26 seconds
Binary /usr/local/matrixdb-4.2.0.community/bin/mxgated
Log /home/mxadmin/gpAdminLogs/matrixgate.2021-09-01_145904.log
Config /home/mxadmin/mxgate.conf
可以看到服务程序运行状态以及配置文件和日志路径,用来追查问题。
停止服务
mxgate stop
******************************************************
__ __ _ _ ____ _
| \/ | __ _| |_ _ __(_)_ __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / | _ / _` | __/ _ \
| | | | (_| | |_| | | |> <| |_| | (_| | || __/
|_| |_|\__,_|\__|_| |_/_/\_\\____|\__,_|\__\___|
Version: 4.2.0
Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID 15146 stopped
mxgate stop则可以停止服务。
2.2 命令行模式
命令行模式用来一次性灌入数据文件,结束后进程也随之退出。
还是刚才的数据文件,去掉第一行目标表,只保留数据行,执行如下命令:
cat rows.csv | mxgate --source stdin --db-database test --db-master-host localhost --db-master-port 5432 --db-user mxadmin --time-format raw --target public.dest --parallel 2 --delimiter ','
3. MatrixGate的UPSERT语义支持
在接入时序数据的时候我们可能会面临这样的场景:
- 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
- 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入
MatrixGate 4.2版本新增了UPSERT语义,用来解决如上问题。
3.1 使用UPSERT语义加载数据
3.1.1 创建数据表
CREATE TABLE upsert_demo (
ts timestamp
, tagid int
, c1 int
, c2 int
, UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);
注意,为了数据库能够使用UPSERT功能,要在表的 设备id+时间戳 上创建UNIQUE约束。
3.1.2 准备数据文件
upsert_demo1.dat:
2020-11-11|1|10|
upsert_demo2.dat:
2020-11-11|1||20
2020-11-11|2||100
2020-11-11|2|200|
3.1.3 加载数据文件
加载upsert_demo1.dat:
cat upsert_demo1.dat|mxgate --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
查询结果:
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+----+----
2020-11-11 00:00:00 | 1 | 10 |
(1 row)
加载upsert_demo2.dat:
cat upsert_demo2.dat|mxgate --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
查询结果:
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+-----+-----
2020-11-11 00:00:00 | 1 | 10 | 20
2020-11-11 00:00:00 | 2 | 200 | 100
(2 rows)
从结果可以看到,ts和tagid相同的行数据,进行了合并。
3.2 UPSERT使用注意事项
- UNIQUE约束与--upsert-key必须完全一致,参数传递顺序无要求
- --upsert-key必须定义UNIQUE约束
更详细的使用方法请参考MatrixGate。