流计算场景实践

本文档旨在通过提供基础用例方便用户快速上手 YMatrix 数据库流计算的部分功能。


流计算是一种对实时产生的数据进行快速分析、连续处理的数据处理技术。在 YMatrix 数据库中,你可以使用 SQL 去快速创建属于你自己的数据流。它除了支持插入、过滤、修正和填充等操作外,还支持扩维、聚集、级联、分叉、合并等实时流计算处理操作,全面提升了数仓系统的实时性,降低了系统的复杂度。运用 YMatrix 库内流计算你可以实现秒级、实时和增量的数据结果刷新。 当前,流计算拥有着非常广泛的应用场景,例如:

  • 在金融领域,流计算可以用于多维数据分析、资金流实时监控和投资风险分析等;
  • 在制造领域,流计算可以用于实时监控与预警、预测性维护和质量控制等;
  • 在交通领域,流计算可以用于智能交通管理、车辆轨迹分析和交通流量预测等。

除此之外,流计算在军事、仿真、电商、供应链和物联网等各个领域也存在着大量的应用。


用例 1 :商品数据快速扩维

“扩维计算”通常指通过关联其他表中的列或属性来扩展现有数据表结构生产大宽表的过程。YMatrix 的流计算功能支持对数据进行实时的扩维计算。我们将通过商场业务规划的例子来进一步使用该功能。通过每个顾客的商品购买信息去扩充业务种类的详细信息以得到每条业务线中各类商品的总销售情况,以进一步分析和优化商场各业务流程。


扩维前后的表结构

bill_type 为商场业务种类表;

字段 含义
id 数据编号
order_id 商品编号
type 商品种类
ts 账单时间

trade_info 为顾客交易信息表;

字段 含义
id 交易商品编号
data 商品详细信息

stream_bill_type_detail 为扩维后的商场业务种类表。

字段 含义
id 数据编号
type 商品种类
ts 账单时间
order_id 商品编号
order_data 商品详细信息

  1. 第一步,让我们创建表 bill_type 用于存储商场业务种类的信息。
CREATE TABLE bill_type (
   order_id int,
   type text,
   ts timestamp
 );
  1. 第二步,让我们创建表 trade_info,存储不同顾客购买商品的详细信息。
CREATE TABLE trade_info (
    id int, 
    data text
 );
  1. 第三步,让我们创建流 stream_bill_type_detail,通过顾客交易信息对业务种类信息进行数据扩维,用来展示业务种类交易信息的详细情况。当接收到新数据后,流 `stream_bill_type_detail
CREATE STREAM stream_bill_type_detail(id, type, ts, order_id, order_data)
AS (
   SELECT
      bill_type.order_id,
      bill_type.type,
      ts, 
      bill_type.order_id,
      trade_info.data AS order_data 
   FROM STREAMING ALL bill_type
   INNER JOIN trade_info 
       ON bill_type.order_id = trade_info.id
) PRIMARY KEY (id);
  1. 第四步,分析和查询数据。 在这个例子中,我们首先查询表 stream_bill_type_detail 中的数据,然后我们在 bill_type 表中插入一条新的数据,再次去查询表 stream_bill_type_detail中的数据。它将为我们提供最新的数据结果。
  • 插入数据
INSERT INTO bill_type 
VALUES (
    1,
    'fruit',
    current_timestamp
);

INSERT INTO trade_info 
VALUES (
    1,
    'apple'
);

INSERT INTO trade_info 
VALUES (
    2,
    'cola'
);
  • 查询当前结果
SELECT * FROM stream_bill_type_detail;

 id | type  |             ts             | order_id | order_data
----+-------+----------------------------+----------+------------
  1 | fruit | 2024-08-01 15:30:55.552471 |        1 | apple
(1 row)  
  • 插入新数据
INSERT INTO bill_type 
VALUES (
    2,
    'drink',
    current_timestamp
);
  • 查询最新结果
SELECT * FROM stream_bill_type_detail;

 id | type  |             ts             | order_id | order_data
----+-------+----------------------------+----------+------------
  1 | fruit | 2024-08-01 15:30:55.552471 |        1 | apple
  2 | drink | 2024-08-01 15:31:21.673873 |        2 | cola
(2 rows)
  1. 若我们需要更新表 bill_type 中的数据,可使用 UPDATE 语句进行操作。
UPDATE bill_type SET type = 'drinks' WHERE order_id = 2;
  • 查询最新结果
 SELECT * FROM stream_bill_type_detail;

 id |  type  |             ts             | order_id | order_data
----+--------+----------------------------+----------+------------
  1 | fruit  | 2024-08-01 15:50:23.117737 |        1 | apple
  2 | drinks | 2024-08-01 15:50:35.115252 |        2 | cola
(2 rows)
  1. 若我们需要删除表bill_type 中的数据,可使用 DELETE 语句进行操作。
DELETE FROM bill_type WHERE order_id = 2;
  • 查询最新结果
SELECT * FROM stream_bill_type_detail;

 id | type  |             ts             | order_id | order_data
----+-------+----------------------------+----------+------------
  1 | fruit | 2024-08-01 15:50:23.117737 |        1 | apple
(1 row)

用例 2 :制造数据多级聚集

“聚集计算”通常指的是对一组数据进行汇总和计算以生成统计信息,如总和、平均值、最大值、最小值等。YMatrix 数据库流计算支持对新增、更新和删除的数据的实时聚合计算。我们将通过智能制造实时数据监控的例子来进一步使用该功能。通过对制造数据流实时的聚合及时掌握月度和年度的产品生产量数据信息。


聚集计算过程的表结构

product_detail为产品生产量信息表;

字段 含义
id 数据编号
category 产品种类
value 产品生产的数量
ts 时间

stream_agg_month为产量的月度汇总信息;

字段 含义
category 产品种类
y
m
ym 年月
month_sum 月产品生产总数
month_cut 月产品数据条数

stream_agg_year为产量的年度汇总信息。

字段 含义
category 产品种类
y
year_sum 年度累计产品生产数量
year_cut 年度累计产品数据条数

  1. 第一步,让我们创建一个用于存储产品生产量信息的表 product_detail 并插入数据。
-- 创建产品生产信息表
CREATE TABLE product_detail (
    id        bigserial,
    category int,
    value     bigint,
    ts        timestamp
) DISTRIBUTED BY (id);

-- 插入数据
INSERT INTO product_detail(category, value, ts) VALUES
   (1002, 59, '2023-12-12 03:44:05'),
   (1001, 15, '2024-01-02 11:22:33'),
   (1001, 20, '2024-01-03 22:33:44'),
   (1002, 34, '2024-01-04 01:02:03'),
   (1001, 27, '2024-02-11 02:03:04'),
   (1002, 57, '2024-02-12 03:04:05');
  1. 第二步,创建流 stream_agg_month,用于产品月度生产量的聚集操作。创建流 stream_agg_year ,用于产品年度生产量的聚集操作。当接收到新数据后,流 stream_agg_monthstream_agg_year 自动执行聚集操作,更新流表中结果为最新。
--创建流 stream_agg_month,月度产品生产量
CREATE STREAM stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
   SELECT
     category,
     extract(year FROM date_trunc('year', ts)::date),
     extract(month FROM date_trunc('month', ts)::date),
     date_trunc('month', ts)::date,
     sum(value),
     count(value)
   FROM STREAMING ALL product_detail
   GROUP BY 1, 2, 3, 4 --按照产品种类,年,月,年月进行分组
 )
 DISTRIBUTED BY (category, y, m);

--创建流 stream_agg_year,年度产品生产量
CREATE STREAM stream_agg_year (category, year, year_sum, year_cnt) AS (
   SELECT
     stream_agg_month.category,
     stream_agg_month.y,
     sum(stream_agg_month.month_sum),
     sum(stream_agg_month.month_cnt)
   FROM STREAMING ALL stream_agg_month
   GROUP BY 1, 2 --按照产品种类,年进行分组
 )
 DISTRIBUTED BY (category, year);
  1. 第三步,分析和查询数据结果
  • 我们可以先查询当前表 product_detail 中的产品生产量信息。
-- 按照产品种类,时间进行排序
SELECT * FROM product_detail ORDER BY 2,4;
  • 然后查询表 stream_agg_month 和表 stream_agg_year,结果根据产品种类和时间显示产品生产量的月度数据和年度数据。
--查询产品月度生产量信息,按种类,年,月排序
SELECT * FROM stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt |           partial_month_sum
----------+------+----+------------+-----------+-----------+----------------------------------------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2 | \x000000000000000200010000000000000023
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1 | \x00000000000000010001000000000000001b
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1 | \x00000000000000010001000000000000003b
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1 | \x000000000000000100010000000000000022
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1 | \x000000000000000100010000000000000039
(5 rows)

--查询产品年度生产量信息,按种类,年排序
SELECT category, year, year_sum, year_cnt FROM stream_agg_year ORDER BY 1,2;
  category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       62 |        3
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
  • 让我们再向表 product_detail 中新增一条产品生产量数据,流 stream_agg_month 和 `stream_agg_year 会对新增的数据进行实时连续聚集。
INSERT INTO product_detail VALUES(7,1001,30,'2024-04-04 01:23:44');
  • 再次查询表 stream_agg_month 和表 `stream_agg_year,表中显示最新的聚集数据结果。
SELECT * FROM stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt |           partial_month_sum
----------+------+----+------------+-----------+-----------+----------------------------------------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2 | \x000000000000000200010000000000000023
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1 | \x00000000000000010001000000000000001b
     1001 | 2024 |  4 | 2024-04-01 |        30 |         1 | \x00000000000000010001000000000000001e
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1 | \x00000000000000010001000000000000003b
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1 | \x000000000000000100010000000000000022
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1 | \x000000000000000100010000000000000039
(6 rows)

SELECT category, year, year_sum, year_cnt FROM stream_agg_year ORDER BY 1,2;
  category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       92 |        4
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
  1. 若我们需要更新产品生产量数据表 product_detail 时,则可使用 UPDATE 语句进行操作。
UPDATE product_detail SET value = 100 WHERE id = 7;
  • 然后查询流表 stream_agg_month 和表 stream_agg_year,聚集结果已更新。
SELECT * FROM stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt |           partial_month_sum
----------+------+----+------------+-----------+-----------+----------------------------------------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2 | \x000000000000000200010000000000000023
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1 | \x00000000000000010001000000000000001b
     1001 | 2024 |  4 | 2024-04-01 |       100 |         1 | \x000000000000000100010000000000000064
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1 | \x00000000000000010001000000000000003b
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1 | \x000000000000000100010000000000000022
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1 | \x000000000000000100010000000000000039
(6 rows)

SELECT category, year, year_sum, year_cnt FROM stream_agg_year ORDER BY 1,2;

 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |      162 |        4
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
  1. 若我们需要删除产品生产量数据表 product_detail 的数据时,则可使用 DELETE 语句进行操作。
DELETE FROM product_detail WHERE id = 7;
  • 然后查询流表 stream_agg_month 和表 `stream_agg_year,聚集结果已更新。

-- 因为逆向聚集的天然限制,删除数据行之后,下游的流无法将 sum() 结果更新为NULL,则将删除数据后的聚集结果置为 0 。
DELECT * FROM stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt |           partial_month_sum
----------+------+----+------------+-----------+-----------+----------------------------------------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2 | \x000000000000000200010000000000000023
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1 | \x00000000000000010001000000000000001b
     1001 | 2024 |  4 | 2024-04-01 |         0 |         0 | \x00000000000000000000000000000000
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1 | \x00000000000000010001000000000000003b
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1 | \x000000000000000100010000000000000022
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1 | \x000000000000000100010000000000000039
(6 rows)

SELECT category, year, year_sum, year_cnt FROM stream_agg_year ORDER BY 1,2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       62 |        3
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)

用例 3 :交通数据双流关联

“双流 JOIN 计算”通常指的是在流处理中,对两个实时数据流进行连接(JOIN)操作,以便能够根据某些共同的键将来自两个不同流的数据项组合在一起进行分析或进一步的处理。YMatrix 数据库流计算支持对新增数据的增量 JOIN 计算。我们将通过路况实时监测的例子来进一步使用该功能,体验双数据流实时增量 JOIN 的效果。通过对车流量和交通事故数量的关联可以通过预先设定的预警阀值快速判断当前道路情况,实现及时的路况预警和事故处理。


双流 JOIN 前后的表结构

traffic_flow 为车流量数据表;

字段 含义
id 道路编号
road_n 车流量数据信息

traffic_event 为交通事故数据表;

字段 含义
id 道路编号
traf_n 交通事故发生的数量

stream_trafficinfo_total 为双流连接后的交通数据表。

字段 含义
id 道路编号
road_n 车流量数据信息
traf_n 交通事故发生的数量

  1. 第一步,让我们创建表 traffic_flow 和表 traffic_event 分别存储道路车流量的数据流与交通事故的数据流。
CREATE TABLE traffic_flow (
    id int, 
    road_n int
) 
DISTRIBUTED BY (id);
CREATE INDEX ON traffic_flow (id);

CREATE TABLE traffic_event (
    id int, 
    traf_n int
) 
DISTRIBUTED BY (id);
CREATE INDEX ON traffic_event (id);
  1. 第二步,创建流stream_trafficinfo_total,其被定义为对表 traffic_flow 和表 traffic_event 进行流式 JOIN 计算,即当道路车流量的数据流或交通事故的数据流发生变化时都会实时更新至流表的结果中。
CREATE STREAM stream_trafficinfo_total(id, road_n, traf_n) 
AS (
    SELECT 
        traffic_flow.id, 
        traffic_flow.road_n, 
        traffic_event.traf_n
    FROM STREAMING ALL traffic_flow
    INNER JOIN STREAMING ALL traffic_event 
        ON traffic_flow.id = traffic_event.id
) PRIMARY KEY (id);
  1. 第三步,分析并查询数据结果,通过分析表 stream_trafficinfo_total 的数据来判断交通流量模式和预测交通拥堵情况,当道路中的车流量达到某个阀值或发生交通事故时,及时预警。
  • 我们首先在表 traffic_flow 和表 traffic_event 中各插入一条交通记录。
INSERT INTO traffic_flow VALUES (1, 80);

INSERT INTO traffic_event VALUES (2, 3);
  • 然后查询流表 stream_trafficinfo_total,由于已有数据不适用双流 JOIN 操作,则表 stream_trafficinfo_total 中暂时未查询到结果数据。
SELECT * FROM stream_trafficinfo_total;

 id | road_n | traf_n
----+--------+--------
(0 rows)
  • 我们再次向表 traffic_flow 和表 traffic_event 中各新增 2 条交通记录。
INSERT INTO traffic_flow VALUES (2, NULL), (3, 100);

INSERT INTO traffic_event VALUES (1, 5), (3, NULL);
  • 然后查询流表 stream_trafficinfo_total。流 stream_trafficinfo_total 对新增的数据进行增量计算,显示最新的双流 JOIN 结果。
SELECT * FROM stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
  2 |        |      3
  1 |     80 |      5
  3 |    100 |
(3 rows)
  1. 若我们需要修改表 traffic_flowtraffic_event 中的数据,则可使用 UPDATE语句进行操作。
UPDATE traffic_flow SET road_n = 101 WHERE id=2;
  • 然后查询流表 stream_trafficinfo_total。流 stream_trafficinfo_total 对修改后的数据进行实时 JOIN 计算,显示最新数据结果。
SELECT * FROM stream_trafficinfo_total;

 id | road_n | traf_n
----+--------+--------
  1 |     80 |      5
  3 |    100 |
  2 |    101 |      3
(3 rows)
  1. 若我们需要删除表 traffic_flow 或表 traffic_event 中的数据,则可使用 DELETE 语句进行操作。
DELETE FROM traffic_event WHERE id = 2;
  • 然后查询流表stream_trafficinfo_total。流stream_trafficinfo_total对删除后的数据进行实时 JOIN 计算,显示最新数据结果。
SELECT * FROM stream_trafficinfo_total;

 id | road_n  | traf_n
----+---------+--------
  1 |   80    |   5
  3 |   100   |
(2 rows)