滑动窗口

1. 背景介绍

滑动窗口是流计算场景的常见功能。在流计算数据场景中,可以对最近一段时间内的数据持续地做聚集运算。 滑动窗口通常搭配监控告警使用,当最近一段时间内数据满足预设条件时,数据服务端会向客户端发送告警消息。 例如,每台设备,每分钟计算一次平均温度,超过 90 度,则告警。

2. 什么是滑动窗口?

滑动窗口(Sliding Window),不同于滚动窗口的不重叠,滑动窗口的时间窗口是可以重叠的。 滑动窗口有 WINDOW_SIZE 和 SLIDING_SIZE 两个参数,WINDOW_SIZE 指定了窗口的展示范围,SLIDING_SIZE 为每次滑动的步长。

3. 时间窗口如何滑动?

滑动窗口的时间,选用 Insert Time,即数据入库的时间。 通过设置 SLIDING_SIZE 决定窗口的滑动步长。

  • SLIDING_SIZE < WINDOW_SIZE,则展示的窗口会重叠,每条数据会被多个时间窗口观察到。

  • SLIDING_SIZE = WINDOW_SIZE,则等同于滚筒窗口,没有重叠,每条数据只会展示一次。

  • SLIDING_SIZE > WINDOW_SIZE,则为跳跃窗口,窗口之间没有重叠且有间隙,部分数据可能不会被展示。

4. 滑动窗口如何做监控告警?

窗口每次滑动时,数据库监控当前窗口内的数据,如果窗口内的数据满足预设条件时,则推送消息。 推送通过 PostgreSQL 提供的 pg_notify 发送。 客户端用 libpq 协议提供的 listener 接口接收。(例如 go 语言 libpq.NewListener; Java 语言 java.sql.Listener) 在创建滑动窗口时,通过参数 CHANNEL_NAME 声明消息队列的名称。

滑动窗口

同一个消息队列,可以被多个客户端同时监听 滑动窗口

多个滑动窗口也可以复用同一个消息队列,也就是说,同一个客户端可以同时监听多个滑动窗口的告警消息。

滑动窗口

5. 使用方法

5.1 创建扩展

滑动窗口依赖 matrixts 扩展,首先创建扩展:

CREATE EXTENSION matrixts;

5.2 创建数据表

CREATE TABLE metrics(
    time timestamp,
    tag_id int,
    sensor float8
)
DISTRIBUTED BY (tag_id);

5.3 创建滑动窗口

CREATE VIEW sv1 WITH (
    CONTINUOUS, 
    WINDOW_SIZE='10 min',
    SLIDING_SIZE='1 min',
    CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;

5.4 参数说明:

参数分两部分,OPTION的参数和滑动窗口及消息推送相关。而SQL指定了流数据如何在滑动窗口中展示。

OPTION:

  • CONTINUOUS 声明该视图为聚集滑动窗口。
  • WINDOW_SIZE 滑动窗口监控数据的时间窗口大小。
  • SLIDING_SIZE 滑动窗口每次滑动的时间大小。
  • CHANNEL_NAME 当监控条件满足时,数据库推送的消息队列名。

SQL:

  • SELECT 每次推送的数据内容。(可以使用postgres内置json函数,组装消息格式。)
  • FROM 窗口数据来自哪张单表。(滑动窗口不支持多表数据)
  • WHERE 原数据表中,哪部分数据可以被窗口监控。
  • HAVING 监控推送的触发条件。如果省略 HAVING 条件,则窗口每次滑动都会推送消息。

5.5 示例

示例1:最高温度监控

统计每台设备过去2分钟的最高温度,每30秒更新一次。(即2分钟的时间窗口,30秒滑动一次) 消息格式为(设备号,最高温度)。 消息推送至消息队列 temp_monitor
首先,创建设备数据表 metrics_1。

CREATE TABLE metrics_1(
    time timestamp,
    tag_id int,
    temp float8
)
DISTRIBUTED BY (tag_id);

然后创建滑动窗口。

CREATE VIEW temp_sv WITH (
    CONTINUOUS, 
    WINDOW_SIZE='2 min',
    SLIDING_SIZE='30 seconds',
    CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;

示例2:最低电压告警

统计每台设备过去1分钟的电压值,每10秒更新一次。 当最低电压超过10时触发告警。 消息格式为 (设备号,平均电压,最低电压,最高电压)。 消息推送至消息队列 over_volt_alarm
首先,创建设备数据表 metrics_2

CREATE TABLE metrics_2(
    time timestamp,
    tag_id int,
    volt float8
)
DISTRIBUTED BY (tag_id);

然后创建滑动窗口。

CREATE VIEW volt_alarm WITH (
    CONTINUOUS, 
    WINDOW_SIZE='1 min',
    SLIDING_SIZE='10 seconds',
    CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;

客户端示例1: go语言

ref: https://pkg.go.dev/github.com/lib/pq/example/listen

package main

import (
    "database/sql"
    "fmt"
    "time"

    "github.com/lib/pq"
)

func waitForNotification(l *pq.Listener) {
    select {
    case n := <-l.Notify:
        fmt.Println("received notification, new work available")
        fmt.Println(n.Channel)
        fmt.Println(n.Extra)

    case <-time.After(90 * time.Second):
        go l.Ping()
        // Check if there's more work available, just in case it takes
        // a while for the Listener to notice connection loss and
        // reconnect.
        fmt.Println("received no work for 90 seconds, checking for new work")
    }
}

func main() {
    var conninfo string = ""

    db, err := sql.Open("postgres", conninfo)

    reportProblem := func(ev pq.ListenerEventType, err error) {
        if err != nil {
            fmt.Println(err.Error())
        }
    }

    minReconn := 10 * time.Second
    maxReconn := time.Minute
    fmt.Println("entering conn")
    listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
    var err = listener.Listen("my_channel")
    if err != nil {
        panic(err)
    }

    fmt.Println("entering main loop")
    for {
        // process all available work before waiting for notifications
        // getWork(db)
        waitForNotification(listener)
    }
}

客户端示例2: java语言

ref: https://jdbc.postgresql.org/documentation/81/listennotify.html

import java.sql.*;

public class NotificationTest {

        public static void main(String args[]) throws Exception {
                Class.forName("org.postgresql.Driver");
                String url = "jdbc:postgresql://localhost:5432/test";

                // Create two distinct connections, one for the notifier
                // and another for the listener to show the communication
                // works across connections although this example would
                // work fine with just one connection.
                Connection lConn = DriverManager.getConnection(url,"test","");
                Connection nConn = DriverManager.getConnection(url,"test","");

                // Create two threads, one to issue notifications and
                // the other to receive them.
                Listener listener = new Listener(lConn);
//                Notifier notifier = new Notifier(nConn);
                listener.start();
//                notifier.start();
        }

}

class Listener extends Thread {

        private Connection conn;
        private org.postgresql.PGConnection pgconn;

        Listener(Connection conn) throws SQLException {
                this.conn = conn;
                this.pgconn = (org.postgresql.PGConnection)conn;
                Statement stmt = conn.createStatement();
                stmt.execute("LISTEN my_channel");
                stmt.close();
        }

        public void run() {
                while (true) {
                        try {
                                // issue a dummy query to contact the backend
                                // and receive any pending notifications.
                                Statement stmt = conn.createStatement();
                                ResultSet rs = stmt.executeQuery("SELECT 1");
                                rs.close();
                                stmt.close();

                                org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
                                if (notifications != null) {
                                        for (int i=0; i<notifications.length; i++) {
                                                System.out.println("Got notification: " + notifications[i].getName());
                                        }
                                }

                                // wait a while before checking again for new
                                // notifications
                                Thread.sleep(500);
                        } catch (SQLException sqle) {
                                sqle.printStackTrace();
                        } catch (InterruptedException ie) {
                                ie.printStackTrace();
                        }
                }
        }

}

class Notifier extends Thread {

        private Connection conn;

        public Notifier(Connection conn) {
                this.conn = conn;
        }

        public void run() {
                while (true) {
                        try {
                                Statement stmt = conn.createStatement();
                                stmt.execute("NOTIFY mymessage");
                                stmt.close();
                                Thread.sleep(2000);
                        } catch (SQLException sqle) {
                                sqle.printStackTrace();
                        } catch (InterruptedException ie) {
                                ie.printStackTrace();
                        }
                }
        }

}

6. FAQ

  1. 滑动窗口的数据会持久化吗?
不会。过期的数据会定期被清理掉。
  1. 滑动窗口可以保证ACID吗?
可以的,滑动窗口中的数据与原表数据使用相同的事务ID。
  1. 滑动窗口对窗口函数有效吗?
暂不支持窗口函数。
  1. 滑动窗口是否支持partition表?
支持。
  1. 滑动窗口支持多表监控吗?
不支持。
  1. 一张表可以定义多个滑动窗口吗?
可以。
  1. 同一个滑动窗口的消息,可以被多个客户端监听吗?
可以。