400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
YMatrix 文档
关于 YMatrix
标准集群部署
数据写入
数据迁移
数据查询
运维监控
参考指南
工具指南
数据类型
存储引擎
执行引擎
流计算引擎
灾难恢复
系统配置参数
索引
扩展
SQL 参考
常见问题(FAQ)
本文档介绍了 YMatrix 的滑动窗口特性。
滑动窗口是流计算场景的常见功能。在流计算数据场景中,可以对最近一段时间内的数据持续地做聚集运算。 滑动窗口通常搭配监控告警使用,当最近一段时间内数据满足预设条件时,数据服务端会向客户端发送告警消息。 例如,每台设备,每分钟计算一次平均温度,超过 90 度,则告警。
滑动窗口(Sliding Window),不同于滚动窗口的不重叠,滑动窗口的时间窗口是可以重叠的。
滑动窗口有 WINDOW_SIZE
和 SLIDING_SIZE
两个参数,WINDOW_SIZE
指定了窗口的展示范围,SLIDING_SIZE
为每次滑动的步长。
滑动窗口的时间,选用 Insert Time,即数据入库的时间。
通过设置 SLIDING_SIZE
决定窗口的滑动步长。
当 SLIDING_SIZE
< WINDOW_SIZE
,则展示的窗口会重叠,每条数据会被多个时间窗口观察到。
当 SLIDING_SIZE
= WINDOW_SIZE
,则等同于滚筒窗口,没有重叠,每条数据只会展示一次。
当 SLIDING_SIZE
> WINDOW_SIZE
,则为跳跃窗口,窗口之间没有重叠且有间隙,部分数据可能不会被展示。
窗口每次滑动时,数据库监控当前窗口内的数据,如果窗口内的数据满足预设条件时,则推送消息。
推送通过 PostgreSQL 提供的 pg_notify
发送。
客户端用 libpq 协议提供的 listener
接口接收。(例如 go 语言 libpq.NewListener; Java 语言 java.sql.Listener)
在创建滑动窗口时,通过参数 CHANNEL_NAME
声明消息队列的名称。
同一个消息队列,可以被多个客户端同时监听
多个滑动窗口也可以复用同一个消息队列,也就是说,同一个客户端可以同时监听多个滑动窗口的告警消息。
滑动窗口依赖 matrixts 扩展,首先创建扩展:
=# CREATE EXTENSION matrixts;
=# CREATE TABLE metrics(
time timestamp,
tag_id int,
sensor float8
)
USING MARS3
DISTRIBUTED BY (tag_id)
ORDER BY (time,tag_id);
=# CREATE VIEW sv1 WITH (
CONTINUOUS,
WINDOW_SIZE='10 min',
SLIDING_SIZE='1 min',
CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;
参数分两部分,OPTION的参数和滑动窗口及消息推送相关。而SQL指定了流数据如何在滑动窗口中展示。
CONTINUOUS
声明该视图为聚集滑动窗口。WINDOW_SIZE
滑动窗口监控数据的时间窗口大小。SLIDING_SIZE
滑动窗口每次滑动的时间大小。CHANNEL_NAME
当监控条件满足时,数据库推送的消息队列名。SELECT
每次推送的数据内容。(可以使用postgres内置json函数,组装消息格式。)FROM
窗口数据来自哪张单表。(滑动窗口不支持多表数据)WHERE
原数据表中,哪部分数据可以被窗口监控。HAVING
监控推送的触发条件。如果省略 HAVING
条件,则窗口每次滑动都会推送消息。统计每台设备过去2分钟的最高温度,每30秒更新一次。(即2分钟的时间窗口,30秒滑动一次)
消息格式为(设备号,最高温度)。
消息推送至消息队列 temp_monitor
。
首先,创建设备数据表 metrics_1。
=# CREATE TABLE metrics_1(
time timestamp,
tag_id int,
temp float8
)
USING MARS3
DISTRIBUTED BY (tag_id)
ORDER BY (time,tag_id);
然后创建滑动窗口。
=# CREATE VIEW temp_sv WITH (
CONTINUOUS,
WINDOW_SIZE='2 min',
SLIDING_SIZE='30 seconds',
CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;
统计每台设备过去1分钟的电压值,每10秒更新一次。
当最低电压超过10时触发告警。
消息格式为 (设备号,平均电压,最低电压,最高电压)
。
消息推送至消息队列 over_volt_alarm
。
首先,创建设备数据表 metrics_2
。
=# CREATE TABLE metrics_2(
time timestamp,
tag_id int,
volt float8
)
USING MARS3
DISTRIBUTED BY (tag_id)
ORDER BY (time,tag_id);
然后创建滑动窗口。
=# CREATE VIEW volt_alarm WITH (
CONTINUOUS,
WINDOW_SIZE='1 min',
SLIDING_SIZE='10 seconds',
CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;
ref: https://pkg.go.dev/github.com/lib/pq/example/listen
package main
import (
"database/sql"
"fmt"
"time"
"github.com/lib/pq"
)
func waitForNotification(l *pq.Listener) {
select {
case n := <-l.Notify:
fmt.Println("received notification, new work available")
fmt.Println(n.Channel)
fmt.Println(n.Extra)
case <-time.After(90 * time.Second):
go l.Ping()
// Check if there's more work available, just in case it takes
// a while for the Listener to notice connection loss and
// reconnect.
fmt.Println("received no work for 90 seconds, checking for new work")
}
}
func main() {
var conninfo string = "user=mxadmin password=mxadmin dbname=postgres sslmode=disable"
_, _ = sql.Open("postgres", conninfo)
reportProblem := func(ev pq.ListenerEventType, err error) {
if err != nil {
fmt.Println(err.Error())
}
}
minReconn := 10 * time.Second
maxReconn := time.Minute
fmt.Println("entering conn")
listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
var err = listener.Listen("my_channel")
if err != nil {
panic(err)
}
fmt.Println("entering main loop")
for {
// process all available work before waiting for notifications
//getWork(db)
waitForNotification(listener)
}
}
ref: https://jdbc.postgresql.org/documentation/81/listennotify.html
package test;
import java.sql.*;
public class NotificationTest {
public static void main(String args[]) throws Exception {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://172.16.100.32:5432/postgres";
// Create two distinct connections, one for the notifier
// and another for the listener to show the communication
// works across connections although this example would
// work fine with just one connection.
Connection lConn = DriverManager.getConnection(url,"mxadmin","mxadmin");
Connection nConn = DriverManager.getConnection(url,"mxadmin","mxadmin");
// Create two threads, one to issue notifications and
// the other to receive them.
Listener listener = new Listener(lConn);
// Notifier notifier = new Notifier(nConn);
listener.start();
// notifier.start();
}
}
class Listener extends Thread {
private Connection conn;
private org.postgresql.PGConnection pgconn;
Listener(Connection conn) throws SQLException {
this.conn = conn;
this.pgconn = (org.postgresql.PGConnection)conn;
Statement stmt = conn.createStatement();
stmt.execute("LISTEN my_channel");
stmt.close();
}
public void run() {
while (true) {
try {
// issue a dummy query to contact the backend
// and receive any pending notifications.
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.close();
stmt.close();
org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
System.out.println(notifications.length);
if (notifications != null) {
for (int i=0; i<notifications.length; i++) {
System.out.println("Got notification: " + notifications[i].getName());
System.out.println("Got notification: " + notifications[i].getName() + " with payload: " + notifications[i].getParameter());
}
}
// wait a while before checking again for new
// notifications
Thread.sleep(500);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
不会。过期的数据会定期被清理掉。
可以的,滑动窗口中的数据与原表数据使用相同的事务ID。
暂不支持窗口函数。
支持。
不支持。
可以。
可以。