高性能数据加载工具MatrixGate

本课程教学视频请参考MatrixDB数据接入

MatrixGate简称mxgate,是MatrixDB自带的高性能数据加载工具。

使用mxgate进行数据加载性能要远远高于原生INSERT语句。因为mxgate可以直接与segment进行通信,不存在master单点瓶颈。

1. INSERT与MatrixGate对比

写入方式 优势 劣势 适用场景
直接INSERT 接口简单 吞吐量低 吞吐量低,几十万数据点/秒
MatrixGate 吞吐量高
准实时
需要额外部署,有运维成本 吞吐量高,千万数据点/秒

经测试,使用MatrixGate进行数据导入与竞品相比,性能可以提升几十倍。具体数据可参考时序数据库插入性能评测:MatrixDB是InfluxDB的78倍

2. MatrixGate的使用方法

MatrixGate提供如下运行模式:

  • 服务模式
  • 命令行模式

下面演示如何使用这两种模式向数据表灌入数据,数据表dest的schema如下:

CREATE TABLE dest(
    time timestamp,
    c1 int,
    c2 text
)DISTRIBUTED BY(c1);

2.1 服务模式

服务模式会有后台进程常驻,提供HTTP接口给用户,用来提交时序数据,是生产环境中普通使用的方式。

2.1.1 生成配置文件

使用服务模式首先要生成配置文件,确定数据库连接信息、目标表等参数

mxgate config --db-database test \
            --db-master-host localhost \
            --db-master-port 5432 \
            --db-user mxadmin \
            --target public.dest \
            --time-format raw \
            --delimiter ',' \
            > mxgate.conf

如上命令中确定了如下信息:

参数名 描述
--db-database 数据库 test
--db-master-host 数据库主机 localhost
--db-master-port 数据库端口 5432
--db-user 数据库用户名 mxadmin
--target 目标表 public.dest
--time-format 时间格式 raw(明文)
--delimiter 分隔符 ,

2.1.2 启动MatrixGate

然后,启动MatrixGate,在启动参数中指定刚才生成的配置文件:

mxgate start --config mxgate.conf
******************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
Launching MatrixGate daemon...
MatrixGate daemon started successfully

2.1.3 提交数据

启动成功后,下面使用curl工具发送HTTP请求提交数据。

在生产环境中则使用编程语言支持的HTTP库来提交数据

准备了测试数据文件rows_header.csv,内容如下:

[mxadmin@sdw2 ~]$ cat rows_header.csv
public.dest
2021-01-01 00:00:00,1,a1
2021-01-01 00:00:00,2,a2
2021-01-01 00:00:00,3,a3

在提交数据的时候,首行要指定目标表名,因为MatrixGate服务可能有多个目标表。

提交数据:

curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@rows_header.csv"

MatrixGate默认绑定8086端口,可以通过配置文件修改。

查询灌入的数据:

test=# select * from dest;
        time         | c1 | c2
---------------------+----+-----
 2021-01-01 00:00:00 | 11 | a11
 2021-01-01 00:00:00 | 12 | a12
 2021-01-01 00:00:00 | 13 | a13
(3 rows)

2.1.4 运维管理

MatrixGate还提供了其他运维命令来进行运维管理。

查看状态

mxgate status
******************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID          15146 alive
Launched At  2021-09-01 14:59:03
Up For       26 seconds
Binary       /usr/local/matrixdb-4.2.0.community/bin/mxgated
Log          /home/mxadmin/gpAdminLogs/matrixgate.2021-09-01_145904.log
Config       /home/mxadmin/mxgate.conf

可以看到服务程序运行状态以及配置文件和日志路径,用来追查问题。

停止服务

mxgate stop
******************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
******************************************************
PID 15146 stopped

mxgate stop则可以停止服务。


2.2 命令行模式

命令行模式用来一次性灌入数据文件,结束后进程也随之退出。

还是刚才的数据文件,去掉第一行目标表,只保留数据行,执行如下命令:

cat rows.csv | mxgate --source stdin --db-database test --db-master-host localhost --db-master-port 5432 --db-user mxadmin --time-format raw --target public.dest --parallel 2  --delimiter ',' 


3. MatrixGate的UPSERT语义支持

在接入时序数据的时候我们可能会面临这样的场景:

  • 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
  • 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入

MatrixGate 4.2版本新增了UPSERT语义,用来解决如上问题。

3.1 使用UPSERT语义加载数据

3.1.1 创建数据表

CREATE TABLE upsert_demo (
    ts    timestamp
  , tagid int
  , c1    int
  , c2    int
  , UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);

注意,为了数据库能够使用UPSERT功能,要在表的 设备id+时间戳 上创建UNIQUE约束。

3.1.2 准备数据文件

upsert_demo1.dat:

2020-11-11|1|10|

upsert_demo2.dat:

2020-11-11|1||20
2020-11-11|2||100
2020-11-11|2|200|

3.1.3 加载数据文件

加载upsert_demo1.dat:

cat upsert_demo1.dat|mxgate --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

查询结果:

test=# select * from upsert_demo ;
         ts          | tagid | c1 | c2
---------------------+-------+----+----
 2020-11-11 00:00:00 |     1 | 10 |
(1 row)

加载upsert_demo2.dat:

cat upsert_demo2.dat|mxgate --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

查询结果:

test=# select * from upsert_demo ;
         ts          | tagid | c1  | c2
---------------------+-------+-----+-----
 2020-11-11 00:00:00 |     1 |  10 |  20
 2020-11-11 00:00:00 |     2 | 200 | 100
(2 rows)

从结果可以看到,ts和tagid相同的行数据,进行了合并。

3.2 UPSERT使用注意事项

  • UNIQUE约束与--upsert-key必须完全一致,参数传递顺序无要求
  • --upsert-key必须定义UNIQUE约束

更详细的使用方法请参考MatrixGate