流计算功能示例
嵌套查询流
WITH [NO] DATA 子句必须放在流定义查询的最外层（即 AS ( ... ) 括号内、最外层查询的末尾），不能写在内层子查询中。
- SQL 定义示例
-- Source table: each row references one dim1 row (d1_id) and one dim2 row (d2_id).
CREATE TABLE t8 (
    id    int,
    d1_id int,
    d2_id int,
    c1    text,
    ts    timestamp
);

-- Dimension tables looked up by t8.
CREATE TABLE dim1 (
    id   int,
    data text
);
CREATE TABLE dim2 (
    id   int,
    data text
);

-- Stream whose defining query filters an inner, enriched subquery from the
-- outside. WITH DATA is attached to the outermost query (after the outer
-- WHERE), not to the inner subquery.
CREATE STREAM s8
AS (
    SELECT
        a.id,
        a.c1
    FROM (
        -- Inner subquery: join each new t8 row with its dimension data.
        SELECT
            t8.id,
            t8.c1,
            t8.ts,
            t8.d1_id,
            dim1.data AS d1_data,
            t8.d2_id,
            dim2.data AS d2_data
        FROM STREAMING t8
        INNER JOIN dim1 ON t8.d1_id = dim1.id
        LEFT JOIN dim2 ON t8.d2_id = dim2.id
    ) AS a
    WHERE a.id > 1
    WITH DATA
)
DISTRIBUTED BY (id);
- 结果展示
=# INSERT INTO dim1 VALUES (1,'mmt1');
=# INSERT INTO dim1 VALUES (2,'mmt2');
=# INSERT INTO dim2 VALUES (1,'hmz1');
=# INSERT INTO dim2 VALUES (2,'hmz2');
=# INSERT INTO t8 VALUES (1,1,1,'mmt',current_timestamp);
=# select * from s8;
id | c1
----+----
(0 rows)
=# INSERT INTO t8 VALUES (2,2,2,'mmt',current_timestamp);
=# select * from s8;
id | c1
----+-----
2 | mmt
(1 row)
扩维流
- SQL 定义示例
-- Source table for the dimension-enrichment stream.
CREATE TABLE t8 (
    id    int,
    d1_id int,
    d2_id int,
    c1    text,
    ts    timestamp
);

-- Dimension tables joined onto each t8 row.
CREATE TABLE dim1 (
    id   int,
    data text
);
CREATE TABLE dim2 (
    id   int,
    data text
);

-- Every row arriving in t8 is joined with dim1 (a match is required) and
-- dim2 (optional, via LEFT JOIN: without a dim2 match d2_data is NULL),
-- and the enriched row appears in s8.
CREATE STREAM s8
AS (
    SELECT
        t8.id,
        t8.c1,
        t8.ts,
        t8.d1_id,
        dim1.data AS d1_data,
        t8.d2_id,
        dim2.data AS d2_data
    FROM STREAMING t8
    INNER JOIN dim1
        ON t8.d1_id = dim1.id
    LEFT JOIN dim2
        ON t8.d2_id = dim2.id
    WITH NO DATA
);
- 结果展示
--维度表上插入数据
=# INSERT INTO dim1 VALUES (1,'mmt1');
=# INSERT INTO dim1 VALUES (2,'mmt2');
=# INSERT INTO dim2 VALUES (1,'hmz1');
=# INSERT INTO dim2 VALUES (2,'hmz2');
=# INSERT INTO t8 VALUES (1,1,1,'mmt',current_timestamp);
=# select * from t8;
id | d1_id | d2_id | c1 | ts
----+-------+-------+-----+----------------------------
1 | 1 | 1 | mmt | 2024-05-28 03:59:11.873657
(1 row)
=# select * from s8;
id | c1 | ts | d1_id | d1_data | d2_id | d2_data
----+-----+----------------------------+-------+---------+-------+---------
1 | mmt | 2024-05-28 03:59:11.873657 | 1 | mmt1 | 1 | hmz1
(1 row)
=# INSERT INTO t8 VALUES (2,2,2,'mmt',current_timestamp);
=# select * from s8;
id | c1 | ts | d1_id | d1_data | d2_id | d2_data
----+-----+----------------------------+-------+---------+-------+---------
2 | mmt | 2024-05-28 04:00:03.687348 | 2 | mmt2 | 2 | hmz2
1 | mmt | 2024-05-28 03:59:11.873657 | 1 | mmt1 | 1 | hmz1
(2 rows)
聚集流
- SQL 定义示例
-- Source table for the aggregation stream.
CREATE TABLE t1 (
    id  int,
    id2 int,
    id3 int,
    v1  int,
    v2  bigint
)
DISTRIBUTED BY (id);

-- Aggregation stream: for each (id, id2, id3) group it keeps min(v1),
-- avg(v1) and avg(v2), refreshed as new rows arrive in t1.
-- The column list names the aggregate outputs min1, avg1 and avg2.
CREATE STREAM s1 (id, id2, id3, min1, avg1, avg2)
AS (
    SELECT
        id,
        id2,
        id3,
        MIN(v1),
        AVG(v1),
        AVG(v2)
    FROM STREAMING t1
    GROUP BY id, id2, id3
    WITH NO DATA
)
DISTRIBUTED BY (id);
- 结果展示
INSERT INTO t1 VALUES(1,2,3,10,20);
INSERT INTO t1 VALUES(1,2,2,5,10);
INSERT INTO t1 VALUES(1,2,4,5,10);
INSERT INTO t1 VALUES(1,2,4,4,10);
mmt=# SELECT * FROM s1;
id | id2 | id3 | min1 | avg1 | avg2 | partial_avg1 | partial_avg2
----+-----+-----+------+---------------------+---------------------+--------------+----------------------------------------
1 | 2 | 2 | 5 | 5.0000000000000000 | 10.0000000000000000 | {1,5} | \x00000000000000010001000000000000000a
1 | 2 | 3 | 10 | 10.0000000000000000 | 20.0000000000000000 | {1,10} | \x000000000000000100010000000000000014
1 | 2 | 4 | 4 | 4.5000000000000000 | 10.0000000000000000 | {2,9} | \x000000000000000200010000000000000014
(3 rows)
多级流
- SQL 定义示例
-- Source table plus one dimension table per stream level.
CREATE TABLE t6 (id int, c1 text, ts timestamp);
CREATE TABLE dim1 (id int, col1 text);
CREATE TABLE dim2 (id int, col2 text);

-- Level 1: s61 enriches t6 with dim1.col1 and stamps each output row with
-- clock_timestamp() (the wall-clock time at evaluation) as arrive_s61.
CREATE STREAM s61 (id, c1, ts, col1, arrive_s61)
AS (
    SELECT
        t6.id,
        t6.c1,
        t6.ts,
        dim1.col1,
        clock_timestamp()
    FROM STREAMING t6
    INNER JOIN dim1 ON t6.id = dim1.id
    WITH NO DATA
)
DISTRIBUTED BY (c1);

-- Level 2: s62 uses stream s61 as its streaming source, adds dim2.col2 and
-- a second timestamp; comparing arrive_s61 with arrive_s6_2 shows the
-- latency added by the second level.
CREATE STREAM s62 (id, c1, ts, col1, arrive_s61, col2, arrive_s6_2)
AS (
    SELECT
        s61.id,
        s61.c1,
        s61.ts,
        s61.col1,
        s61.arrive_s61,
        dim2.col2,
        clock_timestamp()
    FROM STREAMING s61
    INNER JOIN dim2 ON s61.id = dim2.id
    WITH NO DATA
)
DISTRIBUTED BY (id);
- 结果展示
INSERT INTO dim1 VALUES(1,'mmt1');
INSERT INTO dim1 VALUES(2,'hmz1');
INSERT INTO dim2 VALUES(1,'mmt2');
INSERT INTO dim2 VALUES(2,'hmz2');
INSERT INTO t6 VALUES (1,'mmt',current_timestamp);
INSERT INTO t6 VALUES (2,'hmz',current_timestamp);
=# select * from t6;
id | c1 | ts
----+-----+----------------------------
2 | hmz | 2024-05-28 04:15:07.591406
1 | mmt | 2024-05-28 04:15:06.775371
(2 rows)
=# select * from s61;
id | c1 | ts | col1 | arrive_s61
----+-----+----------------------------+------+-------------------------------
1 | mmt | 2024-05-28 04:15:06.775371 | mmt1 | 2024-05-28 04:15:07.611297-04
2 | hmz | 2024-05-28 04:15:07.591406 | hmz1 | 2024-05-28 04:15:08.613346-04
(2 rows)
=# select * from s62;
id | c1 | ts | col1 | arrive_s61 | col2 | arrive_s6_2
----+-----+----------------------------+------+-------------------------------+------+-------------------------------
2 | hmz | 2024-05-28 04:15:07.591406 | hmz1 | 2024-05-28 04:15:08.613346-04 | hmz2 | 2024-05-28 04:15:08.725532-04
1 | mmt | 2024-05-28 04:15:06.775371 | mmt1 | 2024-05-28 04:15:07.611297-04 | mmt2 | 2024-05-28 04:15:07.724866-04
(2 rows)
一表多流
- SQL 定义示例
-- One source table feeding two independent streams with different filters.
CREATE TABLE t5 (id int, c1 text, ts timestamp);

-- s51: only rows with id > 7.
CREATE STREAM s51 (id, c1, ts, arrive_s51)
AS (
    SELECT
        t5.id,
        t5.c1,
        t5.ts,
        clock_timestamp()
    FROM STREAMING t5
    WHERE t5.id > 7
    WITH NO DATA
)
DISTRIBUTED BY (id);

-- s52: only rows with id < 3. Rows with 3 <= id <= 7 reach neither stream.
CREATE STREAM s52 (id, c1, ts, arrive_s52)
AS (
    SELECT
        t5.id,
        t5.c1,
        t5.ts,
        clock_timestamp()
    FROM STREAMING t5
    WHERE t5.id < 3
    WITH NO DATA
)
DISTRIBUTED BY (c1);
- 结果展示
INSERT INTO t5 VALUES(1,'mmt',current_timestamp);
INSERT INTO t5 VALUES(7,'hmz',current_timestamp);
=# SELECT * FROM s51;
id | c1 | ts | arrive_s51
----+----+----+------------
(0 rows)
=# SELECT * FROM s52;
id | c1 | ts | arrive_s52
----+-----+----------------------------+-------------------------------
1 | mmt | 2024-05-28 04:17:38.090422 | 2024-05-28 04:17:38.278591-04
(1 row)
=# INSERT INTO t5 VALUES(8,'hmz2',current_timestamp);
=# SELECT * FROM s51;
id | c1 | ts | arrive_s51
----+------+----------------------------+-------------------------------
8 | hmz2 | 2024-05-28 04:18:08.299554 | 2024-05-28 04:18:09.241703-04
(1 row)
上游表多次引用
- SQL 定义示例
-- Upstream table that the stream below reads twice.
CREATE TABLE t3 (id int, c1 text, ts timestamp);

-- Both UNION ALL branches read STREAMING t3, each with its own filter
-- (id > 7 and id < 3), so one stream combines two disjoint slices of t3.
CREATE STREAM s3 (id, c1, ts, arrive_s3)
AS (
    SELECT
        t3.id,
        t3.c1,
        t3.ts,
        clock_timestamp()
    FROM STREAMING t3
    WHERE t3.id > 7

    UNION ALL

    SELECT
        t3.id,
        t3.c1,
        t3.ts,
        clock_timestamp()
    FROM STREAMING t3
    WHERE t3.id < 3
    WITH NO DATA
)
DISTRIBUTED BY (arrive_s3);
- 结果展示
INSERT INTO t3 VALUES (1,'mmt',current_timestamp);
INSERT INTO t3 VALUES (8,'hmz',current_timestamp);
=# SELECT * FROM s3;
id | c1 | ts | arrive_s3
----+-----+----------------------------+-------------------------------
1 | mmt | 2024-05-28 04:20:28.31711 | 2024-05-28 04:20:28.542517-04
8 | hmz | 2024-05-28 04:20:29.399698 | 2024-05-28 04:20:29.540877-04
(2 rows)
流表多引擎支持
- SQL 定义示例
-- Source table for a stream stored with a non-default access method.
CREATE TABLE t4 (id int, c1 text, ts timestamp);

-- s4 storage options:
--   access method  mars3, zstd compression level 1
--   distribution   (id, arrive_s4), sort order id
--   partitioning   RANGE on ts, monthly, 2024-03-01 (inclusive) to
--                  2025-01-01 (exclusive)
-- NOTE(review): a t4 row whose ts falls outside this range has no matching
-- partition; confirm how the stream handles such rows.
CREATE STREAM s4 (id, c1, ts, arrive_s4)
AS (
    SELECT
        t4.id,
        t4.c1,
        t4.ts,
        clock_timestamp()
    FROM STREAMING t4
    WITH NO DATA
)
USING mars3
WITH (compresstype = 'zstd', compresslevel = 1)
DISTRIBUTED BY (id, arrive_s4)
ORDER BY id
PARTITION BY RANGE (ts)
(
    START (DATE '2024-03-01') INCLUSIVE
    END   (DATE '2025-01-01') EXCLUSIVE
    EVERY (INTERVAL '1 month')
);
- 结果展示
=# \dS+ s4
Partitioned stream "public.s4"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
-----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
id | integer | | | | plain | |
c1 | text | | | | extended | |
ts | timestamp without time zone | | | | plain | |
arrive_s4 | timestamp with time zone | | | | plain | |
Partition key: RANGE (ts)
View definition:
Partitions: s4_1_prt_1 FOR VALUES FROM ('2024-03-01 00:00:00') TO ('2024-04-01 00:00:00'),
s4_1_prt_10 FOR VALUES FROM ('2024-12-01 00:00:00') TO ('2025-01-01 00:00:00'),
s4_1_prt_2 FOR VALUES FROM ('2024-04-01 00:00:00') TO ('2024-05-01 00:00:00'),
s4_1_prt_3 FOR VALUES FROM ('2024-05-01 00:00:00') TO ('2024-06-01 00:00:00'),
s4_1_prt_4 FOR VALUES FROM ('2024-06-01 00:00:00') TO ('2024-07-01 00:00:00'),
s4_1_prt_5 FOR VALUES FROM ('2024-07-01 00:00:00') TO ('2024-08-01 00:00:00'),
s4_1_prt_6 FOR VALUES FROM ('2024-08-01 00:00:00') TO ('2024-09-01 00:00:00'),
s4_1_prt_7 FOR VALUES FROM ('2024-09-01 00:00:00') TO ('2024-10-01 00:00:00'),
s4_1_prt_8 FOR VALUES FROM ('2024-10-01 00:00:00') TO ('2024-11-01 00:00:00'),
s4_1_prt_9 FOR VALUES FROM ('2024-11-01 00:00:00') TO ('2024-12-01 00:00:00')
Distributed by: (id, arrive_s4)
Access method: mars3
Order by: (id)
Options: compresstype=zstd, compresslevel=1
=# INSERT INTO t4 VALUES (1,'mmt', current_timestamp);
=# INSERT INTO t4 VALUES (1,'mmt', to_timestamp('2024-10-02 14:00:00','yyyy-mm-dd hh24:mi:ss'));
=# SELECT * FROM s4;
id | c1 | ts | arrive_s4
----+-----+----------------------------+-------------------------------
1 | mmt | 2024-05-28 04:23:49.271721 | 2024-05-28 04:23:50.32931-04
1 | mmt | 2024-10-02 14:00:00 | 2024-05-28 04:24:05.668132-04
(2 rows)
=# SELECT * FROM s4_1_prt_3 ;
id | c1 | ts | arrive_s4
----+-----+----------------------------+------------------------------
1 | mmt | 2024-05-28 04:23:49.271721 | 2024-05-28 04:23:50.32931-04
(1 row)
=# SELECT * FROM s4_1_prt_8 ;
id | c1 | ts | arrive_s4
----+-----+---------------------+-------------------------------
1 | mmt | 2024-10-02 14:00:00 | 2024-05-28 04:24:05.668132-04
(1 row)
双流 JOIN
- SQL 定义示例
-- Two-stream JOIN: both upstream tables must have an index on the join key,
-- so that the cost of each incremental computation does not keep growing as
-- the tables accumulate data.
-- Without these indexes CREATE STREAM fails by default; the check can be
-- skipped by enabling the GUC mxstream.domino_join_skip_index_check.
CREATE TABLE t1 (id int, v1 int) DISTRIBUTED BY (id);
CREATE TABLE t2 (id int, v2 int) DISTRIBUTED BY (id);

-- Join-key indexes required by the streaming join below.
CREATE INDEX ON t1 (id);
CREATE INDEX ON t2 (id);

-- Both sides are STREAMING, so new rows in either t1 or t2 update s1.
CREATE STREAM s1 (id, v1, v2)
AS (
    SELECT
        t1.id,
        t1.v1,
        t2.v2
    FROM STREAMING t1
    INNER JOIN STREAMING t2
        ON t1.id = t2.id
)
DISTRIBUTED BY (id);
- 结果展示
可以看出，s1 被定义为对 t1 表和 t2 表进行流式 JOIN，即无论哪一侧的数据发生更新，结果都会随之更新。例如：
-- 先插入两条不匹配的记录
=# INSERT INTO t1 VALUES (1, 100);
=# INSERT INTO t2 VALUES (2, 99);
-- 没有匹配记录
=# SELECT * FROM s1;
id | v1 | v2
----+----+----
(0 rows)
-- 再插入 4 条记录：id=2 的记录与之前插入的数据匹配，id=3 的记录与本次新插入的数据匹配
=# INSERT INTO t1 VALUES (2, NULL), (3, 100);
=# INSERT INTO t2 VALUES (3, NULL), (3, NULL);
-- id=2 与 id=3 的记录分别匹配成功
=# SELECT * FROM s1;
id | v1 | v2
----+-----+----
2 | | 99
3 | 100 |
3 | 100 |
(3 rows)
三流 JOIN
- SQL 定义示例
-- Create the three input tables.
CREATE TABLE t1 (id int, v1 int) DISTRIBUTED BY (id);
CREATE TABLE t2 (id int, v2 int) DISTRIBUTED BY (id);
CREATE TABLE t3 (id int, v3 int) DISTRIBUTED BY (id);

-- Streaming joins require an index on the join key of each upstream table.
-- Without it, CREATE STREAM fails by default unless the GUC
-- mxstream.domino_join_skip_index_check is enabled.
CREATE INDEX ON t1 (id);
CREATE INDEX ON t2 (id);
CREATE INDEX ON t3 (id);
-- NOTE(review): s2's join also has stream s1 as an upstream; confirm whether
-- s1 needs (or supports) an index on id, or whether the GUC must be set.

-- First stream: s1 = t1 ⨝ t2
CREATE STREAM s1 (id, v1, v2) AS (
    SELECT t1.id, t1.v1, t2.v2
    FROM STREAMING t1
    INNER JOIN STREAMING t2 ON t1.id = t2.id
) DISTRIBUTED BY (id);

-- Second stream: s2 = s1 ⨝ t3 = t1 ⨝ t2 ⨝ t3
-- s2 consumes s1's output, so it is computed after s1 has been updated.
CREATE STREAM s2 (id, v1, v2, v3) AS (
    SELECT s1.id, s1.v1, s1.v2, t3.v3
    FROM STREAMING s1
    INNER JOIN STREAMING t3 ON s1.id = t3.id
) DISTRIBUTED BY (id);
- 结果展示
-- 写入第一批数据,构造s1有新数据,s2没有新数据的情况
INSERT INTO t1 VALUES (1, 100), (2, 100);
INSERT INTO t2 VALUES (1, 100);
-- 因为两个流分别计算,第二个流需要在第一个流计算完成后触发,等待两个计算周期
SELECT pg_sleep(2);
pg_sleep
----------
(1 row)
-- s1有1条数据
SELECT * FROM s1;
id | v1 | v2
----+-----+-----
1 | 100 | 100
(1 row)
-- s2没有数据,因为t3是空的
SELECT * FROM s2;
id | v1 | v2 | v3
----+----+----+----
(0 rows)
-- 写入第二批数据,构造s1,s2分别有新数据的情况
INSERT INTO t1 VALUES (3, NULL);
INSERT 0 1
INSERT INTO t2 VALUES (2, NULL), (3, 99);
INSERT 0 2
INSERT INTO t3 VALUES (1, NULL), (2, 98), (3, 100);
INSERT 0 3
-- 同上,等待2个计算周期
SELECT pg_sleep(2);
pg_sleep
----------
(1 row)
-- s1新增2条数据
SELECT * FROM s1;
id | v1 | v2
----+-----+-----
1 | 100 | 100 -- 上次计算结果
2 | 100 | -- t1 ⨝ Δt2
3 | | 99 -- Δt1 ⨝ Δt2
(3 rows)
-- s2新增3条数据
SELECT * FROM s2;
id | v1 | v2 | v3
----+-----+-----+-----
1 | 100 | 100 | -- s1 ⨝ Δt3
2 | 100 | | 98 -- Δs1 ⨝ Δt3
3 | | 99 | 100 -- Δs1 ⨝ Δt3
(3 rows)