滑动窗口
1. 背景介绍
滑动窗口是流计算场景的常见功能。在流计算数据场景中,可以对最近一段时间内的数据持续地做聚集运算。 滑动窗口通常搭配监控告警使用,当最近一段时间内数据满足预设条件时,数据服务端会向客户端发送告警消息。 例如,每台设备,每分钟计算一次平均温度,超过 90 度,则告警。
2. 什么是滑动窗口?
滑动窗口(Sliding Window),不同于滚动窗口的不重叠,滑动窗口的时间窗口是可以重叠的。 滑动窗口有 WINDOW_SIZE 和 SLIDING_SIZE 两个参数,WINDOW_SIZE 指定了窗口的展示范围,SLIDING_SIZE 为每次滑动的步长。
3. 时间窗口如何滑动?
滑动窗口的时间,选用 Insert Time,即数据入库的时间。 通过设置 SLIDING_SIZE 决定窗口的滑动步长。
-
当
SLIDING_SIZE
<WINDOW_SIZE
,则展示的窗口会重叠,每条数据会被多个时间窗口观察到。 -
当
SLIDING_SIZE
=WINDOW_SIZE
,则等同于滚筒窗口,没有重叠,每条数据只会展示一次。 -
当
SLIDING_SIZE
>WINDOW_SIZE
,则为跳跃窗口,窗口之间没有重叠且有间隙,部分数据可能不会被展示。
4. 滑动窗口如何做监控告警?
窗口每次滑动时,数据库监控当前窗口内的数据,如果窗口内的数据满足预设条件时,则推送消息。
推送通过 PostgreSQL 提供的 pg_notify
发送。
客户端用 libpq 协议提供的 listener
接口接收。(例如 go 语言 libpq.NewListener; Java 语言 java.sql.Listener)
在创建滑动窗口时,通过参数 CHANNEL_NAME
声明消息队列的名称。
同一个消息队列,可以被多个客户端同时监听
多个滑动窗口也可以复用同一个消息队列,也就是说,同一个客户端可以同时监听多个滑动窗口的告警消息。
5. 使用方法
5.1 创建扩展
滑动窗口依赖 matrixts 扩展,首先创建扩展:
CREATE EXTENSION matrixts;
5.2 创建数据表
CREATE TABLE metrics(
time timestamp,
tag_id int,
sensor float8
)
DISTRIBUTED BY (tag_id);
5.3 创建滑动窗口
CREATE VIEW sv1 WITH (
CONTINUOUS,
WINDOW_SIZE='10 min',
SLIDING_SIZE='1 min',
CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;
5.4 参数说明:
参数分两部分,OPTION的参数和滑动窗口及消息推送相关。而SQL指定了流数据如何在滑动窗口中展示。
OPTION:
CONTINUOUS
声明该视图为聚集滑动窗口。WINDOW_SIZE
滑动窗口监控数据的时间窗口大小。SLIDING_SIZE
滑动窗口每次滑动的时间大小。CHANNEL_NAME
当监控条件满足时,数据库推送的消息队列名。
SQL:
SELECT
每次推送的数据内容。(可以使用postgres内置json函数,组装消息格式。)FROM
窗口数据来自哪张单表。(滑动窗口不支持多表数据)WHERE
原数据表中,哪部分数据可以被窗口监控。HAVING
监控推送的触发条件。如果省略HAVING
条件,则窗口每次滑动都会推送消息。
5.5 示例
示例1:最高温度监控
统计每台设备过去2分钟的最高温度,每30秒更新一次。(即2分钟的时间窗口,30秒滑动一次)
消息格式为(设备号,最高温度)。
消息推送至消息队列 temp_monitor
。
首先,创建设备数据表 metrics_1。
CREATE TABLE metrics_1(
time timestamp,
tag_id int,
temp float8
)
DISTRIBUTED BY (tag_id);
然后创建滑动窗口。
CREATE VIEW temp_sv WITH (
CONTINUOUS,
WINDOW_SIZE='2 min',
SLIDING_SIZE='30 seconds',
CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;
示例2:最低电压告警
统计每台设备过去1分钟的电压值,每10秒更新一次。
当最低电压超过10时触发告警。
消息格式为 (设备号,平均电压,最低电压,最高电压)
。
消息推送至消息队列 over_volt_alarm
。
首先,创建设备数据表 metrics_2
。
CREATE TABLE metrics_2(
time timestamp,
tag_id int,
volt float8
)
DISTRIBUTED BY (tag_id);
然后创建滑动窗口。
CREATE VIEW volt_alarm WITH (
CONTINUOUS,
WINDOW_SIZE='1 min',
SLIDING_SIZE='10 seconds',
CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;
客户端示例1: go语言
ref: https://pkg.go.dev/github.com/lib/pq/example/listen
package main
import (
"database/sql"
"fmt"
"time"
"github.com/lib/pq"
)
func waitForNotification(l *pq.Listener) {
select {
case n := <-l.Notify:
fmt.Println("received notification, new work available")
fmt.Println(n.Channel)
fmt.Println(n.Extra)
case <-time.After(90 * time.Second):
go l.Ping()
// Check if there's more work available, just in case it takes
// a while for the Listener to notice connection loss and
// reconnect.
fmt.Println("received no work for 90 seconds, checking for new work")
}
}
func main() {
var conninfo string = ""
db, err := sql.Open("postgres", conninfo)
reportProblem := func(ev pq.ListenerEventType, err error) {
if err != nil {
fmt.Println(err.Error())
}
}
minReconn := 10 * time.Second
maxReconn := time.Minute
fmt.Println("entering conn")
listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
var err = listener.Listen("my_channel")
if err != nil {
panic(err)
}
fmt.Println("entering main loop")
for {
// process all available work before waiting for notifications
// getWork(db)
waitForNotification(listener)
}
}
客户端示例2: java语言
ref: https://jdbc.postgresql.org/documentation/81/listennotify.html
import java.sql.*;
public class NotificationTest {
public static void main(String args[]) throws Exception {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://localhost:5432/test";
// Create two distinct connections, one for the notifier
// and another for the listener to show the communication
// works across connections although this example would
// work fine with just one connection.
Connection lConn = DriverManager.getConnection(url,"test","");
Connection nConn = DriverManager.getConnection(url,"test","");
// Create two threads, one to issue notifications and
// the other to receive them.
Listener listener = new Listener(lConn);
// Notifier notifier = new Notifier(nConn);
listener.start();
// notifier.start();
}
}
class Listener extends Thread {
private Connection conn;
private org.postgresql.PGConnection pgconn;
Listener(Connection conn) throws SQLException {
this.conn = conn;
this.pgconn = (org.postgresql.PGConnection)conn;
Statement stmt = conn.createStatement();
stmt.execute("LISTEN my_channel");
stmt.close();
}
public void run() {
while (true) {
try {
// issue a dummy query to contact the backend
// and receive any pending notifications.
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.close();
stmt.close();
org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
if (notifications != null) {
for (int i=0; i<notifications.length; i++) {
System.out.println("Got notification: " + notifications[i].getName());
}
}
// wait a while before checking again for new
// notifications
Thread.sleep(500);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
class Notifier extends Thread {
private Connection conn;
public Notifier(Connection conn) {
this.conn = conn;
}
public void run() {
while (true) {
try {
Statement stmt = conn.createStatement();
stmt.execute("NOTIFY mymessage");
stmt.close();
Thread.sleep(2000);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
6. FAQ
- 滑动窗口的数据会持久化吗?
不会。过期的数据会定期被清理掉。
- 滑动窗口可以保证ACID吗?
可以的,滑动窗口中的数据与原表数据使用相同的事务ID。
- 滑动窗口对窗口函数有效吗?
暂不支持窗口函数。
- 滑动窗口是否支持partition表?
支持。
- 滑动窗口支持多表监控吗?
不支持。
- 一张表可以定义多个滑动窗口吗?
可以。
- 同一个滑动窗口的消息,可以被多个客户端监听吗?
可以。