400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
YMatrix 文档
关于 YMatrix
标准集群部署
数据写入
数据迁移
数据查询
运维监控
参考指南
工具指南
数据类型
存储引擎
执行引擎
系统配置参数
SQL 参考
常见问题(FAQ)
新架构 FAQ
集群部署 FAQ
SQL 查询 FAQ
MatrixGate FAQ
运维 FAQ
监控告警 FAQ
PXF FAQ
PLPython FAQ
性能 FAQ
本文档介绍了 YMatrix 对于数据分批合并场景的解决方案:在不同存储引擎(HEAP、MARS2)下利用不同的数据分批合并方式实现 UPSERT 功能。
首先以一个车联网宽表场景 A 为例。
此场景中,我们设计的宽表模型(不含指标类型)如下:
此时,车端数据采集系统正在采集同时间戳(同时产生)、同一辆车中的数据,并分批发送给 YMatrix 数据库:
可以看到,我们构造的宽表模型想要以 车 作为存储设备单位来进行数据存储、计算与分析,而车端的数据采集系统则是以 传感器 作为采集设备单位进行数据采集和上传的。因此,由车端传输到 YMatrix 的数据会以传感器为单位成一定批次写入数据库。
在此条件下便形成了 YMatrix 中的数据分批合并场景。
示例场景 A 中有(至少)四批上传数据需要在进行数据库之后进行合并处理。如果某批数据有重复上传的情况,YMatrix 会以非 NULL 值覆盖 NULL 值,新值覆盖旧值的方式更新此批数据:
合并处理后,我们再查询这些数据,看到的就是已经合并完成的 一行 而不是几行数据了。
实际情况中,在数据写入阶段还可能出现异频、乱序、延迟等现象,但只有分批场景需要数据库对数据进行合并处理,因此其他几种不再赘述。
YMatrix 认为,UPSERT 是 INSERT 与 UPDATE 功能的组合。
当一条新数据即将存储入库:
注意!
上述“指定的行”指的是目前数据库中已存在的, MARS2 表中创建 mars2_btree 索引指定的排序键,或 HEAP 表中创建唯一索引 / 约束指定的键 与即将入库的新行相同的行。
在 YMatrix 中,UPSERT 并非一个 SQL 关键词,而是一个集 INSERT 与 UPDATE 功能于一体的操作,可以通过以下方法来实现:
uniquemode=true
。uniquemode=true
。INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE
语句。YMatrix 中 UPSERT 在不同存储引擎下的用法适用于不同的业务场景:
存储引擎 | UPSERT 用法 | 适用场景 |
MARS3 | 创建 MARS3 表时指定 uniquemode=true | 时序场景最佳实践:一个批次中的数据除在写入 MARS3 时会尽可能进行合并处理,以减小实际落盘数据的大小,剩下少量数据则在查询时实时合并,以直接展示合并后的查询结果。这种方式有效地提高了数据的写入与查询性能;OLAP 与 OLTP 场景无通用最佳实践,如需数据分批合并则推荐开启 |
MARS2 | 直接指定 MARS2 表索引的 uniquemode=true | 时序场景最佳实践:一个批次中的数据除在写入 MARS3 时会尽可能进行合并处理,以减小实际落盘数据的大小,剩下少量数据则在查询时实时合并,以直接展示合并后的查询结果。这种方式有效地提高了数据的写入与查询性能 |
HEAP | 在 HEAP 表下使用 mxgate | 时序场景推荐使用。mxgate 是 YMatrix 的高性能写入工具,具有优越的写入性能。此方法需在指定列(通常为设备唯一标识和时间戳)创建唯一约束 / 索引 |
在 HEAP 表下使用 ON CONFLICT SQL 子句 | 小批量 UPSERT 操作推荐使用。由于这种方法是在写入时即对一批次数据进行物理上的合并处理,会影响一定程度的写入性能,所以如需大规模数据写入,则性能不如上述两种。此方法同样需创建唯一约束 / 索引 |
在此给出示例:
首先,安装 matrixts
扩展。
=# CREATE EXTENSION matrixts;
然后,创建 MARS3 测试表。
=# CREATE TABLE v2x_mars3 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);
注意!
如开启 Unique Mode,则 ORDER BY 子句的第一个字段在定义时需要添加 NOT NULL 约束。
插入同一批次的四条数据。
=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
查询数据,可见 tag1
设备在 2022-07-19 00:00:00
批次的数据已合并显示为一行,且新值覆盖了旧值。
=# SELECT * FROM v2x_mars3;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
2022-07-19 00:00:00+00 | tag1 | -32.3 | 45 | 70.2 | t | f | 52
(1 row)
在此给出示例:
首先,安装 matrixts
扩展。
=# CREATE EXTENSION matrixts;
然后,创建 MARS2 测试表。
=# CREATE TABLE v2x_mars2 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS2
DISTRIBUTED BY (tag_id);
创建 uniquemode=true
的 mars2_btree 索引。
=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);
插入同一批次的四条数据。
=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
查询数据,可见 tag1
设备在 2022-07-19 00:00:00
批次的数据已合并显示为一行,且新值覆盖了旧值。
=# SELECT * FROM v2x_mars2;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | t | f |
52
(1 row)
Time: 4.172 ms
如果你的业务场景是时序场景,同时使用的存储引擎为 HEAP,那么我们推荐通过 mxgate 高速写入工具实现 UPSERT。
要实现 UPSERT 功能,需要在指定字段创建唯一约束 / 索引。
在 mxgate 中,通过指定 --upsert-key
可以用新值覆盖旧值,实现 UPSERT 操作;--deduplicate-key
则保持旧值,丢弃新值,实现去重功能。
--upsert-key
用法注意!
此用法等价于下文 SQL 语句INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE
。
示例如下:
创建 HEAP 测试表。
=# CREATE TABLE v2x_heap_upsert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
用于测试的数据 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
添加完成后,点击 esc
键退出文件,输入 :wq
保存退出。
用于测试的数据 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
添加完成后,点击 esc
键退出文件,输入 :wq
保存退出。
加载数据 1,设置 --upsert-key
为 tag_id
和 ts
。
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
查询结果 1。
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | | |
(1 row)
Time: 18.049 ms
加载数据 2,设置 --upsert-key
为 tag_id
和 ts
。
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
查询结果 2,可以看到原本数据 1 中 tag1
的 speed
被数据 2 中相应新值取代并合并为一行。
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 80 | f | f
| 70
(2 rows)
Time: 19.652 ms
--deduplicate-key
用法注意!
此用法等价于下文 SQL 语句INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING
。
示例如下:
创建测试表。
=# CREATE TABLE v2x_heap_dedu (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
用于测试的数据 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
添加完成后,点击 esc
键退出文件,输入 :wq
保存退出。
用于测试的数据 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
添加完成后,点击 esc
键退出文件,输入 :wq
保存退出。
加载数据 1,设置 --deduplicate-key
为 tag_id
和 ts
。
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
查询结果 1。
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
(1 row)
Time: 18.010 ms
加载数据 2,设置 --deduplicate-key
为 tag_id
和 ts
。
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
查询结果 2,可以看到数据 2 中的 tag1
的 speed
、left_turn_signal
、right_turn_signal
、power
信息均被丢弃,保留了数据 1 中的相关旧值(或空值)。
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
(2 rows)
Time: 12.881 ms
注意!
此用法仅限 HEAP 表使用。
示例如下:
创建 HEAP 测试表。
=# CREATE TABLE v2x_heap_insert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) DISTRIBUTED BY (tag_id);
创建唯一索引,并指定键值为 (tag_id,ts)
。
=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);
插入测试数据。
=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET power = excluded.power;
查看测试数据,可以看到插入的三条 tag1
设备在 2022-07-19 00:00:00+00
批次的数据已合并为一行。
=# SELECT * FROM v2x_heap_insert;
ts | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+------------+----------+-------+------------------+------------------
-+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
| 50
(1 row)
Time: 16.340 ms
注意!
INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE
语句在 SQL 参考 - INSERT 一节也有介绍。