MatrixDB 4.2新特性解读之UPSERT

背景

MatrixDB 4.2版本发布后,MatrixGate推出了一个新特性——UPSERT。

本文将详细介绍UPSERT语义及MatrixGate新特性的使用方法。

什么是UPSERT ?:UPSERT 就是UPDATE和INSERT的结合,实现一条SQL内自动插入或者更新:如果记录不存在则插入,如果记录存在则更新。这是应用开发中最常见的场景之一,通过UPSERT语法可以大幅简化开发代码复杂度,提升效率。

众多场景下,免不了使用UPSERT功能,比如:

  • 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
  • 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入 upsert1 UPSERT语义则直接实现了该逻辑。 下面演示一下UPSERT的用法。

UPSERT的使用方法

准备数据表

DROP TABLE IF EXISTS upsert_demo;

CREATE TABLE upsert_demo (
    ts    timestamp
  , tagid int
  , c1    int
  , c2    float4
  , UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);

注意,为了数据库能够使用UPSERT功能,要在表的 设备id+时间戳 上创建UNIQUE约束。

使用UPSERT语义接入数据

SQL方式

先来演示一下,如何通过直接执行SQL语句的方式进行UPSERT,对于使用libpq或JDBC等来连接数据库的用户,可以直接参考如下SQL语法来操作。

假设设备指标c1和c2分两个批次发送,则使用如下SQL进行数据填充。

INSERT INTO upsert_demo VALUES ('2020-11-11', 1, 10, NULL)
    ON CONFLICT (ts, tagid)
    DO UPDATE SET
        c1 = coalesce(EXCLUDED.c1, upsert_demo.c1),
        c2 = coalesce(EXCLUDED.c2, upsert_demo.c2);

INSERT INTO upsert_demo VALUES ('2020-11-11', 1, NULL, 20.1)
    ON CONFLICT (ts, tagid)
    DO UPDATE SET
        c1 = coalesce(EXCLUDED.c1, upsert_demo.c1),
        c2 = coalesce(EXCLUDED.c2, upsert_demo.c2);

如上两条SQL语句里都包含了唯一索引列:ts和tagid,并且值相同。SQL1只包含了c1列,c2列为空;SQL2只包含了c2列,c1列为空。

ON CONFLICT子句指定了唯一索引包含的列,用来判断数据行是INSERT还是UPDATE。

c1 = coalesce(EXCLUDED.c1, upsert_demo.c1)的含义是取EXCLUDED.c1和upsert_demo.c1中,第一个不为NULL的值。EXCLUDED.c1是原行的值,upsert_demo.c1是新插入的值。即如果原数据行存在,且列值不为空则使用原值;否则使用新插入的值。

通过这种方式,实现了相同设备、相同时间的指标数据合并。

test=# select * from upsert_demo ;
         ts          | tagid | c1 |  c2
---------------------+-------+----+------
 2020-11-11 00:00:00 |     1 | 10 | 20.1
(1 row)

查询upsert_demo可以看到,c1和c2的数据是期望的结果。

通过如上演示可以看到,使用UPSERT语义,可以通过单条SQL实现根据数据是否存在来选择是插入还是更新,大大简化了开发人员的工作。 upsert2

MatrixGate方式

相比使用SQL方式来接入,MatrixGate作为MatrixDB高性能数据接入工具,在4.2版本中也加入了对UPSERT语义的支持,所以在生产环境中,更加推荐用户选择使用MatrixGate方式接入,性能可以提升百倍。

下面演示如何使用MatrixGate的UPSERT语义:

1. 准备数据文件

文件1: /tmp/upsert_demo1.dat

ts|tagid|c1|c2
2020-11-11|1|10|

文件2: /tmp/upsert_demo2.dat

ts|tagid|c1|c2
2020-11-11|1||20.1
2020-11-11|2||100.5
2020-11-11|2|200|
2. 执行Gate

目标数据库假设为 test,端口为 5432

重点在--upsert-key参数

载入第一个文件(为了验证结果,载入前已将表中原数据清空)

tail -n +2 /tmp/upsert_demo1.dat | mxgated --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

结果

test=# select * from upsert_demo ;
         ts          | tagid | c1 | c2
---------------------+-------+----+----
 2020-11-11 00:00:00 |     1 | 10 |
(1 row)

载入第二个文件

tail -n +2 /tmp/upsert_demo2.dat | mxgated --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

结果

test=# select * from upsert_demo;
         ts          | tagid | c1  |  c2
---------------------+-------+-----+-------
 2020-11-11 00:00:00 |     1 |  10 |  20.1
 2020-11-11 00:00:00 |     2 | 200 | 100.5
(2 rows)

从结果可以看到,ts和tagid相同的行数据,进行了合并。

3. Q&A
  • UNIQUE约束与--upsert-key是否可以不完全相同,而是子集或者超集的关系?
    • 不可以,必须完全一致
  • UNIQUE约束里的column顺序是否需要与--upsert-key的声明相同
    • 不需要,顺序无关
  • UNIQUE约束是否可以使用函数或表达式?例如 time_bucket(colA, '1 min')
    • PG12不支持,相应的我们也不支持
  • 指标列是否可以加NOT NULL约束?
    • 如果某列上有NOT NULL,则每批数据都必须包含非空的值,否则会引起错误。因此在使用UPSERT功能时,指标列推荐不加NOT NULL
  • --upsert-key 是否可以是 serial (auto-increment)类型
    • 不能,mxgate会检查并报错
  • --upsert-key 是否可以同时是 --exclude-columns
    • 不可以,mxgate会检查并报错
  • --upsert-key 上如果没有定义 UNIQUE 约束,行为是什么样的?
    • mxgate会检查并报错,不会有数据入库

总结

通过如上演示可以看到,UPSERT语义对于时序数据指标分批发送的场景非常有用,会大大的减少开发人员的负担,而在MatrixDB 4.2 版本中MatrixGate高性能数据接入工具也支持了该特性,欢迎大家下载使用。