数据分批合并场景(UPSERT)

本文档介绍了 YMatrix 对于数据分批合并场景的解决方案:在不同存储引擎(HEAP、MARS2)下利用不同的数据分批合并方式实现 UPSERT 功能。

1 数据分批合并场景是什么?

首先以一个车联网宽表场景 A 为例。 此场景中,我们设计的宽表模型(不含指标类型)如下:

此时,车端数据采集系统正在采集同时间戳(同时产生)同一辆车中的数据,并分批发送给 YMatrix 数据库:

可以看到,我们构造的宽表模型想要以 作为存储设备单位来进行数据存储、计算与分析,而车端的数据采集系统则是以 传感器 作为采集设备单位进行数据采集和上传的。因此,由车端传输到 YMatrix 的数据会以传感器为单位成一定批次写入数据库。
在此条件下便形成了 YMatrix 中的数据分批合并场景。

示例场景 A 中有(至少)四批上传数据需要在进行数据库之后进行合并处理。如果某批数据有重复上传的情况,YMatrix 会以非 NULL 值覆盖 NULL 值,新值覆盖旧值的方式更新此批数据:

合并处理后,我们再查询这些数据,看到的就是已经合并完成的 一行 而不是几行数据了。

实际情况中,在数据写入阶段还可能出现异频、乱序、延迟等现象,但只有分批场景需要数据库对数据进行合并处理,因此其他几种不再赘述。

2 UPSERT 是什么?

YMatrix 认为,UPSERT 是 INSERT 与 UPDATE 功能的组合。

当一条新数据即将存储入库:

  • 如果指定的行已经存在于表中,则对其进行更新操作。
  • 如果指定的行不存在于表中,则插入新行。

注意!
上述“指定的行”指的是目前数据库中已存在的, MARS2 表中创建 mars2_btree 索引指定的排序键,或 HEAP 表中创建唯一索引 / 约束指定的键 与即将入库的新行相同的行。

3 UPSERT 功能在不同存储引擎下的实现

在 YMatrix 中,UPSERT 并非一个 SQL 关键词,而是一个集 INSERT 与 UPDATE 功能于一体的操作,可以通过以下方法来实现:

  • 创建 MARS3 表时指定 uniquemode=true
  • 直接指定 MARS2 表索引的 uniquemode=true
  • 在 HEAP 表下使用 mxgate 或 INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE 语句。

YMatrix 中 UPSERT 在不同存储引擎下的用法适用于不同的业务场景:

存储引擎UPSERT 用法适用场景
MARS3创建 MARS3 表时指定 uniquemode=true时序场景最佳实践:一个批次中的数据除在写入 MARS3 时会尽可能进行合并处理,以减小实际落盘数据的大小,剩下少量数据则在查询时实时合并,以直接展示合并后的查询结果。这种方式有效地提高了数据的写入与查询性能;OLAP 与 OLTP 场景无通用最佳实践,如需数据分批合并则推荐开启
MARS2直接指定 MARS2 表索引的 uniquemode=true时序场景最佳实践:一个批次中的数据除在写入 MARS3 时会尽可能进行合并处理,以减小实际落盘数据的大小,剩下少量数据则在查询时实时合并,以直接展示合并后的查询结果。这种方式有效地提高了数据的写入与查询性能
HEAP在 HEAP 表下使用 mxgate时序场景推荐使用。mxgate 是 YMatrix 的高性能写入工具,具有优越的写入性能。此方法需在指定列(通常为设备唯一标识和时间戳)创建唯一约束 / 索引
在 HEAP 表下使用 ON CONFLICT SQL 子句小批量 UPSERT 操作推荐使用。由于这种方法是在写入时即对一批次数据进行物理上的合并处理,会影响一定程度的写入性能,所以如需大规模数据写入,则性能不如上述两种。此方法同样需创建唯一约束 / 索引

3.1 MARS3 存储引擎

在此给出示例:

首先,安装 matrixts 扩展。

=# CREATE EXTENSION matrixts;

然后,创建 MARS3 测试表。

=# CREATE TABLE v2x_mars3 (
  ts               timestamptz NOT NULL,
  tag_id           text NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);

注意!
如开启 Unique Mode,则 ORDER BY 子句的第一个字段在定义时需要添加 NOT NULL 约束。

插入同一批次的四条数据。

=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

查询数据,可见 tag1 设备在 2022-07-19 00:00:00 批次的数据已合并显示为一行,且新值覆盖了旧值。

=# SELECT * FROM v2x_mars3;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power 
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+00 | tag1   |     -32.3 |       45 |  70.2 | t                | f                 |    52
(1 row)

3.2 MARS2 存储引擎

在此给出示例:

首先,安装 matrixts 扩展。

=# CREATE EXTENSION matrixts;

然后,创建 MARS2 测试表。

=# CREATE TABLE v2x_mars2 (
  ts               timestamptz NOT NULL,
  tag_id           text NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) USING MARS2
DISTRIBUTED BY (tag_id);

创建 uniquemode=true 的 mars2_btree 索引。

=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);

插入同一批次的四条数据。

=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

查询数据,可见 tag1 设备在 2022-07-19 00:00:00 批次的数据已合并显示为一行,且新值覆盖了旧值。

=# SELECT * FROM v2x_mars2;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
 power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 | t               | f                |
    52
(1 row)
Time: 4.172 ms

3.3 HEAP 存储引擎

3.3.1 通过 mxgate 实现

如果你的业务场景是时序场景,同时使用的存储引擎为 HEAP,那么我们推荐通过 mxgate 高速写入工具实现 UPSERT。
要实现 UPSERT 功能,需要在指定字段创建唯一约束 / 索引

在 mxgate 中,通过指定 --upsert-key 可以用新值覆盖旧值,实现 UPSERT 操作;--deduplicate-key 则保持旧值,丢弃新值,实现去重功能。

  1. --upsert-key 用法

注意!
此用法等价于下文 SQL 语句 INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE

示例如下:
创建 HEAP 测试表。

=# CREATE TABLE v2x_heap_upsert (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

用于测试的数据 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||

添加完成后,点击 esc 键退出文件,输入 :wq 保存退出。

用于测试的数据 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

添加完成后,点击 esc 键退出文件,输入 :wq 保存退出。

加载数据 1,设置 --upsert-keytag_idts

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

查询结果 1。

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
 power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 |                 |                  |
(1 row)
Time: 18.049 ms

加载数据 2,设置 --upsert-keytag_idts

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

查询结果 2,可以看到原本数据 1 中 tag1speed 被数据 2 中相应新值取代并合并为一行。

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |
|    66
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |    80 | f                | f
|    70
(2 rows)
Time: 19.652 ms
  1. --deduplicate-key 用法

注意!
此用法等价于下文 SQL 语句 INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING

示例如下:
创建测试表。

=# CREATE TABLE v2x_heap_dedu (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

用于测试的数据 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||

添加完成后,点击 esc 键退出文件,输入 :wq 保存退出。

用于测试的数据 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

添加完成后,点击 esc 键退出文件,输入 :wq 保存退出。

加载数据 1,设置 --deduplicate-keytag_idts

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

查询结果 1。

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |
|
(1 row)
Time: 18.010 ms

加载数据 2,设置 --deduplicate-keytag_idts

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

查询结果 2,可以看到数据 2 中的 tag1speedleft_turn_signalright_turn_signalpower 信息均被丢弃,保留了数据 1 中的相关旧值(或空值)。

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |
|
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |
|    66
(2 rows)
Time: 12.881 ms

3.3.2 通过 INSERT 语句实现

注意!
此用法仅限 HEAP 表使用。

示例如下:
创建 HEAP 测试表。

=# CREATE TABLE v2x_heap_insert (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) DISTRIBUTED BY (tag_id);

创建唯一索引,并指定键值为 (tag_id,ts)

=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);

插入测试数据。

=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET  longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET  speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET  power = excluded.power;

查看测试数据,可以看到插入的三条 tag1 设备在 2022-07-19 00:00:00+00 批次的数据已合并为一行。

=# SELECT * FROM v2x_heap_insert;
           ts           | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
 | power
------------------------+--------+------------+----------+-------+------------------+------------------
-+-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 |                  |
 |    50
(1 row)
Time: 16.340 ms

注意!
INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE 语句在 SQL 参考 - INSERT 一节也有介绍。