CREATE STREAM

定义一个流。

概要

CREATE STREAM [IF NOT EXISTS] stream_name [ (column_name [, ...] ) ]
AS(
    SELECT [ { * | expression [ [ AS ] output_name ] } [, ...] ]
    FROM STREAMING { ALL | INSERT |UPDATE | DELETE } table_name
    [ WHERE condition ]
    [ join_type ]
    [ GROUP BY grouping_element [, ...] ]
    [ WITH [ NO ] DATA ]
)
[ USING MARS3 ]
[ WITH ( storage_parameter [= value] [, ... ] ) ]
[ DISTRIBUTED BY ( [column_name [, ...] ] ) ]

描述

CREATE STREAM 声明一个流的定义,在数据库内部定义内部管理函数。

参数

  • stream_name [ (column_name [, ...] ) ]

    • stream_name :流表。若作为流的末端,存储流的计算结果。
    • (column_name [, ...] ):字段映射,可省略。省略时默认使用 AS 语句块中 SELECT 部分列举的字段信息。
  • AS

    • AS 语句块以 SQL 的形式定义流的计算逻辑。
  • SELECT [ { * | expression [ [ AS ] output_name ] } [, ...] ]

    • 指定所需查询的列,可以为上游表默认字段,也可指定新字段(聚集计算结果等)。
  • FROM STREAMING { ALL | INSERT |UPDATE | DELETE } table_name

    • FROM STREAMING 声明以流的方式从源表增量订阅数据,对应的数据库内部会启动一组进程监测该表上的数据变化。
    • ALL | INSERT |UPDATE | DELETE 选择流从源表的订阅方式即监测源表的某种类型数据变化。ALL 为监控源表种数据的插入、更新和删除变化;INSERT 为监控源表的插入变化;UPDATE 为监控源表的更新操作;DELETE为监控源表的删除操作。
    • table_name 流的源表(上游表)。
  • join_type

    • 目前仅支持 [ INNER ] JOIN 操作, LEFT [OUTER] JOINRIGHT [OUTER] JOIN[OUTER] JOIN 操作将在后续版本中更新。
  • WITH [NO] DATA

    • 表示创建流的时候,是否要对源表的历史数据做处理。[NO] 代表不处理。
  • USING MARS3

    • 默认创建为 HEAP 表,可使用该语句指定创建为 MARS3 表。
  • WITH ( storage_parameter [= value] [, ... ] )

  • DISTRIBUTED BY ( [column_name [, ...] ] )

    • 用于声明表的 YMatrix 数据库分布策略。

示例

创建一个基础的流。

CREATE STREAM s1(id, c1, ts, arrive_s1) 
AS ( 
    SELECT *, clock_timestamp() 
    FROM STREAMING t1
    WITH NO DATA 
) 
DISTRIBUTED BY (id); 

创建一个使用 MARS3 引擎和指定数据压缩类型和级别的流。

CREATE STREAM s4(id, c1, ts, arrive_s4)
AS (
 SELECT *, clock_timestamp()
 FROM STREAMING t4
 WITH NO DATA
)
USING mars3
WITH (compresstype='zstd', compresslevel=1)
DISTRIBUTED BY (id, arrive_s4);

更多请参考 流计算功能示例