流计算功能示例

嵌套查询流

WITH [NO] DATA 必须放在最外层查询的末尾,不能写在内层子查询中

  • SQL 定义示例
-- Fact table that feeds the stream.
CREATE TABLE t8 (
    id    int,
    d1_id int,
    d2_id int,
    c1    text,
    ts    timestamp
);

-- Dimension tables joined to t8 inside the stream's inner query.
CREATE TABLE dim1 (id int, data text);
CREATE TABLE dim2 (id int, data text);

-- Nested-query stream: the inner query widens t8 with both dimension tables,
-- the outer query keeps only id and c1 for rows whose id is greater than 1.
-- WITH DATA is attached to the outermost query, not to the inner subquery.
CREATE STREAM s8
AS (
    SELECT
        enriched.id,
        enriched.c1
    FROM (
        SELECT
            t8.id,
            t8.c1,
            t8.ts,
            t8.d1_id,
            dim1.data AS d1_data,
            t8.d2_id,
            dim2.data AS d2_data
        FROM STREAMING t8
        INNER JOIN dim1 ON t8.d1_id = dim1.id
        LEFT JOIN dim2 ON t8.d2_id = dim2.id
    ) AS enriched
    WHERE enriched.id > 1
    WITH DATA
)
DISTRIBUTED BY (id);
  • 结果展示
=# INSERT INTO dim1 VALUES (1,'mmt1');
=# INSERT INTO dim1 VALUES (2,'mmt2');
=# INSERT INTO dim2 VALUES (1,'hmz1');
=# INSERT INTO dim2 VALUES (2,'hmz2');

=# INSERT INTO t8 VALUES (1,1,1,'mmt',current_timestamp);

=# select * from s8;
 id | c1 
----+----
(0 rows)

=# INSERT INTO t8 VALUES (2,2,2,'mmt',current_timestamp);

=# select * from s8;
 id | c1  
----+-----
  2 | mmt
(1 row)

扩维流

  • SQL 定义示例
-- Fact table that feeds the stream.
CREATE TABLE t8 (
    id    int,
    d1_id int,
    d2_id int,
    c1    text,
    ts    timestamp
);

-- Dimension tables used to widen each t8 row.
CREATE TABLE dim1 (id int, data text);
CREATE TABLE dim2 (id int, data text);

-- Dimension-widening stream: every streamed t8 row is joined to dim1
-- (inner join) and dim2 (left join) and stored with both data columns.
CREATE STREAM s8
AS (
    SELECT
        t8.id,
        t8.c1,
        t8.ts,
        t8.d1_id,
        dim1.data AS d1_data,
        t8.d2_id,
        dim2.data AS d2_data
    FROM STREAMING t8
    INNER JOIN dim1 ON t8.d1_id = dim1.id
    LEFT JOIN dim2 ON t8.d2_id = dim2.id
    WITH NO DATA
);
  • 结果展示
-- 在维度表中插入数据
=# INSERT INTO dim1 VALUES (1,'mmt1');
=# INSERT INTO dim1 VALUES (2,'mmt2');

=# INSERT INTO dim2 VALUES (1,'hmz1');
=# INSERT INTO dim2 VALUES (2,'hmz2');

=# INSERT INTO t8 VALUES (1,1,1,'mmt',current_timestamp);
=# select * from t8;
 id | d1_id | d2_id | c1  |             ts             
----+-------+-------+-----+----------------------------
  1 |     1 |     1 | mmt | 2024-05-28 03:59:11.873657
(1 row)

=# select * from s8;
 id | c1  |             ts             | d1_id | d1_data | d2_id | d2_data 
----+-----+----------------------------+-------+---------+-------+---------
  1 | mmt | 2024-05-28 03:59:11.873657 |     1 | mmt1    |     1 | hmz1
(1 row)

=# INSERT INTO t8 VALUES (2,2,2,'mmt',current_timestamp);

=# select * from s8;
 id | c1  |             ts             | d1_id | d1_data | d2_id | d2_data 
----+-----+----------------------------+-------+---------+-------+---------
  2 | mmt | 2024-05-28 04:00:03.687348 |     2 | mmt2    |     2 | hmz2
  1 | mmt | 2024-05-28 03:59:11.873657 |     1 | mmt1    |     1 | hmz1
(2 rows)

聚集流

  • SQL 定义示例
-- Source table for the aggregate stream.
CREATE TABLE t1 (
    id  int,
    id2 int,
    id3 int,
    v1  int,
    v2  bigint
)
DISTRIBUTED BY (id);

-- Aggregate stream: per (id, id2, id3) group it keeps the minimum of v1
-- and the averages of v1 and v2.
CREATE STREAM s1 (id, id2, id3, min1, avg1, avg2)
AS (
    SELECT
        id,
        id2,
        id3,
        min(v1),
        avg(v1),
        avg(v2)
    FROM STREAMING t1
    GROUP BY id, id2, id3
    WITH NO DATA
)
DISTRIBUTED BY (id);
  • 结果展示
INSERT INTO t1 VALUES(1,2,3,10,20);
INSERT INTO t1 VALUES(1,2,2,5,10);

INSERT INTO t1 VALUES(1,2,4,5,10);
insert INTO t1 VALUES(1,2,4,4,10);

=# SELECT * FROM s1;
 id | id2 | id3 | min1 |        avg1         |        avg2         | partial_avg1 |              partial_avg2              
----+-----+-----+------+---------------------+---------------------+--------------+----------------------------------------
  1 |   2 |   2 |    5 |  5.0000000000000000 | 10.0000000000000000 | {1,5}        | \x00000000000000010001000000000000000a
  1 |   2 |   3 |   10 | 10.0000000000000000 | 20.0000000000000000 | {1,10}       | \x000000000000000100010000000000000014
  1 |   2 |   4 |    4 |  4.5000000000000000 | 10.0000000000000000 | {2,9}        | \x000000000000000200010000000000000014
(3 rows)

多级流

  • SQL 定义示例
-- Source table and the two dimension tables used by the chained streams.
CREATE TABLE t6 (id int, c1 text, ts timestamp);
CREATE TABLE dim1 (id int, col1 text);
CREATE TABLE dim2 (id int, col2 text);

-- First-level stream: t6 rows joined to dim1, stamped with the time the
-- row was computed into s61.
CREATE STREAM s61 (id, c1, ts, col1, arrive_s61)
AS (
    SELECT
        t6.*,
        dim1.col1,
        clock_timestamp()
    FROM STREAMING t6
    INNER JOIN dim1 ON t6.id = dim1.id
    WITH NO DATA
)
DISTRIBUTED BY (c1);

-- Second-level stream: consumes s61 as its streaming input, joins dim2 and
-- adds its own arrival timestamp.
CREATE STREAM s62 (id, c1, ts, col1, arrive_s61, col2, arrive_s6_2)
AS (
    SELECT
        s61.*,
        dim2.col2,
        clock_timestamp()
    FROM STREAMING s61
    INNER JOIN dim2 ON s61.id = dim2.id
    WITH NO DATA
)
DISTRIBUTED BY (id);
  • 结果展示
INSERT INTO dim1 VALUES(1,'mmt1');
INSERT INTO dim1 VALUES(2,'hmz1');
INSERT INTO dim2 VALUES(1,'mmt2');
INSERT INTO dim2 VALUES(2,'hmz2');

INSERT INTO t6 VALUES (1,'mmt',current_timestamp);
INSERT INTO t6 VALUES (2,'hmz',current_timestamp);

=# select * from t6;
 id | c1  |             ts             
----+-----+----------------------------
  2 | hmz | 2024-05-28 04:15:07.591406
  1 | mmt | 2024-05-28 04:15:06.775371
(2 rows)

=# select * from s61;
 id | c1  |             ts             | col1 |          arrive_s61           
----+-----+----------------------------+------+-------------------------------
  1 | mmt | 2024-05-28 04:15:06.775371 | mmt1 | 2024-05-28 04:15:07.611297-04
  2 | hmz | 2024-05-28 04:15:07.591406 | hmz1 | 2024-05-28 04:15:08.613346-04
(2 rows)

=# select * from s62;
 id | c1  |             ts             | col1 |          arrive_s61           | col2 |          arrive_s6_2          
----+-----+----------------------------+------+-------------------------------+------+-------------------------------
  2 | hmz | 2024-05-28 04:15:07.591406 | hmz1 | 2024-05-28 04:15:08.613346-04 | hmz2 | 2024-05-28 04:15:08.725532-04
  1 | mmt | 2024-05-28 04:15:06.775371 | mmt1 | 2024-05-28 04:15:07.611297-04 | mmt2 | 2024-05-28 04:15:07.724866-04
(2 rows)

一表多流

  • SQL 定义示例
-- One source table feeding two independent streams.
CREATE TABLE t5 (id int, c1 text, ts timestamp);

-- s51 receives only rows with id > 7.
CREATE STREAM s51 (id, c1, ts, arrive_s51)
AS (
    SELECT
        *,
        clock_timestamp()
    FROM STREAMING t5
    WHERE id > 7
    WITH NO DATA
)
DISTRIBUTED BY (id);

-- s52 receives only rows with id < 3.
CREATE STREAM s52 (id, c1, ts, arrive_s52)
AS (
    SELECT
        *,
        clock_timestamp()
    FROM STREAMING t5
    WHERE id < 3
    WITH NO DATA
)
DISTRIBUTED BY (c1);
  • 结果展示
-- id = 1 satisfies s52 (id < 3); id = 7 satisfies neither stream
-- (it is not > 7 and not < 3), so s51 is still empty after these inserts.
INSERT INTO t5 VALUES(1,'mmt',current_timestamp);
INSERT INTO t5 VALUES(7,'hmz',current_timestamp);


=# SELECT * FROM s51;
 id | c1 | ts | arrive_s51 
----+----+----+------------
(0 rows)

=# SELECT * FROM s52;
 id | c1  |             ts             |          arrive_s52           
----+-----+----------------------------+-------------------------------
  1 | mmt | 2024-05-28 04:17:38.090422 | 2024-05-28 04:17:38.278591-04
(1 row)

=# INSERT INTO t5 VALUES(8,'hmz2',current_timestamp);

=# SELECT * FROM s51;
 id |  c1  |             ts             |          arrive_s51           
----+------+----------------------------+-------------------------------
  8 | hmz2 | 2024-05-28 04:18:08.299554 | 2024-05-28 04:18:09.241703-04
(1 row)

上游表多次引用

  • SQL 定义示例
-- Source table referenced twice by the same stream.
CREATE TABLE t3 (id int, c1 text, ts timestamp);

-- The stream reads t3 through two STREAMING branches combined with
-- UNION ALL: one for id > 7 and one for id < 3.
CREATE STREAM s3 (id, c1, ts, arrive_s3)
AS (
    SELECT
        *,
        clock_timestamp()
    FROM STREAMING t3
    WHERE id > 7

    UNION ALL

    SELECT
        *,
        clock_timestamp()
    FROM STREAMING t3
    WHERE id < 3
    WITH NO DATA
)
DISTRIBUTED BY (arrive_s3);
  • 结果展示
INSERT INTO t3 VALUES (1,'mmt',current_timestamp); 
INSERT INTO t3 VALUES (8,'hmz',current_timestamp);     

=# SELECT * FROM s3;
 id | c1  |             ts             |           arrive_s3           
----+-----+----------------------------+-------------------------------
  1 | mmt | 2024-05-28 04:20:28.31711  | 2024-05-28 04:20:28.542517-04
  8 | hmz | 2024-05-28 04:20:29.399698 | 2024-05-28 04:20:29.540877-04
(2 rows)

流表多引擎支持

  • SQL 定义示例
-- Source table for the stream.
CREATE TABLE t4 (id int, c1 text, ts timestamp);

-- Stream stored with the mars3 access method, zstd compression (level 1),
-- ordered by id, and range-partitioned on ts into monthly partitions
-- from 2024-03-01 (inclusive) to 2025-01-01 (exclusive).
CREATE STREAM s4 (id, c1, ts, arrive_s4)
AS (
    SELECT
        *,
        clock_timestamp()
    FROM STREAMING t4
    WITH NO DATA
)
USING mars3
WITH (compresstype='zstd', compresslevel=1)
DISTRIBUTED BY (id, arrive_s4)
ORDER BY id
PARTITION BY RANGE (ts)
(
    START (date '2024-03-01') INCLUSIVE
    END (date '2025-01-01') EXCLUSIVE
    EVERY (INTERVAL '1 month')
);
  • 结果展示
=# \dS+ s4
                                          Partitioned stream "public.s4"
  Column   |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description 
-----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
 id        | integer                     |           |          |         | plain    |              | 
 c1        | text                        |           |          |         | extended |              | 
 ts        | timestamp without time zone |           |          |         | plain    |              | 
 arrive_s4 | timestamp with time zone    |           |          |         | plain    |              | 
Partition key: RANGE (ts)
View definition:

Partitions: s4_1_prt_1 FOR VALUES FROM ('2024-03-01 00:00:00') TO ('2024-04-01 00:00:00'),
            s4_1_prt_10 FOR VALUES FROM ('2024-12-01 00:00:00') TO ('2025-01-01 00:00:00'),
            s4_1_prt_2 FOR VALUES FROM ('2024-04-01 00:00:00') TO ('2024-05-01 00:00:00'),
            s4_1_prt_3 FOR VALUES FROM ('2024-05-01 00:00:00') TO ('2024-06-01 00:00:00'),
            s4_1_prt_4 FOR VALUES FROM ('2024-06-01 00:00:00') TO ('2024-07-01 00:00:00'),
            s4_1_prt_5 FOR VALUES FROM ('2024-07-01 00:00:00') TO ('2024-08-01 00:00:00'),
            s4_1_prt_6 FOR VALUES FROM ('2024-08-01 00:00:00') TO ('2024-09-01 00:00:00'),
            s4_1_prt_7 FOR VALUES FROM ('2024-09-01 00:00:00') TO ('2024-10-01 00:00:00'),
            s4_1_prt_8 FOR VALUES FROM ('2024-10-01 00:00:00') TO ('2024-11-01 00:00:00'),
            s4_1_prt_9 FOR VALUES FROM ('2024-11-01 00:00:00') TO ('2024-12-01 00:00:00')
Distributed by: (id, arrive_s4)
Access method: mars3
Order by: (id)
Options: compresstype=zstd, compresslevel=1



=# INSERT INTO t4 VALUES (1,'mmt', current_timestamp);

=# INSERT INTO t4 VALUES (1,'mmt', to_timestamp('2024-10-02 14:00:00','yyyy-mm-dd hh24:mi:ss'));

=# SELECT * FROM s4;
 id | c1  |             ts             |           arrive_s4           
----+-----+----------------------------+-------------------------------
  1 | mmt | 2024-05-28 04:23:49.271721 | 2024-05-28 04:23:50.32931-04
  1 | mmt | 2024-10-02 14:00:00        | 2024-05-28 04:24:05.668132-04
(2 rows)

=# SELECT * FROM s4_1_prt_3 ;
 id | c1  |             ts             |          arrive_s4           
----+-----+----------------------------+------------------------------
  1 | mmt | 2024-05-28 04:23:49.271721 | 2024-05-28 04:23:50.32931-04
(1 row)

=# SELECT * FROM s4_1_prt_8 ;
 id | c1  |         ts          |           arrive_s4           
----+-----+---------------------+-------------------------------
  1 | mmt | 2024-10-02 14:00:00 | 2024-05-28 04:24:05.668132-04
(1 row)

双流 JOIN

  • SQL 定义示例

-- 双流join时,要求上游的两个表在关联字段上存在索引,避免数据量越来越大时,每次增量计算的代价随之增加
-- 如果上游的两个表缺少关联字段上的索引,默认创建流时会报错,可以通过打开GUC mxstream.domino_join_skip_index_check来跳过此检查

-- Both upstream tables carry an index on the join column (id).
CREATE TABLE t1 (id int, v1 int)
DISTRIBUTED BY (id);
CREATE INDEX ON t1 (id);

CREATE TABLE t2 (id int, v2 int)
DISTRIBUTED BY (id);
CREATE INDEX ON t2 (id);

-- Dual-stream join: both t1 and t2 are STREAMING inputs, matched on id.
CREATE STREAM s1 (id, v1, v2)
AS (
    SELECT
        t1.id,
        t1.v1,
        t2.v2
    FROM STREAMING t1
    INNER JOIN STREAMING t2 ON t1.id = t2.id
)
DISTRIBUTED BY (id);
  • 结果展示

可以看出,s1被定义为:对t1表和t2表进行流式JOIN,即无论哪一侧数据更新结果都会更新,例如:

-- 先插入两条不匹配的记录
=# INSERT INTO t1 VALUES (1, 100);

=# INSERT INTO t2 VALUES (2, 99);

-- 没有匹配记录
=# SELECT * FROM s1;
 id | v1 | v2
----+----+----
(0 rows)

-- 再插入4条记录,分别与之前数据匹配(2)以及与新插入数据匹配(3)
=# INSERT INTO t1 VALUES (2, NULL), (3, 100);

=# INSERT INTO t2 VALUES (3, NULL), (3, NULL);


-- (2)(3)分别匹配
=# SELECT * FROM s1;
 id | v1  | v2
----+-----+----
  2 |     | 99
  3 | 100 |
  3 | 100 |
(3 rows)

三流 JOIN

  • SQL 定义示例
-- Create the three input tables.
CREATE TABLE t1 (id int, v1 int) DISTRIBUTED BY (id);
CREATE TABLE t2 (id int, v2 int) DISTRIBUTED BY (id);
CREATE TABLE t3 (id int, v3 int) DISTRIBUTED BY (id);
-- A stream-to-stream join requires an index on the join column of each
-- upstream table; without it CREATE STREAM fails by default unless the GUC
-- mxstream.domino_join_skip_index_check is enabled.
CREATE INDEX ON t1 (id);
CREATE INDEX ON t2 (id);
CREATE INDEX ON t3 (id);
-- Create the first stream: s1 = t1 ⨝ t2
CREATE STREAM s1 (id, v1, v2) AS (
        SELECT t1.id, t1.v1, t2.v2 FROM STREAMING t1
        INNER JOIN STREAMING t2 ON t1.id = t2.id
) DISTRIBUTED BY (id);
-- NOTE(review): s2 uses s1 as a streaming join input on s1.id; s1 presumably
-- needs an index on id as well (or the skip-index-check GUC) — confirm.
-- Create the second stream: s2 = s1 ⨝ t3 = t1 ⨝ t2 ⨝ t3
CREATE STREAM s2 (id, v1, v2, v3) AS (
        SELECT s1.id, s1.v1, s1.v2, t3.v3 FROM STREAMING s1
        INNER JOIN STREAMING t3 ON s1.id = t3.id
) DISTRIBUTED BY (id);
  • 结果展示
-- 写入第一批数据,构造s1有新数据,s2没有新数据的情况
INSERT INTO t1 VALUES (1, 100), (2, 100);
INSERT INTO t2 VALUES (1, 100);

-- 因为两个流分别计算,第二个流需要在第一个流计算完成后触发,等待两个计算周期
SELECT pg_sleep(2);
 pg_sleep
----------

(1 row)

-- s1有1条数据
SELECT * FROM s1;
 id | v1  | v2
----+-----+-----
  1 | 100 | 100
(1 row)

-- s2没有数据,因为t3是空的
SELECT * FROM s2;
 id | v1 | v2 | v3
----+----+----+----
(0 rows)

-- 写入第二批数据,构造s1,s2分别有新数据的情况
INSERT INTO t1 VALUES (3, NULL);
INSERT 0 1
INSERT INTO t2 VALUES (2, NULL), (3, 99);
INSERT 0 2
INSERT INTO t3 VALUES (1, NULL), (2, 98), (3, 100);
INSERT 0 3

-- 同上,等待2个计算周期
SELECT pg_sleep(2);
 pg_sleep
----------

(1 row)

-- s1新增2条数据
SELECT * FROM s1;
 id | v1  | v2
----+-----+-----
  1 | 100 | 100    -- 上次计算结果
  2 | 100 |        -- t1 ⨝ Δt2
  3 |     |  99    -- Δt1 ⨝ Δt2
(3 rows)

-- s2新增3条数据
SELECT * FROM s2;
 id | v1  | v2  | v3
----+-----+-----+-----
  1 | 100 | 100 |        -- s1 ⨝ Δt3
  2 | 100 |     |  98    -- Δs1 ⨝ Δt3 
  3 |     |  99 | 100    -- Δs1 ⨝ Δt3 
(3 rows)