在传统的数据仓库中,要实现数据流的双流 JOIN 操作往往需要设置特定的时间窗口,然后再该时间窗口内对时段内的数据进行双流关联操作。这种操作流程无法实现真正的实时数据关联以快速响应企业管理和企业决策。
然而在 Domino 流计算中,通过之前第一篇扩维、第二篇聚集的案例实践操作,我们已经了解到 Domino 可以实现 ODS -> DWD -> DWS 的全流程实时链路数据处理,用户可以使用简单的 SQL 直接定义流。
这次我们将从双流关联出发,找寻解决从最新业务数据的实时双流关联,到支持业务决策的最佳解决方案。
下面,我们将通过路况实时监测的例子来进一步使用该功能,体验双数据流实时增量 JOIN 的效果。通过对车流量和交通事故的关联可以通过预先设定的预警阀值快速判断当前道路情况,实现及时的路况预警和事故处理。
表结构
dwd_traffic_flow
为车流量数据表;
dwd_traffic_event
为交通事故数据表;
dws_stream_trafficinfo_total
为双流连接后的交通数据表。
![](https://img.ymatrix.cn/ymatrix_home/截屏2024-12-19 上午11.21.55_1734684559.png)
操作
- 第一步,让我们创建表
dwd_traffic_flow
和表dwd_traffic_event
分别存储道路车流量的数据流与交通事故的数据流。
CREATE TABLE dwd_traffic_flow (
id int,
road_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_flow (id);
CREATE TABLE dwd_traffic_event (
id int,
traf_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_event (id);
- 第二步,创建流
dws_stream_trafficinfo_total
,其被定义为对表dwd_traffic_flow
和表dwd_traffic_event
进行流式 JOIN 计算,即当道路车流量的数据流或交通事故的数据流发生变化时都会实时更新至流表的结果中。
CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n)
AS (
SELECT
dwd_traffic_flow.id,
dwd_traffic_flow.road_n,
dwd_traffic_event.traf_n
FROM STREAMING ALL dwd_traffic_flow
INNER JOIN STREAMING ALL dwd_traffic_event
ON dwd_traffic_flow.id = dwd_traffic_event.id
) PRIMARY KEY (id);
- 第三步,分析并查询数据结果,通过表
dws_stream_trafficinfo_total
的数据来判断交通流量模式和预测交通拥堵情况,当道路中的车流量达到某个阀值或发生交通事故时,及时预警。
我们首先在表 dwd_traffic_flow
和表 dwd_traffic_event
中各插入一条交通记录。
INSERT INTO dwd_traffic_flow VALUES (1, 80);
INSERT INTO dwd_traffic_event VALUES (2, 3);
然后查询流表 dws_stream_trafficinfo_total
,由于已有数据不适用双流 JOIN 操作,则表 dws_stream_trafficinfo_total
中暂时未查询到结果数据。
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
(0 rows)
我们再次向表 dwd_traffic_flow
和表 dwd_traffic_event
中各新增 2 条交通记录。
INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100);
INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL)
然后查询流表 dws_stream_trafficinfo_total
。流 dws_stream_trafficinfo_total
对新增的数据进行增量计算,显示最新的双流 JOIN 结果。
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
2 | | 3
1 | 80 | 5
3 | 100 |
(3 rows)
- 若我们需要修改表
dwd_traffic_flow
或dwd_traffic_event
中的数据,则可使用UPDATE
语句进行操作。
UPDATE dwd_traffic_flow SET road_n = 101 WHERE id=2
然后查询流表 dws_stream_trafficinfo_total
。流 dws_stream_trafficinfo_total
对修改后的数据进行实时 JOIN 计算,显示最新数据结果。
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
1 | 80 | 5
3 | 100 |
2 | 101 | 3
(3 rows)
- 若我们需要删除表
dwd_traffic_flow
或表dwd_traffic_event
中的数据,则可使用DELETE
语句进行操作。
DELETE FROM dwd_traffic_event WHERE id = 2;
然后查询流表 dws_stream_trafficinfo_total
。流 dws_stream_trafficinfo_total
对删除后的数据进行实时 JOIN 计算,显示最新数据结果。
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+---------+--------
1 | 80 | 5
3 | 100 |
(2 rows)