Domino 流计算快速上手 - 2. 聚集

2024-11-20 · YMatrix Team
#产品动态

根据经典的数据仓库建模理论,原始交易数据经过扩维操作存储至 DWD (明细层)后,下一步就是根据分析需求,针对明细数据进行各类聚集计算。

像"扩维"操作一样,我们通常采用“跑批”的方式进行处理。具体而言,在ETL和扩维操作后,我们会使用工具调度执行 SQL, 针对 DWD 层"大宽表"存储的数据进行聚集计算。

一方面前置 ETL和扩维操作均为批处理形式, 导致聚集计算也不得不采取批处理方式,因而无法达到“实时”的效果;另一方面,由于 ODS 层通常存储的是不同分类方式的汇总数据,计算时会涉及大量的数据扫描,占用较多系统资源,不得不选择在夜间低负载时执行,进一步限制了分析业务的灵活性。

通过使用 Domino 流计算技术,用户可以在 YMatrix 数据库中,用简单的 SQL 描述聚集操作,结合第一篇文章 Domino 流计算快速上手 - 1.扩维介绍的流式的扩维操作一起,就能实现交易数据到聚集结果实时流式更新,即 ODS -> DWD-> DWS 的实时链路。

下面,我们就通过实时计算月度和年度产量的例子来演示如何使用 Domino 流计算技术进行实时的聚集计算。

这个例子的特别之处是我们使用了流式级联架构,也就是在产量明细表之后将月度、年度汇总依次串联聚集。

表结构

dwd_production为产量信息表;

dws_stream_agg_month为产量的月度汇总信息;

dws_stream_agg_year为产量的年度汇总信息。

操作

  1. 第一步,让我们创建一个用于存储产品产量信息的表dwd_production 并插入数据。
-- 创建产品生产信息表
CREATE TABLE dwd_production (
    id        bigserial,
    category int,
    value     bigint,
    ts        timestamp
) DISTRIBUTED BY (id);

-- 插入数据
INSERT INTO dwd_production(category, value, ts) VALUES
   (1002, 59, '2023-12-12 03:44:05'),
   (1001, 15, '2024-01-02 11:22:33'),
   (1001, 20, '2024-01-03 22:33:44'),
   (1002, 34, '2024-01-04 01:02:03'),
   (1001, 27, '2024-02-11 02:03:04'),
   (1002, 57, '2024-02-12 03:04:05');
  1. 第二步,创建流 dws_stream_agg_month,用于产品月度生产量的聚集操作。创建流 dws_stream_agg_year,用于产品年度生产量的聚集操作。当接收到新数据后,流 dws_stream_agg_monthdws_stream_agg_year 自动执行聚集操作,更新流表中结果为最新。
--创建流 stream_agg_month,月度产品生产量
CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
   SELECT
     category,
     extract(year FROM date_trunc('year', ts)::date),
     extract(month FROM date_trunc('month', ts)::date),
     date_trunc('month', ts)::date,
     sum(value),
     count(value)
   FROM STREAMING ALL dwd_production
   GROUP BY 1, 2, 3, 4 --按照产品种类,年,月,年月进行分组
 )
 DISTRIBUTED BY (category, y, m);

--创建流 stream_agg_year,年度产品生产量
CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
   SELECT
     dwd_stream_agg_month.category,
     dwd_stream_agg_month.y,
     sum(dwd_stream_agg_month.month_sum),
     sum(dwd_stream_agg_month.month_cnt)
   FROM STREAMING ALL stream_agg_month
   GROUP BY 1, 2 --按照产品种类,年进行分组
 )
 DISTRIBUTED BY (category, year);
  1. 第三步,分析和查询数据结果

我们可以先查询当前表 dwd_production 中的产品生产数量信息。

-- 按照产品种类,时间进行排序
SELECT * FROM dwd_production ORDER BY 2,4;

然后查询表 dws_stream_agg_month 和表 dws_stream_agg_year,结果根据产品种类和时间显示产品生产量的月度数据和年度数据。

--查询产品月度生产量信息,按种类,年,月排序
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt 
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2 
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1 
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1 
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1 
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1 
(5 rows)

--查询产品年度生产量信息,按种类,年排序
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
  category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       62 |        3
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)

让我们再向表 dwd_production 中新增一条产品生产量数据,流 dws_stream_agg_monthdws_stream_agg_year 会对新增的数据进行实时连续聚集。

INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
再次查询表 dws_stream_agg_month 和表 dws_stream_agg_year,表中显示最新的聚集数据结果。
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt 
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |        2 
     1001 | 2024 |  2 | 2024-02-01 |        27 |        1 
     1001 | 2024 |  4 | 2024-04-01 |        30 |        1 
     1002 | 2023 | 12 | 2023-12-01 |        59 |        1 
     1002 | 2024 |  1 | 2024-01-01 |        34 |        1 
     1002 | 2024 |  2 | 2024-02-01 |        57 |        1 
(6 rows)

SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
  category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       92 |        4
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
  1. 若我们需要更新产品生产量信息表 dwd_production 中的数据时,则可使用UPDATE 语句进行操作。
    UPDATE dwd_production SET value = 100 WHERE id = 7;

然后查询流表 dws_stream_agg_month 和表 dws_stream_agg_year,聚集结果已更新。

SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt 
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2 
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1 
     1001 | 2024 |  4 | 2024-04-01 |       100 |         1 
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1 
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1 
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1 

SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;

 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |      162 |        4
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
  1. 若我们需要删除产品生产量数据表 dwd_production 的数据时,则可使用 DELETE 语句进行操作。
DELETE FROM dwd_production WHERE id = 7;

然后查询流表 dws_stream_agg_month 和表 dws_stream_agg_year,聚集结果已更新。

-- 因为逆向聚集的天然限制,删除数据行之后,下游的流无法将 sum() 结果更新为NULL,则将删除数据后的聚集结果置为 0 。
DELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt      
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2 
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1 
     1001 | 2024 |  4 | 2024-04-01 |         0 |         0 
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1 
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1 
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1 

SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       62 |        3
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)