1. 前言
MatrixDB 4.2 版本发布后,MatrixGate 推出了一个新特性 — UPSERT。
本文将详细介绍 UPSERT 语义及 MatrixGate 新特性的使用方法。
什么是UPSERT?
UPSERT 就是 UPDATE 和 INSERT 的结合。
实现一条SQL内自动插入或者更新:如果记录不存在则插入,如果记录存在则更新。
这是应用开发中最常见的场景之一,通过UPSERT语法可以大幅简化开发代码的复杂度,提升效率。
在众多场景下,都免不了使用UPSERT功能,比如:
- 设备数据不是一次性发送全部,而是分批次发送,需要按设备号和时间戳为主键进行合并
- 设备数据可能会重复发送,对于重复数据要做更新而不是重复插入
而 UPSERT 语义则直接实现了该逻辑。下面演示一下UPSERT的用法。
2. UPSERT 的使用方法
准备数据表
DROP TABLE IF EXISTS upsert_demo;
CREATE TABLE upsert_demo (
ts timestamp
, tagid int
, c1 int
, c2 float4
, UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);
注意 ⚠️
为了数据库能够使用 UPSERT 功能,记得要在表的“设备 id +时间戳”上创建 UNIQUE 约束。
使用 UPSERT 语义接入数据
SQL 方式
先来演示下,如何通过直接执行 SQL 语句的方式进行 UPSERT,对于使用 libpq 或 JDBC 等来连接数据库的用户,可以直接参考如下 SQL 语法来操作。
假设设备指标 c1 和 c2 分两个批次发送,则使用 SQL 来进行数据填充。
INSERT INTO upsert_demo VALUES ('2020-11-11', 1, 10, NULL)
ON CONFLICT (ts, tagid)
DO UPDATE SET
c1 = coalesce(EXCLUDED.c1, upsert_demo.c1),
c2 = coalesce(EXCLUDED.c2, upsert_demo.c2);
INSERT INTO upsert_demo VALUES ('2020-11-11', 1, NULL, 20.1)
ON CONFLICT (ts, tagid)
DO UPDATE SET
c1 = coalesce(EXCLUDED.c1, upsert_demo.c1),
c2 = coalesce(EXCLUDED.c2, upsert_demo.c2);
如上两条 SQL 语句里都包含了唯一索引列:ts 和 tagid,并且值相同。SQL1 只包含了 c1 列,c2 列为空;SQL2 只包含了 c2 列,c1 列为空。
ON CONFLICT 子句指定了唯一索引包含的列,用来判断数据行是 INSERT 还是 UPDATE。
c1=coalesce(EXCLUDED.c1,upsert_demo.c1)的含义是取 EXCLUDED.c1 和 upsert_demo.c1 中,第一个不为 NULL 的值。
EXCLUDED.c1 是原行的值,upsert_demo.c1 是新插入的值。即如果原数据行存在,且列值不为空则使用原值;否则使用新插入的值。
通过这种方式,实现了相同设备、相同时间的指标数据合并。
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+----+------
2020-11-11 00:00:00 | 1 | 10 | 20.1
(1 row)
查询 upsert_demo 可以看到,c1 和 c2 的数据是期望的结果。
通过如上演示可以看到,使用 UPSERT 语义,可以通过单条 SQL 实现根据数据是否存在来选择是插入还是更新,大大简化了开发人员的工作。
MatrixGate 方式
相比使用 SQL 方式来接入,MatrixGate 作为 MatrixDB 高性能数据接入工具,在 4.2 版本中也加入了对 UPSERT 语义的支持,所以在生产环境中,更加推荐用户选择使用 MatrixGate 方式接入,性能可以提升百倍。
下面演示如何使用 MatrixGate 的 UPSERT 语义:
1. 准备数据文件
文件1: /tmp/upsert_demo1.dat
ts|tagid|c1|c2
2020-11-11|1|10|
文件1: /tmp/upsert_demo2.dat
ts|tagid|c1|c2
2020-11-11|1||20.1
2020-11-11|2||100.5
2020-11-11|2|200|
2. 执行 Gate
目标数据库假设为 test,端口为 5432 重点在 --upsert-key 参数 载入第一个文件(为了验证结果,载入前已将表中原数据清空)
tail -n +2 /tmp/upsert_demo1.dat | mxgated --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
结果
test=# select * from upsert_demo ;
ts | tagid | c1 | c2
---------------------+-------+----+----
2020-11-11 00:00:00 | 1 | 10 |
(1 row)
载入第二个文件
tail -n +2 /tmp/upsert_demo2.dat | mxgated --source stdin \
--db-database test \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--target upsert_demo \
--upsert-key ts \
--upsert-key tagid
结果
test=# select * from upsert_demo;
ts | tagid | c1 | c2
---------------------+-------+-----+-------
2020-11-11 00:00:00 | 1 | 10 | 20.1
2020-11-11 00:00:00 | 2 | 200 | 100.5
(2 rows)
从结果可以看到,ts和tagid相同的行数据,进行了合并。
3. Q&A
3.总结
通过如上演示可以看到,UPSERT 语义对于时序数据指标分批发送的场景非常有用,会大大的减少开发人员的负担,而在 MatrixDB 4.2 版本中 MatrixGate 高性能数据接入工具也支持了该特性,欢迎大家下载使用。