流计算场景实践

流表只能在超级用户下创建,需授权 superuser 权限。
超级用户的创建方法可参考 CREATE_ROLE

本文档旨在通过提供基础用例方便用户快速上手 YMatrix 数据库流计算的部分功能。


流计算是一种对实时产生的数据进行快速分析、连续处理的数据处理技术。在 YMatrix 数据库中,你可以使用 SQL 去快速创建属于你自己的数据流。它除了支持插入、过滤、修正和填充等操作外,还支持扩维、聚集、级联、分叉、合并等实时流计算处理操作,全面提升了数仓系统的实时性,降低了系统的复杂度。运用 YMatrix 库内流计算你可以实现秒级、实时和增量的数据结果刷新。 当前,流计算拥有着非常广泛的应用场景,例如:

  • 在金融领域,流计算可以用于多维数据分析、资金流实时监控和投资风险分析等;
  • 在制造领域,流计算可以用于实时监控与预警、预测性维护和质量控制等;
  • 在交通领域,流计算可以用于智能交通管理、车辆轨迹分析和交通流量预测等。

除此之外,流计算在军事、仿真、电商、供应链和物联网等各个领域也存在着大量的应用。


用例 1 :商品数据快速扩维

“扩维计算”通常指通过关联其他表中的列或属性来扩展现有数据表结构生产大宽表的过程。YMatrix 的流计算功能支持对数据进行实时的扩维计算。
我们将通过一个简单的对订单+商品信息执行扩维操作的例子。通过每个顾客的商品购买信息去扩充业务订单以得到每条业务线中各类商品的总销售情况。


表结构

  • ods_order 为订单信息:存储订单信息的事实表

    字段 含义
    id 订单 id
    prod_id 商品 id
    ts 订单时间
  • dim_prod 为商品信息:存储商品详细信息的维度表

    字段 含义
    id 交易商品编号
    pord_name 商品名称
    pord_detail 商品详情
  • dwd_order_detail 为订单详情:扩维后包含所有商品信息的订单详情表

    字段 含义
    id 订单 id
    ts 账单时间
    prod_id 商品 id
    pord_name 商品名称
    pord_detail 商品详情

操作

  1. 第一步,让我们创建表 ods_orderdim_prod

     CREATE TABLE ods_order (
        id int,
        prod_id int,
        ts timestamp
        )
     DISTRIBUTED BY (id);
    
     CREATE TABLE dim_prod (
         id int,
         prod_name text,
         prod_detail text,
      )
     DISTRIBUTED BY (id);
  2. 第二步,让我们创建流 dwd_order_detail,通过商品信息对原始交易数据进行数据扩维。当 ods_order 有新数据插入时,dwd_order_detail 会即时增量刷新,最新的、经过扩维的交易信息会自动写入。

     CREATE STREAM dwd_order_detail(id, ts, prod_id, prod_name, prod_detail)
     AS (
        SELECT
           ods_order.id,
           ods_order.ts,
           ods_order.prod_id,
           dim_prod.prod_name,
           dim_prod.prod_detail
        FROM STREAMING ALL ods_order
        INNER JOIN dim_prod
            ON ods_order.prod_id = dim_prod.id
     ) PRIMARY KEY (id);
  3. 第三步,准备数据。

  • 插入商品信息

      INSERT INTO dim_prod
      VALUES (
          1,
          'apple',
          'fruit_001'
      );
    
      INSERT INTO dim_prod
      VALUES (
          2,
          'cola',
          'drink_001'
      );
  • 插入订单信息

      -- 订单 1
      INSERT INTO ods_order
      VALUES (
          1,
          1,
          current_timestamp
      );
      -- 订单 2
      INSERT INTO ods_order
      VALUES (
          2,
          2,
          current_timestamp
      );

我们通过流 dwd_order_detail 关联 表 ods_order 和 表 dim_prod ,一旦我们向 ods_order 中插入新数据,那么流表 dwd_order_detail 也会立即更新,展示出扩维后的新订单数据。

  • 查询当前结果

      SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
        2 | 2024-08-01 15:50:35.115252 |       2 | cola      | drink_001
      (2 rows)
  1. 若我们需要更新表 dim_prod 中的数据,可使用 UPDATE 语句进行操作。

     -- 更新 cola -> pepsi
     UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2;
  • 查询最新结果

       SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
        2 | 2024-08-01 15:50:35.115252 |       2 | pepsi     | drink_001
      (2 rows)
    
  1. 若我们需要删除表dim_prod 中的数据,可使用 DELETE 语句进行操作。

     -- 删除订单2
     DELETE FROM ods_order WHERE id = 2;
  • 查询最新结果

      SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
      (1 row)

用例 2 :制造数据多级聚集

“聚集计算”通常指的是对一组数据进行汇总和计算以生成统计信息,如总和、平均值、最大值、最小值等。YMatrix 数据库流计算支持对新增、更新和删除的数据的实时聚合计算。
我们将通过智能制造实时数据监控的例子来进一步使用该功能。通过对制造数据流实时的聚合及时掌握月度和年度的产品生产量数据信息。


表结构

  • dwd_production为产品生产量信息表

    字段 含义
    id 数据编号
    category 产品种类
    value 产品生产的数量
    ts 时间
  • dws_stream_agg_month为产量的月度汇总信息

    字段 含义
    category 产品种类
    y
    m
    ym 年月
    month_sum 月产品生产总数
    month_cut 月产品数据条数
  • dws_stream_agg_year为产量的年度汇总信息

    字段 含义
    category 产品种类
    y
    year_sum 年度累计产品生产数量
    year_cut 年度累计产品数据条数

操作

  1. 第一步,让我们创建一个用于存储产品生产量信息的表 dwd_production 并插入数据。

     -- 创建产品生产信息表
     CREATE TABLE dwd_production (
         id        bigserial,
         category int,
         value     bigint,
         ts        timestamp
     ) DISTRIBUTED BY (id);
    
     -- 插入数据
     INSERT INTO dwd_production(category, value, ts) VALUES
        (1002, 59, '2023-12-12 03:44:05'),
        (1001, 15, '2024-01-02 11:22:33'),
        (1001, 20, '2024-01-03 22:33:44'),
        (1002, 34, '2024-01-04 01:02:03'),
        (1001, 27, '2024-02-11 02:03:04'),
        (1002, 57, '2024-02-12 03:04:05');
  2. 第二步,创建流 dws_stream_agg_month,用于产品月度生产量的聚集操作。创建流 dws_stream_agg_year ,用于产品年度生产量的聚集操作。当接收到新数据后,流 dws_stream_agg_monthdws_stream_agg_year 自动执行聚集操作,更新流表中结果为最新。

     --创建流 dwd_stream_agg_month,月度产品生产量
     CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
        SELECT
          category,
          extract(year FROM date_trunc('year', ts)::date),
          extract(month FROM date_trunc('month', ts)::date),
          date_trunc('month', ts)::date,
          sum(value),
          count(value)
        FROM STREAMING ALL dwd_production
        GROUP BY 1, 2, 3, 4 --按照产品种类,年,月,年月进行分组
      )
      DISTRIBUTED BY (category, y, m);
    
     --创建流 dwd_stream_agg_year,年度产品生产量
     CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
        SELECT
          dwd_stream_agg_month.category,
          dwd_stream_agg_month.y,
          sum(dwd_stream_agg_month.month_sum),
          sum(dwd_stream_agg_month.month_cnt)
        FROM STREAMING ALL dws_stream_agg_month
        GROUP BY 1, 2 --按照产品种类,年进行分组
      )
      DISTRIBUTED BY (category, year);
  3. 第三步,分析和查询数据结果

  • 我们可以先查询当前表 dwd_production 中的产品生产量信息。

      -- 按照产品种类,时间进行排序
      SELECT * FROM dwd_production ORDER BY 2,4;
  • 然后查询表 dws_stream_agg_month 和表 dws_stream_agg_year,结果根据产品种类和时间显示产品生产量的月度数据和年度数据。

      --查询产品月度生产量信息,按种类,年,月排序
      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1
      (5 rows)
    
      --查询产品年度生产量信息,按种类,年排序
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
        category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       62 |        3
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
  • 让我们再向表 dwd_production 中新增一条产品生产量数据,流 dws_stream_agg_monthdws_stream_agg_year 会对新增的数据进行实时连续聚集。

      INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
  • 再次查询表 dws_stream_agg_monthdws_stream_agg_year,表中显示最新的聚集数据结果。

      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |        2
           1001 | 2024 |  2 | 2024-02-01 |        27 |        1
           1001 | 2024 |  4 | 2024-04-01 |        30 |        1
           1002 | 2023 | 12 | 2023-12-01 |        59 |        1
           1002 | 2024 |  1 | 2024-01-01 |        34 |        1
           1002 | 2024 |  2 | 2024-02-01 |        57 |        1
      (6 rows)
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
        category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       92 |        4
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
  1. 若我们需要更新产品生产量数据表 dwd_production 时,则可使用 UPDATE 语句进行操作。

     UPDATE dwd_production SET value = 100 WHERE id = 7;
  • 然后查询流表 dws_stream_agg_monthdws_stream_agg_year,聚集结果已更新。

      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1
           1001 | 2024 |  4 | 2024-04-01 |       100 |         1
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
    
       category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |      162 |        4
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
    
  1. 若我们需要删除产品生产量数据表 dwd_production 的数据时,则可使用 DELETE 语句进行操作。

     DELETE FROM dwd_production WHERE id = 7;
  • 然后查询流表 dws_stream_agg_monthdws_stream_agg_year,聚集结果已更新。

      -- 因为逆向聚集的天然限制,删除数据行之后,下游的流无法将 sum() 结果更新为NULL,则将删除数据后的聚集结果置为 0 。
      DELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt      
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2 
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1 
           1001 | 2024 |  4 | 2024-04-01 |         0 |         0 
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1 
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1 
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1 
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
       category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       62 |        3
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)

用例 3 :交通数据双流关联

“双流 JOIN 计算”通常指的是在流处理中,对两个实时数据流进行连接(JOIN)操作,以便能够根据某些共同的键将来自两个不同流的数据项组合在一起进行分析或进一步的处理。YMatrix 数据库流计算支持对新增数据的增量 JOIN 计算。我们将通过路况实时监测的例子来进一步使用该功能,体验双数据流实时增量 JOIN 的效果。
通过对车流量和交通事故数量的关联可以通过预先设定的预警阀值快速判断当前道路情况,实现及时的路况预警和事故处理。


表结构

  • dwd_traffic_flow 为车流量数据表

    字段 含义
    id 道路编号
    road_n 车流量数据信息
  • dwd_traffic_event 为交通事故数据表

    字段 含义
    id 道路编号
    traf_n 交通事故发生的数量
  • dws_stream_trafficinfo_total 为双流连接后的交通数据表。

    字段 含义
    id 道路编号
    road_n 车流量数据信息
    traf_n 交通事故发生的数量

操作

  1. 第一步,让我们创建表 dwd_traffic_flow 和表 dwd_traffic_event 分别存储道路车流量的数据流与交通事故的数据流。

     CREATE TABLE dwd_traffic_flow (
         id int,
         road_n int
     )
     DISTRIBUTED BY (id);
     CREATE INDEX ON dwd_traffic_flow (id);
    
     CREATE TABLE dwd_traffic_event (
         id int,
         traf_n int
     )
     DISTRIBUTED BY (id);
     CREATE INDEX ON dwd_traffic_event (id);
  2. 第二步,创建流 dws_stream_trafficinfo_total,其被定义为对表 dwd_traffic_flow 和表 `dwd_traffic_event 进行流式 JOIN 计算,即当道路车流量的数据流或交通事故的数据流发生变化时都会实时更新至流表的结果中。

     CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n)
     AS (
         SELECT
             dwd_traffic_flow.id,
             dwd_traffic_flow.road_n,
             dwd_traffic_event.traf_n
         FROM STREAMING ALL dwd_traffic_flow
         INNER JOIN STREAMING ALL dwd_traffic_event
             ON dwd_traffic_flow.id = dwd_traffic_event.id
     ) PRIMARY KEY (id);
  3. 第三步,分析并查询数据结果,通过分析表 dws_stream_trafficinfo_total 的数据来判断交通流量模式和预测交通拥堵情况,当道路中的车流量达到某个阀值或发生交通事故时,及时预警。

  • 我们首先在表 dwd_traffic_flow 和表 dwd_traffic_event 中各插入一条交通记录。

      INSERT INTO dwd_traffic_flow VALUES (1, 80);
    
      INSERT INTO dwd_traffic_event VALUES (2, 3);
  • 然后查询流表 dws_stream_trafficinfo_total,由于已有数据不适用双流 JOIN 操作,则表 dws_stream_trafficinfo_total 中暂时未查询到结果数据。

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n | traf_n
      ----+--------+--------
      (0 rows)
  • 我们再次向表 dwd_traffic_flow 和表 dwd_traffic_event 中各新增 2 条交通记录。

      INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100);
    
      INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL);
    
  • 然后查询流表 dws_stream_trafficinfo_total。流 dws_stream_trafficinfo_total 对新增的数据进行增量计算,显示最新的双流 JOIN 结果。

      SELECT * FROM dws_stream_trafficinfo_total;
       id | road_n | traf_n
      ----+--------+--------
        2 |        |      3
        1 |     80 |      5
        3 |    100 |
      (3 rows)
  1. 若我们需要修改表 dwd_traffic_flowdwd_traffic_event 中的数据,则可使用 UPDATE语句进行操作。

     UPDATE dwd_traffic_flow SET road_n = 101 WHERE id=2;
  • 然后查询流表 dws_stream_trafficinfo_total。流 dws_stream_trafficinfo_total 对修改后的数据进行实时 JOIN 计算,显示最新数据结果。

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n | traf_n
      ----+--------+--------
        1 |     80 |      5
        3 |    100 |
        2 |    101 |      3
      (3 rows)
  1. 若我们需要删除表 dwd_traffic_flow 或表 dwd_traffic_event 中的数据,则可使用 DELETE 语句进行操作。

     DELETE FROM dwd_traffic_event WHERE id = 2;
  • 然后查询流表 dws_stream_trafficinfo_total。流 dws_stream_trafficinfo_total 对删除后的数据进行实时 JOIN 计算,显示最新数据结果。

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n  | traf_n
      ----+---------+--------
        1 |   80    |   5
        3 |   100   |
      (2 rows)