数据分批合并场景(UPSERT)
本文档介绍了 YMatrix 对于数据分批合并场景的解决方案:在不同存储引擎(HEAP、MARS2)下利用不同的数据分批合并方式实现 UPSERT 功能。
1 数据分批合并场景是什么?
首先以一个车联网宽表场景 A 为例。 此场景中,我们设计的宽表模型(不含指标类型)如下:
此时,车端数据采集系统正在采集同时间戳(同时产生)、同一辆车中的数据,并分批发送给 YMatrix 数据库:
可以看到,我们构造的宽表模型想要以 车 作为存储设备单位来进行数据存储、计算与分析,而车端的数据采集系统则是以 传感器 作为采集设备单位进行数据采集和上传的。因此,由车端传输到 YMatrix 的数据会以传感器为单位成一定批次写入数据库。
在此条件下便形成了 YMatrix 中的数据分批合并场景。
示例场景 A 中有(至少)四批上传数据需要在进行数据库之后进行合并处理。如果某批数据有重复上传的情况,YMatrix 会以非 NULL 值覆盖 NULL 值,新值覆盖旧值的方式更新此批数据:
合并处理后,我们再查询这些数据,看到的就是已经合并完成的 一行 而不是几行数据了。
实际情况中,在数据写入阶段还可能出现异频、乱序、延迟等现象,但只有分批场景需要数据库对数据进行合并处理,因此其他几种不再赘述。
2 UPSERT 是什么?
YMatrix 认为,UPSERT 是 INSERT 与 UPDATE 功能的组合。
当一条新数据即将存储入库:
- 如果指定的行已经存在于表中,则对其进行更新操作。
- 如果指定的行不存在于表中,则插入新行。
注意!
上述“指定的行”指的是目前数据库中已存在的, MARS2 表中创建 mars2_btree 索引指定的排序键,或 HEAP 表中创建唯一索引 / 约束指定的键 与即将入库的新行相同的行。
3 UPSERT 功能在不同存储引擎下的实现
在 YMatrix 中,UPSERT 并非一个 SQL 关键词,而是一个集 INSERT 与 UPDATE 功能于一体的操作,可以通过以下方法来实现:
- 直接指定 MARS2 表索引的
uniquemode=true
。 - 在 HEAP 表下使用 mxgate 或
INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE
语句。
YMatrix 中 UPSERT 在不同存储引擎下的用法适用于不同的业务场景:
存储引擎 | UPSERT 用法 | 适用场景 |
MARS2 | 直接指定 MARS2 表索引的 `uniquemode=true` | 时序场景最佳实践。一个批次中的数据不在写入时进行合并处理,而是在查询时由 MARS2 在后台自动完成合并,直接展示合并后的查询结果。这种方式可以大幅提高写入性能,配合 MARS2 表良好的压缩性能,可以成为大规模分批时序数据写入和存储的最佳实践 |
HEAP | 在 HEAP 表下使用 mxgate | 时序场景推荐使用。mxgate 是 YMatrix 的高性能写入工具,具有优越的写入性能。此方法需在指定列(通常为设备唯一标识和时间戳)创建唯一约束 / 索引 |
在 HEAP 表下使用 `ON CONFLICT` SQL 子句 | 小批量 UPSERT 操作推荐使用。由于这种方法是在写入时即对一批次数据进行物理上的合并处理,会影响一定程度的写入性能,所以如需大规模数据写入,则性能不如上述两种。此方法同样需创建唯一约束 / 索引 |
3.1 MARS2 存储引擎
在此给出示例:
首先,安装 matrixts
扩展。
=# CREATE EXTENSION matrixts;
然后,创建 MARS2 测试表。
=# CREATE TABLE v2x_mars2 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS2
DISTRIBUTED BY (tag_id);
创建 uniquemode=true
的 mars2_btree 索引。
=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);
插入同一批次的四条数据。
=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
查询数据,可见 tag1
设备在 2022-07-19 00:00:00
批次的数据已合并显示为一行,且新值覆盖了旧值。
=# SELECT * FROM v2x_mars2;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | t | f |
52
(1 row)
Time: 4.172 ms
3.2 HEAP 存储引擎
3.2.1 通过 mxgate 实现
如果你的业务场景是时序场景,同时使用的存储引擎为 HEAP,那么我们推荐通过 mxgate 高速写入工具实现 UPSERT。
要实现 UPSERT 功能,需要在指定字段创建唯一约束 / 索引。
在 mxgate 中,通过指定 --upsert-key
可以用新值覆盖旧值,实现 UPSERT 操作;--deduplicate-key
则保持旧值,丢弃新值,实现去重功能。
--upsert-key
用法注意!
此用法等价于下文 SQL 语句INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE
。
示例如下:
创建 HEAP 测试表。
=# CREATE TABLE v2x_heap_upsert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
用于测试的数据 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
添加完成后,点击 esc
键退出文件,输入 :wq
保存退出。
用于测试的数据 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
添加完成后,点击 esc
键退出文件,输入 :wq
保存退出。
加载数据 1,设置 --upsert-key
为 tag_id
和 ts
。
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
查询结果 1。
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | | |
(1 row)
Time: 18.049 ms
加载数据 2,设置 --upsert-key
为 tag_id
和 ts
。
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
查询结果 2,可以看到原本数据 1 中 tag1
的 speed
被数据 2 中相应新值取代并合并为一行。
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 80 | f | f
| 70
(2 rows)
Time: 19.652 ms
--deduplicate-key
用法注意!
此用法等价于下文 SQL 语句INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING
。
示例如下:
创建测试表。
=# CREATE TABLE v2x_heap_dedu (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
用于测试的数据 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
添加完成后,点击 esc
键退出文件,输入 :wq
保存退出。
用于测试的数据 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
添加完成后,点击 esc
键退出文件,输入 :wq
保存退出。
加载数据 1,设置 --deduplicate-key
为 tag_id
和 ts
。
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
查询结果 1。
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
(1 row)
Time: 18.010 ms
加载数据 2,设置 --deduplicate-key
为 tag_id
和 ts
。
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
查询结果 2,可以看到数据 2 中的 tag1
的 speed
、left_turn_signal
、right_turn_signal
、power
信息均被丢弃,保留了数据 1 中的相关旧值(或空值)。
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
(2 rows)
Time: 12.881 ms
3.2.2 通过 INSERT 语句实现
注意!
此用法仅限 HEAP 表使用。
示例如下:
创建 HEAP 测试表。
=# CREATE TABLE v2x_heap_insert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) DISTRIBUTED BY (tag_id);
创建唯一索引,并指定键值为 (tag_id,ts)
。
=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);
插入测试数据。
=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET power = excluded.power;
查看测试数据,可以看到插入的三条 tag1
设备在 2022-07-19 00:00:00+00
批次的数据已合并为一行。
=# SELECT * FROM v2x_heap_insert;
ts | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+------------+----------+-------+------------------+------------------
-+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
| 50
(1 row)
Time: 16.340 ms
注意!
INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE
语句在 SQL 参考 - INSERT 一节也有介绍。