高性能数据加载工具MatrixGate

本课程教学视频请参考MatrixDB数据接入

MatrixGate简称mxgate,是MatrixDB自带的高性能数据加载工具。

使用mxgate进行数据加载性能要远远高于原生INSERT语句。因为mxgate可以直接与segment进行通信,不存在master单点瓶颈。

1. INSERT与MatrixGate对比

写入方式 优势 劣势 适用场景
直接INSERT 接口简单 吞吐量低 吞吐量低,几十万数据点/秒
MatrixGate 吞吐量高
准实时
需要额外部署,有运维成本 吞吐量高,千万数据点/秒

经测试,使用MatrixGate进行数据导入与竞品相比,性能可以提升几十倍。具体数据可参考时序数据库插入性能评测:MatrixDB是InfluxDB的78倍

2. MatrixGate的使用方法

MatrixGate提供如下运行模式:

  • 服务模式
  • 命令行模式
  • 迁移模式

下面演示如何使用这两种模式向数据表灌入数据,数据表dest的schema如下:

CREATE TABLE dest(
    time timestamp,
    c1 int,
    c2 text
)DISTRIBUTED BY(c1);

2.1 服务模式

服务模式会有后台进程常驻,提供HTTP接口给用户,用来提交时序数据,是生产环境中普通使用的方式。

2.1.1 生成配置文件

使用服务模式首先要生成配置文件,确定数据库连接信息、目标表等参数

mxgate config --db-database test \
            --db-master-host localhost \
            --db-master-port 5432 \
            --db-user mxadmin \
            --target public.dest \
            --time-format raw \
            --delimiter ',' \
            > mxgate.conf

如上命令中确定了如下信息:

参数名 描述
--db-database 数据库 test
--db-master-host 数据库主机 localhost
--db-master-port 数据库端口 5432
--db-user 数据库用户名 mxadmin
--target 目标表 public.dest
--time-format 时间格式 raw(明文)
--delimiter 分隔符 ,

2.1.2 启动MatrixGate

然后,启动MatrixGate,在启动参数中指定刚才生成的配置文件:

mxgate start --config mxgate.conf
******************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
Launching MatrixGate daemon...
MatrixGate daemon started successfully

2.1.3 提交数据

启动成功后,下面使用curl工具发送HTTP请求提交数据。

在生产环境中则使用编程语言支持的HTTP库来提交数据

准备了测试数据文件rows_header.csv,内容如下:

[mxadmin@sdw2 ~]$ cat rows_header.csv
public.dest
2021-01-01 00:00:00,1,a1
2021-01-01 00:00:00,2,a2
2021-01-01 00:00:00,3,a3

在提交数据的时候,首行要指定目标表名,因为MatrixGate服务可能有多个目标表。

提交数据:

curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@rows_header.csv"

MatrixGate默认绑定8086端口,可以通过配置文件修改。

查询灌入的数据:

test=# select * from dest;
        time         | c1 | c2
---------------------+----+-----
 2021-01-01 00:00:00 | 11 | a11
 2021-01-01 00:00:00 | 12 | a12
 2021-01-01 00:00:00 | 13 | a13
(3 rows)

更详细的API参数请参考文档

2.1.4 运维管理

MatrixGate还提供了其他运维命令来进行运维管理。

查看状态

mxgate status
******************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID          15146 alive
Launched At  2021-09-01 14:59:03
Up For       26 seconds
Binary       /usr/local/matrixdb-4.2.0.community/bin/mxgated
Log          /home/mxadmin/gpAdminLogs/matrixgate.2021-09-01_145904.log
Config       /home/mxadmin/mxgate.conf

可以看到服务程序运行状态以及配置文件和日志路径,用来追查问题。

停止服务

mxgate stop
******************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID 15146 stopped

mxgate stop则可以停止服务。

观测服务

可以使用mxgate watch子命令来实时观测服务

mxgate watch
******************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.5.0
  Your Copy is Licensed to: yMatrix.cn; 2022-05-14; any
******************************************************
watch cmd will run forever until killed, you can use watch -T n to change the duration to n seconds;and you can use mxgate watch --info to get info of columns;
                 Time          WCount          ICount        WSpeed/s        ISpeed/s  WBandWidth MB/S     BlocakItems
  2022-04-28 15:20:58        14478858        14527011         2598081         2627887            2395               0
  2022-04-28 15:21:01        22231035        22633254         2584059         2702081            2222               0
  2022-04-28 15:21:04        30494310        30500874         2754425         2622540            3551               0
  2022-04-28 15:21:07        38004210        38032956         2503300         2510694            2862               0
  2022-04-28 15:21:10        46188696        46298223         2728162         2755089            2227               0
  ...

或者使用mxgate watch --history来观测历史数据

mxgate watch --history
******************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.5.0
  Your Copy is Licensed to: yMatrix.cn; 2022-05-14; any
******************************************************
                TIME RANGE                | SPEED/S  | BANDWIDTH MB/S  | BLOCK ITEMS
  2022-04-28 16:00:00-2022-04-28 17:00:00 |  2208010 |         1254.48 |           0
  2022-04-28 17:00:00-2022-04-28 18:00:00 |  1157920 |         1327.00 |           0
  2022-04-28 18:00:00-2022-04-28 19:00:00 |  2228666 |         2162.32 |           0
  2022-04-28 19:00:00-2022-04-28 20:00:00 |  1371092 |         2881.30 |           0
  2022-04-28 20:00:00-2022-04-28 21:00:00 |  1575320 |         2608.20 |           0


2.2 命令行模式

命令行模式用来一次性灌入数据文件,结束后进程也随之退出。

还是刚才的数据文件,去掉第一行目标表,只保留数据行,执行如下命令:

cat rows.csv | mxgate --source stdin --db-database test --db-master-host localhost --db-master-port 5432 --db-user mxadmin --time-format raw --target public.dest --parallel 2  --delimiter ',' 

有关文件接入的更多方法请参考文档


2.3 迁移模式

迁移模式用来做高速数据迁移,支持将其他Greenplum5、Greenplum6、MatrixDB集群的数据表迁移到当前MatrixDB集群中。

用法如下:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-password abc \
         --src-schema public \
         --src-table trans_ao \
         --compress "gzip" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --db-database ttt \
         --target public.trans_ao \
         --format text \
         --time-format raw \
         --use-auto-increment=false

其中:

参数名 描述
--source 功能入口,必须指定'transfer'
--src-host 源库master的ip地址
--src-port 源库master的端口号
--src-user 连接源库的用户名(建议使用superuser)
--src-password 连接密码
--src-schema 源表的schema名
--src-table 源表的表名
--compress 源数据库segment主机到本数据的传输方法:
空白字符串“”,代表不压缩,明文传输
gzip:使用gzip压缩,需要源数据库的segment主机上必须安装有gzip这个linux命令
lz4:使用lz4压缩,需要源数据库的segment主机上必须安装有lz4这个linux命令
推荐 lz4 > gzip > 不压缩
--port-base 传输中会占用一批端口,端口的范围为9129~
--local-ip 必须用源库可以连接到本机的IP地址
--db-database 迁移目标表所在的database名
--target 迁移目标表名,可以是 \<schema>.\<table>形式,如果不写schema名默认是public
--format text或csv,仅当迁移的数据里有复杂的字符串(包含换行、引号、分隔符)时,必须CSV。其他情况下text/csv均可时,优先用text模式
--time-format transfer模式下必须是raw
--use-auto-increment 当目标表包括serial类型的自增字段时,mxgate内默认会跳过该类型的字段,加入这个option来关闭mxgate跳过的逻辑

迁移模式的另一个用法是将数据快速导出到文件:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-schema public \
         --src-table trans_ao_1 \
         --compress "lz4" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --save-to-dir /tmp/receive/ \
         --db-database ttt \
         --transform nil \
         --writer nil \
         --target trans_ao

使用--save-to-dir参数来指定文件存储路径。

注意,即使导出到文件,也需要给出--db-database和--target参数来指定目标库和表,并且目标库和表必须存在

过滤迁移可以通过--src-sql参数指定SQL来过滤需要同步的数据,在表到表迁移和表到文件迁移中都可以使用:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-sql "select * from demo where c1 = 'xxxx'" \
         --compress "lz4" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --save-to-dir /tmp/receive/ \
         --db-database ttt \
         --transform nil \
         --writer nil \
         --target trans_ao

有关迁移模式的更多用法请参考文档


3. MatrixGate的UPSERT语义支持

在接入时序数据的时候我们可能会面临这样的场景:

  • 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
  • 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入

MatrixGate 4.2版本新增了UPSERT语义,用来解决如上问题。

3.1 使用UPSERT语义加载数据

3.1.1 创建数据表

CREATE TABLE upsert_demo (
    ts    timestamp
  , tagid int
  , c1    int
  , c2    int
  , UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);

注意,为了数据库能够使用UPSERT功能,要在表的 设备id+时间戳 上创建UNIQUE约束。

3.1.2 准备数据文件

upsert_demo1.dat:

2020-11-11|1|10|

upsert_demo2.dat:

2020-11-11|1||20
2020-11-11|2||100
2020-11-11|2|200|

3.1.3 加载数据文件

加载upsert_demo1.dat:

cat upsert_demo1.dat|mxgate --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

查询结果:

test=# select * from upsert_demo ;
         ts          | tagid | c1 | c2
---------------------+-------+----+----
 2020-11-11 00:00:00 |     1 | 10 |
(1 row)

加载upsert_demo2.dat:

cat upsert_demo2.dat|mxgate --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

查询结果:

test=# select * from upsert_demo ;
         ts          | tagid | c1  | c2
---------------------+-------+-----+-----
 2020-11-11 00:00:00 |     1 |  10 |  20
 2020-11-11 00:00:00 |     2 | 200 | 100
(2 rows)

从结果可以看到,ts和tagid相同的行数据,进行了合并。

3.2 UPSERT生成服务模式的配置文件

mxgate config \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --stream-prepared 0 \
  --upsert-key ts \
  --upsert-key tagid > mxgate.conf

注意:matrixgate在服务模式下使用UPSERT特性需要将stream-prepared参数设置为0,否则会触发死锁

3.3 UPSERT使用注意事项

  • UNIQUE约束与--upsert-key必须完全一致,参数传递顺序无要求
  • --upsert-key必须定义UNIQUE约束

3.4 UPSERT之去重

UPSERT还有一个功能是去重。和UPSERT不同的是,UPSERT会用新值强制覆盖旧值;去重仅当旧值为空时才用新值填充,否则将新值丢弃。

去重通过--deduplicate-key参数配置使用,该参数与--upsert-key参数互斥,只能二选一。


4. MatrixGate的容错机制

因为MatrixGate内部使用外部表机制,微批量将数据插入到目标表。所以,任何一条数据在入库时都是和其他提交的数据一起分批进入的。如果其中任何一条数据格式有错误,则整批数据都将入库失败。

从4.3开始,MatrixGate增加了容错机制,单条数据格式错误不会影响其他数据入库,会返回错误数据信息并记录错误日志。

注意:容错仅限格式错误,如果是违反约束规则(唯一索引)则还是会批量失败。

4.1 错误信息

与之前有错误数据就直接返回HTTP 500不同,容错后返回的HTTP码仍是200。响应体中会包含错误行信息,例如:

At line: 2
missing data for column "c3"

4.2 阈值控制

当然,也不是无限制的容忍错误,容忍阈值与GUC:gp_initial_bad_row_limit相关。当错误数据行数超过5 * gp_initial_bad_row_limit时,批量写入失败。

更详细的使用方法请参考MatrixGate