MatrixDB 4.2新特性解读之UPSERT
背景
MatrixDB 4.2版本发布后,MatrixGate推出了一个新特性——UPSERT。
本文将详细介绍UPSERT语义及MatrixGate新特性的使用方法。
什么是UPSERT ?:UPSERT 就是UPDATE和INSERT的结合,实现一条SQL内自动插入或者更新:如果记录不存在则插入,如果记录存在则更新。这是应用开发中最常见的场景之一,通过UPSERT语法可以大幅简化开发代码复杂度,提升效率。
众多场景下,免不了使用UPSERT功能,比如:
- 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
- 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入
UPSERT语义则直接实现了该逻辑。 下面演示一下UPSERT的用法。
UPSERT的使用方法
准备数据表
DROP TABLE IF EXISTS upsert_demo;
CREATE TABLE upsert_demo (
ts timestamp
, tagid int
, c1 int
, c2 float4
, UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);
注意,为了数据库能够使用UPSERT功能,要在表的 设备id+时间戳 上创建UNIQUE约束。
使用UPSERT语义接入数据
SQL方式
先来演示一下,如何通过直接执行SQL语句的方式进行UPSERT,对于使用libpq或JDBC等来连接数据库的用户,可以直接参考如下SQL语法来操作。
假设设备指标c1和c2分两个批次发送,则使用如下SQL进行数据填充。
INSERT INTO upsert_demo VALUES ('2020-11-11', 1, 10, NULL)
ON CONFLICT (ts, tagid)
DO UPDATE SET
c1 = coalesce(EXCLUDED.c1, upsert_demo.c1),
c2 = coalesce(EXCLUDED.c2, upsert_demo.c2);
INSERT INTO upsert_demo VALUES ('2020-11-11', 1, NULL, 20.1)
ON CONFLICT (ts, tagid)
DO UPDATE SET
c1 = coalesce(EXCLUDED.c1, upsert_demo.c1),
c2 = coalesce(EXCLUDED.c2, upsert_demo.c2);
如上两条SQL语句里都包含了唯一索引列:ts和tagid,并且值相同。SQL1只包含了c1列,c2列为空;SQL2只包含了c2列,c1列为空。
ON CONFLICT子句指定了唯一索引包含的列,用来判断数据行是INSERT还是UPDATE。
c1 = coalesce(EXCLUDED.c1, upsert_demo.c1)的含义是取EXCLUDED.c1和upsert_demo.c1中,第一个不为NULL的值。EXCLUDED.c1是原行的值,upsert_demo.c1是新插入的值。即如果原数据行存在,且列值不为空则使用原值;否则使用新插入的值。
通过这种方式,实现了相同设备、相同时间的指标数据合并。
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+----+------
2020-11-11 00:00:00 | 1 | 10 | 20.1
(1 row)
查询upsert_demo可以看到,c1和c2的数据是期望的结果。
通过如上演示可以看到,使用UPSERT语义,可以通过单条SQL实现根据数据是否存在来选择是插入还是更新,大大简化了开发人员的工作。
MatrixGate方式
相比使用SQL方式来接入,MatrixGate作为MatrixDB高性能数据接入工具,在4.2版本中也加入了对UPSERT语义的支持,所以在生产环境中,更加推荐用户选择使用MatrixGate方式接入,性能可以提升百倍。
下面演示如何使用MatrixGate的UPSERT语义:
1. 准备数据文件
文件1: /tmp/upsert_demo1.dat
ts|tagid|c1|c2
2020-11-11|1|10|
文件2: /tmp/upsert_demo2.dat
ts|tagid|c1|c2
2020-11-11|1||20.1
2020-11-11|2||100.5
2020-11-11|2|200|
2. 执行Gate
目标数据库假设为 test,端口为 5432
重点在--upsert-key参数
载入第一个文件(为了验证结果,载入前已将表中原数据清空)
tail -n +2 /tmp/upsert_demo1.dat | mxgated --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
结果
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+----+----
2020-11-11 00:00:00 | 1 | 10 |
(1 row)
载入第二个文件
tail -n +2 /tmp/upsert_demo2.dat | mxgated --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
结果
test=# select * from upsert_demo;
ts | tagid | c1 | c2
---------------------+-------+-----+-------
2020-11-11 00:00:00 | 1 | 10 | 20.1
2020-11-11 00:00:00 | 2 | 200 | 100.5
(2 rows)
从结果可以看到,ts和tagid相同的行数据,进行了合并。
3. Q&A
- UNIQUE约束与--upsert-key是否可以不完全相同,而是子集或者超集的关系?
- 不可以,必须完全一致
- UNIQUE约束里的column顺序是否需要与--upsert-key的声明相同
- 不需要,顺序无关
- UNIQUE约束是否可以使用函数或表达式?例如 time_bucket(colA, '1 min')
- PG12不支持,相应的我们也不支持
- 指标列是否可以加NOT NULL约束?
- 如果某列上有NOT NULL,则每批数据都必须包含非空的值,否则会引起错误。因此在使用UPSERT功能时,指标列推荐不加NOT NULL
- --upsert-key 是否可以是 serial (auto-increment)类型
- 不能,mxgate会检查并报错
- --upsert-key 是否可以同时是 --exclude-columns
- 不可以,mxgate会检查并报错
- --upsert-key 上如果没有定义 UNIQUE 约束,行为是什么样的?
- mxgate会检查并报错,不会有数据入库
总结
通过如上演示可以看到,UPSERT语义对于时序数据指标分批发送的场景非常有用,会大大的减少开发人员的负担,而在MatrixDB 4.2 版本中MatrixGate高性能数据接入工具也支持了该特性,欢迎大家下载使用。