流计算场景实践
流表只能在超级用户下创建,需授权 superuser 权限。
超级用户的创建方法可参考CREATE_ROLE
本文档旨在通过提供基础用例方便用户快速上手 YMatrix 数据库流计算的部分功能。
流计算是一种对实时产生的数据进行快速分析、连续处理的数据处理技术。在 YMatrix 数据库中,你可以使用 SQL 去快速创建属于你自己的数据流。它除了支持插入、过滤、修正和填充等操作外,还支持扩维、聚集、级联、分叉、合并等实时流计算处理操作,全面提升了数仓系统的实时性,降低了系统的复杂度。运用 YMatrix 库内流计算你可以实现秒级、实时和增量的数据结果刷新。 当前,流计算拥有着非常广泛的应用场景,例如:
- 在金融领域,流计算可以用于多维数据分析、资金流实时监控和投资风险分析等;
- 在制造领域,流计算可以用于实时监控与预警、预测性维护和质量控制等;
- 在交通领域,流计算可以用于智能交通管理、车辆轨迹分析和交通流量预测等。
除此之外,流计算在军事、仿真、电商、供应链和物联网等各个领域也存在着大量的应用。
用例 1 :商品数据快速扩维
“扩维计算”通常指通过关联其他表中的列或属性来扩展现有数据表结构生产大宽表的过程。YMatrix 的流计算功能支持对数据进行实时的扩维计算。我们将通过商场业务规划的例子来进一步使用该功能。通过每个顾客的商品购买信息去扩充业务种类的详细信息以得到每条业务线中各类商品的总销售情况,以进一步分析和优化商场各业务流程。
扩维前后的表结构
bill_type
为商场业务种类表;
字段 | 含义 |
---|---|
id | 数据编号 |
order_id | 商品编号 |
type | 商品种类 |
ts | 账单时间 |
trade_info
为顾客交易信息表;
字段 | 含义 |
---|---|
id | 交易商品编号 |
data | 商品详细信息 |
stream_bill_type_detail
为扩维后的商场业务种类表。
字段 | 含义 |
---|---|
id | 数据编号 |
type | 商品种类 |
ts | 账单时间 |
order_id | 商品编号 |
order_data | 商品详细信息 |
- 第一步,让我们创建表
bill_type
用于存储商场业务种类的信息。
CREATE TABLE bill_type (
order_id int,
type text,
ts timestamp
);
- 第二步,让我们创建表
trade_info
,存储不同顾客购买商品的详细信息。
CREATE TABLE trade_info (
id int,
data text
);
- 第三步,让我们创建流
stream_bill_type_detail
,通过顾客交易信息对业务种类信息进行数据扩维,用来展示业务种类交易信息的详细情况。当接收到新数据后,流 `stream_bill_type_detail
CREATE STREAM stream_bill_type_detail(id, type, ts, order_id, order_data)
AS (
SELECT
bill_type.order_id,
bill_type.type,
ts,
bill_type.order_id,
trade_info.data AS order_data
FROM STREAMING ALL bill_type
INNER JOIN trade_info
ON bill_type.order_id = trade_info.id
) PRIMARY KEY (id);
- 第四步,分析和查询数据。
在这个例子中,我们首先查询表
stream_bill_type_detail
中的数据,然后我们在bill_type
表中插入一条新的数据,再次去查询表stream_bill_type_detail
中的数据。它将为我们提供最新的数据结果。
- 插入数据
INSERT INTO bill_type
VALUES (
1,
'fruit',
current_timestamp
);
INSERT INTO trade_info
VALUES (
1,
'apple'
);
INSERT INTO trade_info
VALUES (
2,
'cola'
);
- 查询当前结果
SELECT * FROM stream_bill_type_detail;
id | type | ts | order_id | order_data
----+-------+----------------------------+----------+------------
1 | fruit | 2024-08-01 15:30:55.552471 | 1 | apple
(1 row)
- 插入新数据
INSERT INTO bill_type
VALUES (
2,
'drink',
current_timestamp
);
- 查询最新结果
SELECT * FROM stream_bill_type_detail;
id | type | ts | order_id | order_data
----+-------+----------------------------+----------+------------
1 | fruit | 2024-08-01 15:30:55.552471 | 1 | apple
2 | drink | 2024-08-01 15:31:21.673873 | 2 | cola
(2 rows)
- 若我们需要更新表
bill_type
中的数据,可使用UPDATE
语句进行操作。
UPDATE bill_type SET type = 'drinks' WHERE order_id = 2;
- 查询最新结果
SELECT * FROM stream_bill_type_detail;
id | type | ts | order_id | order_data
----+--------+----------------------------+----------+------------
1 | fruit | 2024-08-01 15:50:23.117737 | 1 | apple
2 | drinks | 2024-08-01 15:50:35.115252 | 2 | cola
(2 rows)
- 若我们需要删除表
bill_type
中的数据,可使用DELETE
语句进行操作。
DELETE FROM bill_type WHERE order_id = 2;
- 查询最新结果
SELECT * FROM stream_bill_type_detail;
id | type | ts | order_id | order_data
----+-------+----------------------------+----------+------------
1 | fruit | 2024-08-01 15:50:23.117737 | 1 | apple
(1 row)
用例 2 :制造数据多级聚集
“聚集计算”通常指的是对一组数据进行汇总和计算以生成统计信息,如总和、平均值、最大值、最小值等。YMatrix 数据库流计算支持对新增、更新和删除的数据的实时聚合计算。我们将通过智能制造实时数据监控的例子来进一步使用该功能。通过对制造数据流实时的聚合及时掌握月度和年度的产品生产量数据信息。
聚集计算过程的表结构
product_detail
为产品生产量信息表;
字段 | 含义 |
---|---|
id | 数据编号 |
category | 产品种类 |
value | 产品生产的数量 |
ts | 时间 |
stream_agg_month
为产量的月度汇总信息;
字段 | 含义 |
---|---|
category | 产品种类 |
y | 年 |
m | 月 |
ym | 年月 |
month_sum | 月产品生产总数 |
month_cut | 月产品数据条数 |
stream_agg_year
为产量的年度汇总信息。
字段 | 含义 |
---|---|
category | 产品种类 |
y | 年 |
year_sum | 年度累计产品生产数量 |
year_cut | 年度累计产品数据条数 |
- 第一步,让我们创建一个用于存储产品生产量信息的表
product_detail
并插入数据。
-- 创建产品生产信息表
CREATE TABLE product_detail (
id bigserial,
category int,
value bigint,
ts timestamp
) DISTRIBUTED BY (id);
-- 插入数据
INSERT INTO product_detail(category, value, ts) VALUES
(1002, 59, '2023-12-12 03:44:05'),
(1001, 15, '2024-01-02 11:22:33'),
(1001, 20, '2024-01-03 22:33:44'),
(1002, 34, '2024-01-04 01:02:03'),
(1001, 27, '2024-02-11 02:03:04'),
(1002, 57, '2024-02-12 03:04:05');
- 第二步,创建流
stream_agg_month
,用于产品月度生产量的聚集操作。创建流stream_agg_year
,用于产品年度生产量的聚集操作。当接收到新数据后,流stream_agg_month
和stream_agg_year
自动执行聚集操作,更新流表中结果为最新。
--创建流 stream_agg_month,月度产品生产量
CREATE STREAM stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
SELECT
category,
extract(year FROM date_trunc('year', ts)::date),
extract(month FROM date_trunc('month', ts)::date),
date_trunc('month', ts)::date,
sum(value),
count(value)
FROM STREAMING ALL product_detail
GROUP BY 1, 2, 3, 4 --按照产品种类,年,月,年月进行分组
)
DISTRIBUTED BY (category, y, m);
--创建流 stream_agg_year,年度产品生产量
CREATE STREAM stream_agg_year (category, year, year_sum, year_cnt) AS (
SELECT
stream_agg_month.category,
stream_agg_month.y,
sum(stream_agg_month.month_sum),
sum(stream_agg_month.month_cnt)
FROM STREAMING ALL stream_agg_month
GROUP BY 1, 2 --按照产品种类,年进行分组
)
DISTRIBUTED BY (category, year);
- 第三步,分析和查询数据结果
- 我们可以先查询当前表
product_detail
中的产品生产量信息。
-- 按照产品种类,时间进行排序
SELECT * FROM product_detail ORDER BY 2,4;
- 然后查询表
stream_agg_month
和表stream_agg_year
,结果根据产品种类和时间显示产品生产量的月度数据和年度数据。
--查询产品月度生产量信息,按种类,年,月排序
SELECT * FROM stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt | partial_month_sum
----------+------+----+------------+-----------+-----------+----------------------------------------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2 | \x000000000000000200010000000000000023
1001 | 2024 | 2 | 2024-02-01 | 27 | 1 | \x00000000000000010001000000000000001b
1002 | 2023 | 12 | 2023-12-01 | 59 | 1 | \x00000000000000010001000000000000003b
1002 | 2024 | 1 | 2024-01-01 | 34 | 1 | \x000000000000000100010000000000000022
1002 | 2024 | 2 | 2024-02-01 | 57 | 1 | \x000000000000000100010000000000000039
(5 rows)
--查询产品年度生产量信息,按种类,年排序
SELECT category, year, year_sum, year_cnt FROM stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 62 | 3
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
- 让我们再向表
product_detail
中新增一条产品生产量数据,流stream_agg_month
和 `stream_agg_year 会对新增的数据进行实时连续聚集。
INSERT INTO product_detail VALUES(7,1001,30,'2024-04-04 01:23:44');
- 再次查询表
stream_agg_month
和表 `stream_agg_year,表中显示最新的聚集数据结果。
SELECT * FROM stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt | partial_month_sum
----------+------+----+------------+-----------+-----------+----------------------------------------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2 | \x000000000000000200010000000000000023
1001 | 2024 | 2 | 2024-02-01 | 27 | 1 | \x00000000000000010001000000000000001b
1001 | 2024 | 4 | 2024-04-01 | 30 | 1 | \x00000000000000010001000000000000001e
1002 | 2023 | 12 | 2023-12-01 | 59 | 1 | \x00000000000000010001000000000000003b
1002 | 2024 | 1 | 2024-01-01 | 34 | 1 | \x000000000000000100010000000000000022
1002 | 2024 | 2 | 2024-02-01 | 57 | 1 | \x000000000000000100010000000000000039
(6 rows)
SELECT category, year, year_sum, year_cnt FROM stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 92 | 4
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
- 若我们需要更新产品生产量数据表
product_detail
时,则可使用UPDATE
语句进行操作。
UPDATE product_detail SET value = 100 WHERE id = 7;
- 然后查询流表
stream_agg_month
和表stream_agg_year
,聚集结果已更新。
SELECT * FROM stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt | partial_month_sum
----------+------+----+------------+-----------+-----------+----------------------------------------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2 | \x000000000000000200010000000000000023
1001 | 2024 | 2 | 2024-02-01 | 27 | 1 | \x00000000000000010001000000000000001b
1001 | 2024 | 4 | 2024-04-01 | 100 | 1 | \x000000000000000100010000000000000064
1002 | 2023 | 12 | 2023-12-01 | 59 | 1 | \x00000000000000010001000000000000003b
1002 | 2024 | 1 | 2024-01-01 | 34 | 1 | \x000000000000000100010000000000000022
1002 | 2024 | 2 | 2024-02-01 | 57 | 1 | \x000000000000000100010000000000000039
(6 rows)
SELECT category, year, year_sum, year_cnt FROM stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 162 | 4
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
- 若我们需要删除产品生产量数据表
product_detail
的数据时,则可使用DELETE
语句进行操作。
DELETE FROM product_detail WHERE id = 7;
- 然后查询流表
stream_agg_month
和表 `stream_agg_year,聚集结果已更新。
-- 因为逆向聚集的天然限制,删除数据行之后,下游的流无法将 sum() 结果更新为NULL,则将删除数据后的聚集结果置为 0 。
DELECT * FROM stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt | partial_month_sum
----------+------+----+------------+-----------+-----------+----------------------------------------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2 | \x000000000000000200010000000000000023
1001 | 2024 | 2 | 2024-02-01 | 27 | 1 | \x00000000000000010001000000000000001b
1001 | 2024 | 4 | 2024-04-01 | 0 | 0 | \x00000000000000000000000000000000
1002 | 2023 | 12 | 2023-12-01 | 59 | 1 | \x00000000000000010001000000000000003b
1002 | 2024 | 1 | 2024-01-01 | 34 | 1 | \x000000000000000100010000000000000022
1002 | 2024 | 2 | 2024-02-01 | 57 | 1 | \x000000000000000100010000000000000039
(6 rows)
SELECT category, year, year_sum, year_cnt FROM stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 62 | 3
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
用例 3 :交通数据双流关联
“双流 JOIN 计算”通常指的是在流处理中,对两个实时数据流进行连接(JOIN)操作,以便能够根据某些共同的键将来自两个不同流的数据项组合在一起进行分析或进一步的处理。YMatrix 数据库流计算支持对新增数据的增量 JOIN 计算。我们将通过路况实时监测的例子来进一步使用该功能,体验双数据流实时增量 JOIN 的效果。通过对车流量和交通事故数量的关联可以通过预先设定的预警阀值快速判断当前道路情况,实现及时的路况预警和事故处理。
双流 JOIN 前后的表结构
traffic_flow
为车流量数据表;
字段 | 含义 |
---|---|
id | 道路编号 |
road_n | 车流量数据信息 |
traffic_event
为交通事故数据表;
字段 | 含义 |
---|---|
id | 道路编号 |
traf_n | 交通事故发生的数量 |
stream_trafficinfo_total
为双流连接后的交通数据表。
字段 | 含义 |
---|---|
id | 道路编号 |
road_n | 车流量数据信息 |
traf_n | 交通事故发生的数量 |
- 第一步,让我们创建表
traffic_flow
和表traffic_event
分别存储道路车流量的数据流与交通事故的数据流。
CREATE TABLE traffic_flow (
id int,
road_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON traffic_flow (id);
CREATE TABLE traffic_event (
id int,
traf_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON traffic_event (id);
- 第二步,创建流
stream_trafficinfo_total
,其被定义为对表traffic_flow
和表traffic_event
进行流式 JOIN 计算,即当道路车流量的数据流或交通事故的数据流发生变化时都会实时更新至流表的结果中。
CREATE STREAM stream_trafficinfo_total(id, road_n, traf_n)
AS (
SELECT
traffic_flow.id,
traffic_flow.road_n,
traffic_event.traf_n
FROM STREAMING ALL traffic_flow
INNER JOIN STREAMING ALL traffic_event
ON traffic_flow.id = traffic_event.id
) PRIMARY KEY (id);
- 第三步,分析并查询数据结果,通过分析表
stream_trafficinfo_total
的数据来判断交通流量模式和预测交通拥堵情况,当道路中的车流量达到某个阀值或发生交通事故时,及时预警。
- 我们首先在表
traffic_flow
和表traffic_event
中各插入一条交通记录。
INSERT INTO traffic_flow VALUES (1, 80);
INSERT INTO traffic_event VALUES (2, 3);
- 然后查询流表
stream_trafficinfo_total
,由于已有数据不适用双流 JOIN 操作,则表stream_trafficinfo_total
中暂时未查询到结果数据。
SELECT * FROM stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
(0 rows)
- 我们再次向表
traffic_flow
和表traffic_event
中各新增 2 条交通记录。
INSERT INTO traffic_flow VALUES (2, NULL), (3, 100);
INSERT INTO traffic_event VALUES (1, 5), (3, NULL);
- 然后查询流表
stream_trafficinfo_total
。流stream_trafficinfo_total
对新增的数据进行增量计算,显示最新的双流 JOIN 结果。
SELECT * FROM stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
2 | | 3
1 | 80 | 5
3 | 100 |
(3 rows)
- 若我们需要修改表
traffic_flow
或traffic_event
中的数据,则可使用 UPDATE语句进行操作。
UPDATE traffic_flow SET road_n = 101 WHERE id=2;
- 然后查询流表
stream_trafficinfo_total
。流stream_trafficinfo_total
对修改后的数据进行实时 JOIN 计算,显示最新数据结果。
SELECT * FROM stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
1 | 80 | 5
3 | 100 |
2 | 101 | 3
(3 rows)
- 若我们需要删除表
traffic_flow
或表traffic_event
中的数据,则可使用 DELETE 语句进行操作。
DELETE FROM traffic_event WHERE id = 2;
- 然后查询流表
stream_trafficinfo_total
。流stream_trafficinfo_total
对删除后的数据进行实时 JOIN 计算,显示最新数据结果。
SELECT * FROM stream_trafficinfo_total;
id | road_n | traf_n
----+---------+--------
1 | 80 | 5
3 | 100 |
(2 rows)