流计算场景实践
流表只能在超级用户下创建,需授权 superuser 权限。
超级用户的创建方法可参考 CREATE_ROLE
本文档旨在通过提供基础用例方便用户快速上手 YMatrix 数据库流计算的部分功能。
流计算是一种对实时产生的数据进行快速分析、连续处理的数据处理技术。在 YMatrix 数据库中,你可以使用 SQL 去快速创建属于你自己的数据流。它除了支持插入、过滤、修正和填充等操作外,还支持扩维、聚集、级联、分叉、合并等实时流计算处理操作,全面提升了数仓系统的实时性,降低了系统的复杂度。运用 YMatrix 库内流计算你可以实现秒级、实时和增量的数据结果刷新。 当前,流计算拥有着非常广泛的应用场景,例如:
- 在金融领域,流计算可以用于多维数据分析、资金流实时监控和投资风险分析等;
- 在制造领域,流计算可以用于实时监控与预警、预测性维护和质量控制等;
- 在交通领域,流计算可以用于智能交通管理、车辆轨迹分析和交通流量预测等。
除此之外,流计算在军事、仿真、电商、供应链和物联网等各个领域也存在着大量的应用。
用例 1 :商品数据快速扩维
“扩维计算”通常指通过关联其他表中的列或属性来扩展现有数据表结构生产大宽表的过程。YMatrix 的流计算功能支持对数据进行实时的扩维计算。
我们将通过一个简单的对订单+商品信息执行扩维操作的例子。通过每个顾客的商品购买信息去扩充业务订单以得到每条业务线中各类商品的总销售情况。
表结构
-
ods_order
为订单信息:存储订单信息的事实表字段 含义 id 订单 id prod_id 商品 id ts 订单时间 -
dim_prod
为商品信息:存储商品详细信息的维度表字段 含义 id 交易商品编号 pord_name 商品名称 pord_detail 商品详情 -
dwd_order_detail
为订单详情:扩维后包含所有商品信息的订单详情表字段 含义 id 订单 id ts 账单时间 prod_id 商品 id pord_name 商品名称 pord_detail 商品详情
操作
-
第一步,让我们创建表
ods_order
和dim_prod
。CREATE TABLE ods_order ( id int, prod_id int, ts timestamp ) DISTRIBUTED BY (id); CREATE TABLE dim_prod ( id int, prod_name text, prod_detail text, ) DISTRIBUTED BY (id);
-
第二步,让我们创建流 dwd_order_detail,通过商品信息对原始交易数据进行数据扩维。当
ods_order
有新数据插入时,dwd_order_detail
会即时增量刷新,最新的、经过扩维的交易信息会自动写入。CREATE STREAM dwd_order_detail(id, ts, prod_id, prod_name, prod_detail) AS ( SELECT ods_order.id, ods_order.ts, ods_order.prod_id, dim_prod.prod_name, dim_prod.prod_detail FROM STREAMING ALL ods_order INNER JOIN dim_prod ON ods_order.prod_id = dim_prod.id ) PRIMARY KEY (id);
-
第三步,准备数据。
-
插入商品信息
INSERT INTO dim_prod VALUES ( 1, 'apple', 'fruit_001' ); INSERT INTO dim_prod VALUES ( 2, 'cola', 'drink_001' );
-
插入订单信息
-- 订单 1 INSERT INTO ods_order VALUES ( 1, 1, current_timestamp ); -- 订单 2 INSERT INTO ods_order VALUES ( 2, 2, current_timestamp );
我们通过流 dwd_order_detail
关联 表 ods_order
和 表 dim_prod
,一旦我们向 ods_order
中插入新数据,那么流表 dwd_order_detail
也会立即更新,展示出扩维后的新订单数据。
-
查询当前结果
SELECT * FROM dwd_order_detail; id | ts | prod_id | prod_name | prod_detail ----+----------------------------+---------+-----------+------------ 1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001 2 | 2024-08-01 15:50:35.115252 | 2 | cola | drink_001 (2 rows)
-
若我们需要更新表
dim_prod
中的数据,可使用UPDATE
语句进行操作。-- 更新 cola -> pepsi UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2;
-
查询最新结果
SELECT * FROM dwd_order_detail; id | ts | prod_id | prod_name | prod_detail ----+----------------------------+---------+-----------+------------ 1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001 2 | 2024-08-01 15:50:35.115252 | 2 | pepsi | drink_001 (2 rows)
-
若我们需要删除表
dim_prod
中的数据,可使用DELETE
语句进行操作。-- 删除订单2 DELETE FROM ods_order WHERE id = 2;
-
查询最新结果
SELECT * FROM dwd_order_detail; id | ts | prod_id | prod_name | prod_detail ----+----------------------------+---------+-----------+------------ 1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001 (1 row)
用例 2 :制造数据多级聚集
“聚集计算”通常指的是对一组数据进行汇总和计算以生成统计信息,如总和、平均值、最大值、最小值等。YMatrix 数据库流计算支持对新增、更新和删除的数据的实时聚合计算。
我们将通过智能制造实时数据监控的例子来进一步使用该功能。通过对制造数据流实时的聚合及时掌握月度和年度的产品生产量数据信息。
表结构
-
dwd_production
为产品生产量信息表字段 含义 id 数据编号 category 产品种类 value 产品生产的数量 ts 时间 -
dws_stream_agg_month
为产量的月度汇总信息字段 含义 category 产品种类 y 年 m 月 ym 年月 month_sum 月产品生产总数 month_cut 月产品数据条数 -
dws_stream_agg_year
为产量的年度汇总信息字段 含义 category 产品种类 y 年 year_sum 年度累计产品生产数量 year_cut 年度累计产品数据条数
操作
-
第一步,让我们创建一个用于存储产品生产量信息的表
dwd_production
并插入数据。-- 创建产品生产信息表 CREATE TABLE dwd_production ( id bigserial, category int, value bigint, ts timestamp ) DISTRIBUTED BY (id); -- 插入数据 INSERT INTO dwd_production(category, value, ts) VALUES (1002, 59, '2023-12-12 03:44:05'), (1001, 15, '2024-01-02 11:22:33'), (1001, 20, '2024-01-03 22:33:44'), (1002, 34, '2024-01-04 01:02:03'), (1001, 27, '2024-02-11 02:03:04'), (1002, 57, '2024-02-12 03:04:05');
-
第二步,创建流
dws_stream_agg_month
,用于产品月度生产量的聚集操作。创建流dws_stream_agg_year
,用于产品年度生产量的聚集操作。当接收到新数据后,流dws_stream_agg_month
和dws_stream_agg_year
自动执行聚集操作,更新流表中结果为最新。--创建流 dwd_stream_agg_month,月度产品生产量 CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS ( SELECT category, extract(year FROM date_trunc('year', ts)::date), extract(month FROM date_trunc('month', ts)::date), date_trunc('month', ts)::date, sum(value), count(value) FROM STREAMING ALL dwd_production GROUP BY 1, 2, 3, 4 --按照产品种类,年,月,年月进行分组 ) DISTRIBUTED BY (category, y, m); --创建流 dwd_stream_agg_year,年度产品生产量 CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS ( SELECT dwd_stream_agg_month.category, dwd_stream_agg_month.y, sum(dwd_stream_agg_month.month_sum), sum(dwd_stream_agg_month.month_cnt) FROM STREAMING ALL dws_stream_agg_month GROUP BY 1, 2 --按照产品种类,年进行分组 ) DISTRIBUTED BY (category, year);
-
第三步,分析和查询数据结果
-
我们可以先查询当前表
dwd_production
中的产品生产量信息。-- 按照产品种类,时间进行排序 SELECT * FROM dwd_production ORDER BY 2,4;
-
然后查询表
dws_stream_agg_month
和表dws_stream_agg_year
,结果根据产品种类和时间显示产品生产量的月度数据和年度数据。--查询产品月度生产量信息,按种类,年,月排序 SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3; category | y | m | ym | month_sum | month_cnt ----------+------+----+------------+-----------+----------- 1001 | 2024 | 1 | 2024-01-01 | 35 | 2 1001 | 2024 | 2 | 2024-02-01 | 27 | 1 1002 | 2023 | 12 | 2023-12-01 | 59 | 1 1002 | 2024 | 1 | 2024-01-01 | 34 | 1 1002 | 2024 | 2 | 2024-02-01 | 57 | 1 (5 rows) --查询产品年度生产量信息,按种类,年排序 SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2; category | year | year_sum | year_cnt ----------+------+----------+---------- 1001 | 2024 | 62 | 3 1002 | 2023 | 59 | 1 1002 | 2024 | 91 | 2 (3 rows)
-
让我们再向表
dwd_production
中新增一条产品生产量数据,流dws_stream_agg_month
和dws_stream_agg_year
会对新增的数据进行实时连续聚集。INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
-
再次查询表
dws_stream_agg_month
和dws_stream_agg_year
,表中显示最新的聚集数据结果。SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3; category | y | m | ym | month_sum | month_cnt ----------+------+----+------------+-----------+----------- 1001 | 2024 | 1 | 2024-01-01 | 35 | 2 1001 | 2024 | 2 | 2024-02-01 | 27 | 1 1001 | 2024 | 4 | 2024-04-01 | 30 | 1 1002 | 2023 | 12 | 2023-12-01 | 59 | 1 1002 | 2024 | 1 | 2024-01-01 | 34 | 1 1002 | 2024 | 2 | 2024-02-01 | 57 | 1 (6 rows) SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2; category | year | year_sum | year_cnt ----------+------+----------+---------- 1001 | 2024 | 92 | 4 1002 | 2023 | 59 | 1 1002 | 2024 | 91 | 2 (3 rows)
-
若我们需要更新产品生产量数据表
dwd_production
时,则可使用UPDATE
语句进行操作。UPDATE dwd_production SET value = 100 WHERE id = 7;
-
然后查询流表
dws_stream_agg_month
和dws_stream_agg_year
,聚集结果已更新。SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3; category | y | m | ym | month_sum | month_cnt ----------+------+----+------------+-----------+----------- 1001 | 2024 | 1 | 2024-01-01 | 35 | 2 1001 | 2024 | 2 | 2024-02-01 | 27 | 1 1001 | 2024 | 4 | 2024-04-01 | 100 | 1 1002 | 2023 | 12 | 2023-12-01 | 59 | 1 1002 | 2024 | 1 | 2024-01-01 | 34 | 1 1002 | 2024 | 2 | 2024-02-01 | 57 | 1 SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2; category | year | year_sum | year_cnt ----------+------+----------+---------- 1001 | 2024 | 162 | 4 1002 | 2023 | 59 | 1 1002 | 2024 | 91 | 2 (3 rows)
-
若我们需要删除产品生产量数据表
dwd_production
的数据时,则可使用DELETE
语句进行操作。DELETE FROM dwd_production WHERE id = 7;
-
然后查询流表
dws_stream_agg_month
和dws_stream_agg_year
,聚集结果已更新。-- 因为逆向聚集的天然限制,删除数据行之后,下游的流无法将 sum() 结果更新为NULL,则将删除数据后的聚集结果置为 0 。 DELECT * FROM dws_stream_agg_month ORDER BY 1,2,3; category | y | m | ym | month_sum | month_cnt ----------+------+----+------------+-----------+----------- 1001 | 2024 | 1 | 2024-01-01 | 35 | 2 1001 | 2024 | 2 | 2024-02-01 | 27 | 1 1001 | 2024 | 4 | 2024-04-01 | 0 | 0 1002 | 2023 | 12 | 2023-12-01 | 59 | 1 1002 | 2024 | 1 | 2024-01-01 | 34 | 1 1002 | 2024 | 2 | 2024-02-01 | 57 | 1 SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2; category | year | year_sum | year_cnt ----------+------+----------+---------- 1001 | 2024 | 62 | 3 1002 | 2023 | 59 | 1 1002 | 2024 | 91 | 2 (3 rows)
用例 3 :交通数据双流关联
“双流 JOIN 计算”通常指的是在流处理中,对两个实时数据流进行连接(JOIN)操作,以便能够根据某些共同的键将来自两个不同流的数据项组合在一起进行分析或进一步的处理。YMatrix 数据库流计算支持对新增数据的增量 JOIN 计算。我们将通过路况实时监测的例子来进一步使用该功能,体验双数据流实时增量 JOIN 的效果。
通过对车流量和交通事故数量的关联可以通过预先设定的预警阀值快速判断当前道路情况,实现及时的路况预警和事故处理。
表结构
-
dwd_traffic_flow
为车流量数据表字段 含义 id 道路编号 road_n 车流量数据信息 -
dwd_traffic_event
为交通事故数据表字段 含义 id 道路编号 traf_n 交通事故发生的数量 -
dws_stream_trafficinfo_total
为双流连接后的交通数据表。字段 含义 id 道路编号 road_n 车流量数据信息 traf_n 交通事故发生的数量
操作
-
第一步,让我们创建表
dwd_traffic_flow
和表dwd_traffic_event
分别存储道路车流量的数据流与交通事故的数据流。CREATE TABLE dwd_traffic_flow ( id int, road_n int ) DISTRIBUTED BY (id); CREATE INDEX ON dwd_traffic_flow (id); CREATE TABLE dwd_traffic_event ( id int, traf_n int ) DISTRIBUTED BY (id); CREATE INDEX ON dwd_traffic_event (id);
-
第二步,创建流
dws_stream_trafficinfo_total
,其被定义为对表dwd_traffic_flow
和表 `dwd_traffic_event 进行流式 JOIN 计算,即当道路车流量的数据流或交通事故的数据流发生变化时都会实时更新至流表的结果中。CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n) AS ( SELECT dwd_traffic_flow.id, dwd_traffic_flow.road_n, dwd_traffic_event.traf_n FROM STREAMING ALL dwd_traffic_flow INNER JOIN STREAMING ALL dwd_traffic_event ON dwd_traffic_flow.id = dwd_traffic_event.id ) PRIMARY KEY (id);
-
第三步,分析并查询数据结果,通过分析表
dws_stream_trafficinfo_total
的数据来判断交通流量模式和预测交通拥堵情况,当道路中的车流量达到某个阀值或发生交通事故时,及时预警。
-
我们首先在表
dwd_traffic_flow
和表dwd_traffic_event
中各插入一条交通记录。INSERT INTO dwd_traffic_flow VALUES (1, 80); INSERT INTO dwd_traffic_event VALUES (2, 3);
-
然后查询流表
dws_stream_trafficinfo_total
,由于已有数据不适用双流 JOIN 操作,则表dws_stream_trafficinfo_total
中暂时未查询到结果数据。SELECT * FROM dws_stream_trafficinfo_total; id | road_n | traf_n ----+--------+-------- (0 rows)
-
我们再次向表
dwd_traffic_flow
和表dwd_traffic_event
中各新增 2 条交通记录。INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100); INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL);
-
然后查询流表
dws_stream_trafficinfo_total
。流dws_stream_trafficinfo_total
对新增的数据进行增量计算,显示最新的双流 JOIN 结果。SELECT * FROM dws_stream_trafficinfo_total; id | road_n | traf_n ----+--------+-------- 2 | | 3 1 | 80 | 5 3 | 100 | (3 rows)
-
若我们需要修改表
dwd_traffic_flow
或dwd_traffic_event
中的数据,则可使用 UPDATE语句进行操作。UPDATE dwd_traffic_flow SET road_n = 101 WHERE id=2;
-
然后查询流表
dws_stream_trafficinfo_total
。流dws_stream_trafficinfo_total
对修改后的数据进行实时 JOIN 计算,显示最新数据结果。SELECT * FROM dws_stream_trafficinfo_total; id | road_n | traf_n ----+--------+-------- 1 | 80 | 5 3 | 100 | 2 | 101 | 3 (3 rows)
-
若我们需要删除表
dwd_traffic_flow
或表dwd_traffic_event
中的数据,则可使用 DELETE 语句进行操作。DELETE FROM dwd_traffic_event WHERE id = 2;
-
然后查询流表
dws_stream_trafficinfo_total
。流dws_stream_trafficinfo_total
对删除后的数据进行实时 JOIN 计算,显示最新数据结果。SELECT * FROM dws_stream_trafficinfo_total; id | road_n | traf_n ----+---------+-------- 1 | 80 | 5 3 | 100 | (2 rows)