流表只能在超级用户下创建,需授权 superuser 权限。
超级用户的创建方法可参考 CREATE_ROLE
本文档旨在通过提供基础用例方便用户快速上手 YMatrix 数据库流计算的部分功能。
流计算是一种对实时产生的数据进行快速分析、连续处理的数据处理技术。在 YMatrix 数据库中,你可以使用 SQL 去快速创建属于你自己的数据流。它除了支持插入、过滤、修正和填充等操作外,还支持扩维、聚集、级联、分叉、合并等实时流计算处理操作,全面提升了数仓系统的实时性,降低了系统的复杂度。运用 YMatrix 库内流计算你可以实现秒级、实时和增量的数据结果刷新。 当前,流计算拥有着非常广泛的应用场景,例如:
除此之外,流计算在军事、仿真、电商、供应链和物联网等各个领域也存在着大量的应用。
“扩维计算”通常指通过关联其他表中的列或属性来扩展现有数据表结构生产大宽表的过程。YMatrix 的流计算功能支持对数据进行实时的扩维计算。
我们将通过一个简单的对订单+商品信息执行扩维操作的例子。通过每个顾客的商品购买信息去扩充业务订单以得到每条业务线中各类商品的总销售情况。
ods_order
为订单信息:存储订单信息的事实表
字段 | 含义 |
---|---|
id | 订单 id |
prod_id | 商品 id |
ts | 订单时间 |
dim_prod
为商品信息:存储商品详细信息的维度表
字段 | 含义 |
---|---|
id | 交易商品编号 |
pord_name | 商品名称 |
pord_detail | 商品详情 |
dwd_order_detail
为订单详情:扩维后包含所有商品信息的订单详情表
字段 | 含义 |
---|---|
id | 订单 id |
ts | 账单时间 |
prod_id | 商品 id |
pord_name | 商品名称 |
pord_detail | 商品详情 |
第一步,让我们创建表 ods_order
和 dim_prod
。
CREATE TABLE ods_order (
id int,
prod_id int,
ts timestamp
)
DISTRIBUTED BY (id);
CREATE TABLE dim_prod (
id int,
prod_name text,
prod_detail text
)
DISTRIBUTED BY (id);
第二步,让我们创建流 dwd_order_detail
,通过商品信息对原始交易数据进行数据扩维。当 dim_prod
有新数据更新时,dwd_order_detail
会即时增量刷新,最新的、经过扩维的交易信息会自动写入。
CREATE STREAM dwd_order_detail(id, ts, prod_id, prod_name, prod_detail)
AS (
SELECT
ods_order.id,
ods_order.ts,
ods_order.prod_id,
dim_prod.prod_name,
dim_prod.prod_detail
FROM STREAMING ALL dim_prod
INNER JOIN ods_order
ON dim_prod.id = ods_order.prod_id
) PRIMARY KEY (id);
第三步,准备数据。
插入订单信息
-- 订单 1
INSERT INTO ods_order
VALUES (
1,
1,
current_timestamp
);
-- 订单 2
INSERT INTO ods_order
VALUES (
2,
2,
current_timestamp
);
插入商品信息
INSERT INTO dim_prod
VALUES (
1,
'apple',
'fruit_001'
);
INSERT INTO dim_prod
VALUES (
2,
'cola',
'drink_001'
);
我们通过流 dwd_order_detail
关联表 ods_order
和表 dim_prod
,一旦我们向 dim_prod
中插入新数据,那么流表 dwd_order_detail
也会立即更新,展示出扩维后的新订单数据。
查询当前结果
SELECT * FROM dwd_order_detail;
id | ts | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+------------
1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001
2 | 2024-08-01 15:50:35.115252 | 2 | cola | drink_001
(2 rows)
若我们需要更新表 dim_prod
中的数据,可使用 UPDATE
语句进行操作。
-- 更新 cola -> pepsi
UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2;
查询最新结果
SELECT * FROM dwd_order_detail;
id | ts | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+------------
1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001
2 | 2024-08-01 15:50:35.115252 | 2 | pepsi | drink_001
(2 rows)
若我们需要删除表dim_prod
中的数据,可使用 DELETE
语句进行操作。
-- 删除订单2
DELETE FROM dim_prod WHERE id = 2;
查询最新结果
SELECT * FROM dwd_order_detail;
id | ts | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+------------
1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001
(1 row)
“聚集计算”通常指的是对一组数据进行汇总和计算以生成统计信息,如总和、平均值、最大值、最小值等。YMatrix 数据库流计算支持对新增、更新和删除的数据的实时聚合计算。
我们将通过智能制造实时数据监控的例子来进一步使用该功能。通过对制造数据流实时的聚合及时掌握月度和年度的产品生产量数据信息。
dwd_production
为产品生产量信息表
字段 | 含义 |
---|---|
id | 数据编号 |
category | 产品种类 |
value | 产品生产的数量 |
ts | 时间 |
dws_stream_agg_month
为产量的月度汇总信息
字段 | 含义 |
---|---|
category | 产品种类 |
y | 年 |
m | 月 |
ym | 年月 |
month_sum | 月产品生产总数 |
month_cut | 月产品数据条数 |
dws_stream_agg_year
为产量的年度汇总信息
字段 | 含义 |
---|---|
category | 产品种类 |
y | 年 |
year_sum | 年度累计产品生产数量 |
year_cut | 年度累计产品数据条数 |
第一步,让我们创建一个用于存储产品生产量信息的表 dwd_production
并插入数据。
-- 创建产品生产信息表
CREATE TABLE dwd_production (
id bigserial,
category int,
value bigint,
ts timestamp
) DISTRIBUTED BY (id);
-- 插入数据
INSERT INTO dwd_production(category, value, ts) VALUES
(1002, 59, '2023-12-12 03:44:05'),
(1001, 15, '2024-01-02 11:22:33'),
(1001, 20, '2024-01-03 22:33:44'),
(1002, 34, '2024-01-04 01:02:03'),
(1001, 27, '2024-02-11 02:03:04'),
(1002, 57, '2024-02-12 03:04:05');
第二步,创建流 dws_stream_agg_month
,用于产品月度生产量的聚集操作。创建流 dws_stream_agg_year
,用于产品年度生产量的聚集操作。当接收到新数据后,流 dws_stream_agg_month
和 dws_stream_agg_year
自动执行聚集操作,更新流表中结果为最新。
--创建流 dws_stream_agg_month,月度产品生产量
CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
SELECT
category,
extract(year FROM date_trunc('year', ts)::date),
extract(month FROM date_trunc('month', ts)::date),
date_trunc('month', ts)::date,
sum(value),
count(value)
FROM STREAMING ALL dwd_production
GROUP BY 1, 2, 3, 4 --按照产品种类,年,月,年月进行分组
)
DISTRIBUTED BY (category, y, m);
--创建流 dws_stream_agg_year,年度产品生产量
CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
SELECT
dws_stream_agg_month.category,
dws_stream_agg_month.y,
sum(dws_stream_agg_month.month_sum),
sum(dws_stream_agg_month.month_cnt)
FROM STREAMING ALL dws_stream_agg_month
GROUP BY 1, 2 --按照产品种类,年进行分组
)
DISTRIBUTED BY (category, year);
第三步,分析和查询数据结果
我们可以先查询当前表 dwd_production
中的产品生产量信息。
-- 按照产品种类,时间进行排序
SELECT * FROM dwd_production ORDER BY 2,4;
然后查询表 dws_stream_agg_month
和表 dws_stream_agg_year
,结果根据产品种类和时间显示产品生产量的月度数据和年度数据。
--查询产品月度生产量信息,按种类,年,月排序
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
(5 rows)
--查询产品年度生产量信息,按种类,年排序
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 62 | 3
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
让我们再向表 dwd_production
中新增一条产品生产量数据,流 dws_stream_agg_month
和 dws_stream_agg_year
会对新增的数据进行实时连续聚集。
INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
再次查询表 dws_stream_agg_month
和 dws_stream_agg_year
,表中显示最新的聚集数据结果。
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 30 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
(6 rows)
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 92 | 4
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
若我们需要更新产品生产量数据表 dwd_production
时,则可使用 UPDATE
语句进行操作。
UPDATE dwd_production SET value = 100 WHERE id = 7;
然后查询流表 dws_stream_agg_month
和 dws_stream_agg_year
,聚集结果已更新。
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 100 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 162 | 4
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
若我们需要删除产品生产量数据表 dwd_production
的数据时,则可使用 DELETE
语句进行操作。
DELETE FROM dwd_production WHERE id = 7;
然后查询流表 dws_stream_agg_month
和 dws_stream_agg_year
,聚集结果已更新。
-- 因为逆向聚集的天然限制,删除数据行之后,下游的流无法将 sum() 结果更新为NULL,则将删除数据后的聚集结果置为 0 。
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 0 | 0
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 62 | 3
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
“双流 JOIN 计算”通常指的是在流处理中,对两个实时数据流进行连接(JOIN)操作,以便能够根据某些共同的键将来自两个不同流的数据项组合在一起进行分析或进一步的处理。YMatrix 数据库流计算支持对新增数据的增量 JOIN 计算。我们将通过路况实时监测的例子来进一步使用该功能,体验双数据流实时增量 JOIN 的效果。
通过对车流量和交通事故数量的关联可以通过预先设定的预警阀值快速判断当前道路情况,实现及时的路况预警和事故处理。
dwd_traffic_flow
为车流量数据表
字段 | 含义 |
---|---|
id | 道路编号 |
road_n | 车流量数据信息 |
dwd_traffic_event
为交通事故数据表
字段 | 含义 |
---|---|
id | 道路编号 |
traf_n | 交通事故发生的数量 |
dws_stream_trafficinfo_total
为双流连接后的交通数据表。
字段 | 含义 |
---|---|
id | 道路编号 |
road_n | 车流量数据信息 |
traf_n | 交通事故发生的数量 |
第一步,让我们创建表 dwd_traffic_flow
和表 dwd_traffic_event
分别存储道路车流量的数据流与交通事故的数据流。
CREATE TABLE dwd_traffic_flow (
id int,
road_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_flow (id);
CREATE TABLE dwd_traffic_event (
id int,
traf_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_event (id);
第二步,创建流 dws_stream_trafficinfo_total
,其被定义为对表 dwd_traffic_flow
和表 `dwd_traffic_event 进行流式 JOIN 计算,即当道路车流量的数据流或交通事故的数据流发生变化时都会实时更新至流表的结果中。
CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n)
AS (
SELECT
dwd_traffic_flow.id,
dwd_traffic_flow.road_n,
dwd_traffic_event.traf_n
FROM STREAMING ALL dwd_traffic_flow
INNER JOIN STREAMING ALL dwd_traffic_event
ON dwd_traffic_flow.id = dwd_traffic_event.id
) PRIMARY KEY (id);
第三步,分析并查询数据结果,通过分析表 dws_stream_trafficinfo_total
的数据来判断交通流量模式和预测交通拥堵情况,当道路中的车流量达到某个阀值或发生交通事故时,及时预警。
我们首先在表 dwd_traffic_flow
和表 dwd_traffic_event
中各插入一条交通记录。
INSERT INTO dwd_traffic_flow VALUES (1, 80);
INSERT INTO dwd_traffic_event VALUES (2, 3);
然后查询流表 dws_stream_trafficinfo_total
,由于已有数据不适用双流 JOIN 操作,则表 dws_stream_trafficinfo_total
中暂时未查询到结果数据。
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
(0 rows)
我们再次向表 dwd_traffic_flow
和表 dwd_traffic_event
中各新增 2 条交通记录。
INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100);
INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL);
然后查询流表 dws_stream_trafficinfo_total
。流 dws_stream_trafficinfo_total
对新增的数据进行增量计算,显示最新的双流 JOIN 结果。
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
2 | | 3
1 | 80 | 5
3 | 100 |
(3 rows)
若我们需要修改表 dwd_traffic_flow
或 dwd_traffic_event
中的数据,则可使用 UPDATE语句进行操作。
UPDATE dwd_traffic_flow SET road_n = 101 WHERE id=2;
然后查询流表 dws_stream_trafficinfo_total
。流 dws_stream_trafficinfo_total
对修改后的数据进行实时 JOIN 计算,显示最新数据结果。
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
1 | 80 | 5
3 | 100 |
2 | 101 | 3
(3 rows)
若我们需要删除表 dwd_traffic_flow
或表 dwd_traffic_event
中的数据,则可使用 DELETE 语句进行操作。
DELETE FROM dwd_traffic_event WHERE id = 2;
然后查询流表 dws_stream_trafficinfo_total
。流 dws_stream_trafficinfo_total
对删除后的数据进行实时 JOIN 计算,显示最新数据结果。
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+---------+--------
1 | 80 | 5
3 | 100 |
(2 rows)