高级查询

除了基本的连接、聚合、分组操作外,YMatrix 还提供了许多高级分析函数,如窗口函数、CTE(Comman Table Expression)、 有序集聚合函数(Ordered-Set Aggregate Functions)、常用时序函数等。本节将向你介绍 YMatrix 中常用的高级查询方法。

下面以统计磁盘使用量为例,演示如何在 YMatrix 中进行高级查询。在示例指标表 disk 中,我们设计了磁盘的读、写速度字段,使用 MARS2 存储引擎。
MARS2 表依赖 matrixts 扩展,在建表前,首先需要你在使用该存储引擎的数据库中创建扩展。此扩展为数据库级别,无需重复创建。

CREATE EXTENSION matrixts;

创建指标表 disk。

CREATE TABLE disk (
    time timestamp with time zone,
    tag_id int,
    read float,
    write float
)
USING mars2
DISTRIBUTED BY (tag_id);

创建MARS2表成功后,你必须额外创建一个 mars2_btree 类型的索引,这样才能进行正常的数据读写。

CREATE INDEX idx_mars2 ON disk 
USING mars2_btree(tag_id);

更多 MARS2 相关信息请见 存储引擎

1 窗口函数

窗口函数主要用于处理相对复杂的报表统计分析场景。“窗口”限定了一个数据集合,在与当前行相关的行之间执行聚合查询。窗口函数与其他 SQL 函数的区别在于 OVER 子句的存在。 如果一个函数有一个 OVER 子句,那么它就是一个窗口函数。你可以通过以下表格理解窗口函数与普通聚合函数的区别。

普通聚合函数 窗口函数
输出 一条记录 多条记录
函数 max()、min()、count()、sum()等 avg()、sum()、rank()等
用法 通常与 GROUP BY 子句组合使用 与 OVER 子句组合使用。一般情况下,OVER 子句直接写在窗口函数的名称和参数之后。OVER 子句中通常可以使用 PARTITION BY、ORDER BY、ROWS BETWEEN 三种子句。只使用 PARTITION BY 子句会形成静态窗口,窗口大小、位置不会发生变化;除了 PARTITION BY 语句外,还使用了 ORDER BY、ROWS BETWEEN 语句中的一种或两种,会形成滑动窗口,即窗口大小、位置会不断变化

你可以根据以下案例实践常用的窗口函数。

1.1 累积和

通过嵌套使用 SUM 方法,可以计算累积和。

如下 SQL 计算了 2021-04-10 21:00:00 到 2021-04-10 21:00:10 间 tag_id 为 1 的磁盘读写累积和:

ymatrix=# SELECT time,
    sum(sum(read)) OVER (ORDER BY time) AS read,
    sum(sum(write)) OVER (ORDER BY time) AS write
    FROM disk
    WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:00:10'::timestamp
    AND tag_id = 1 
    GROUP BY time
    ORDER BY time;

          time          |  read  |       write
------------------------+--------+--------------------
 2021-04-10 21:00:00+08 |  81.07 |               73.3
 2021-04-10 21:00:01+08 | 110.63 |             121.77
 2021-04-10 21:00:02+08 | 202.12 |             201.36
 2021-04-10 21:00:03+08 | 263.74 |             257.88
 2021-04-10 21:00:04+08 |  361.6 |              299.3
 2021-04-10 21:00:05+08 | 394.49 | 327.33000000000004
 2021-04-10 21:00:06+08 |  438.3 |             334.98
 2021-04-10 21:00:07+08 | 523.35 |             431.39
 2021-04-10 21:00:08+08 | 583.15 |             461.84
 2021-04-10 21:00:09+08 | 609.01 |             533.03
 2021-04-10 21:00:10+08 | 669.52 |              535.9
(11 rows)

1.2 移动平均值

移动平均值用来计算该条记录与前 n 条的平均值。

如下 SQL 计算了 tag_id 为 1 的磁盘,在 2021-4-10 21:00:00 到 21:01:00 每 10 秒的平均读写(对于前 9 条数据,只是计算了满足条件行数的平均值):

ymatrix=# SELECT time,
    round(AVG(read) OVER(ORDER BY time ROWS BETWEEN 9 PRECEDING AND CURRENT ROW)) AS read,
    round(AVG(write) OVER(ORDER BY time ROWS BETWEEN 9 PRECEDING AND CURRENT ROW)) AS write
    FROM disk
    WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:01:00'::timestamp
    AND tag_id = 1
    ORDER BY time DESC;

          time          | read | write
------------------------+------+-------
 2021-04-10 21:01:00+08 |   57 |    57
 2021-04-10 21:00:59+08 |   49 |    60
 2021-04-10 21:00:58+08 |   52 |    56
 2021-04-10 21:00:57+08 |   51 |    57
 2021-04-10 21:00:56+08 |   53 |    65
 2021-04-10 21:00:55+08 |   48 |    64
 2021-04-10 21:00:54+08 |   49 |    64
 2021-04-10 21:00:53+08 |   47 |    54
 2021-04-10 21:00:52+08 |   44 |    54
 2021-04-10 21:00:51+08 |   41 |    56
......

1.3 增量

增量通常用来计算对于一个单调序列增幅或降幅,也可以简单的用来计算与前一条数据的变化。

如下语句计算了 tag_id 为 1 的磁盘,在 2021-4-10 21:00:00 到 21:01:00 期间磁盘读的变化值,正数为相比上一秒增长,负数为相比上一秒下降:

ymatrix=# SELECT
    time,
    (
     CASE WHEN lag(read) OVER (ORDER BY time) IS NULL THEN NULL
     ELSE round(read - lag(read) OVER (ORDER BY time))
     END
    ) AS read
    FROM disk
    WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:01:00'::timestamp
    AND tag_id = 1
    ORDER BY time;

          time          | read
------------------------+------
 2021-04-10 21:00:00+08 |
 2021-04-10 21:00:01+08 |  -52
 2021-04-10 21:00:02+08 |   62
 2021-04-10 21:00:03+08 |  -30
 2021-04-10 21:00:04+08 |   36
 2021-04-10 21:00:05+08 |  -65
 2021-04-10 21:00:06+08 |   11
 2021-04-10 21:00:07+08 |   41
 2021-04-10 21:00:08+08 |  -25
 2021-04-10 21:00:09+08 |  -34
......

1.4 增速

在增量的基础上,再除以时间间隔,就可以得到增速(因为样例数据是 1 秒采样一次,所以看到的和增量的结果相同):

ymatrix=# SELECT
    time,
    (
     CASE WHEN lag(read) OVER (ORDER BY time) IS NULL THEN NULL
     ELSE round(read - lag(read) OVER (ORDER BY time))
     END
    ) / extract(epoch from time - lag(time) OVER (ORDER BY time)) AS read_rate,
    extract(epoch from time - lag(time) OVER (ORDER BY time)) AS "time lag"
    FROM disk
    WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:01:00'::timestamp
    AND tag_id = 1
    ORDER BY time;

          time          | read_rate | time lag
------------------------+-----------+----------
 2021-04-10 21:00:00+08 |           |
 2021-04-10 21:00:01+08 |       -52 |        1
 2021-04-10 21:00:02+08 |        62 |        1
 2021-04-10 21:00:03+08 |       -30 |        1
 2021-04-10 21:00:04+08 |        36 |        1
 2021-04-10 21:00:05+08 |       -65 |        1
 2021-04-10 21:00:06+08 |        11 |        1
 2021-04-10 21:00:07+08 |        41 |        1
 2021-04-10 21:00:08+08 |       -25 |        1
 2021-04-10 21:00:09+08 |       -34 |        1
......

1.5 变化点

变化点列出相比于前一条有变化的记录,该类型查询适合在比较平稳的数据集中找发生变化的点:

ymatrix=# SELECT time, read FROM (
        SELECT time,
            read,
            read - lag(read) OVER (ORDER BY TIME) AS diff
        FROM disk
        WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:01:00'::timestamp
        AND tag_id = 1 ) ht
    WHERE diff IS NULL OR diff != 0
    ORDER BY time;

          time          | read
------------------------+-------
 2021-04-10 21:00:00+08 | 81.07
 2021-04-10 21:00:01+08 | 29.56
 2021-04-10 21:00:02+08 | 91.49
 2021-04-10 21:00:03+08 | 61.62
 2021-04-10 21:00:04+08 | 97.86
 2021-04-10 21:00:05+08 | 32.89
 2021-04-10 21:00:06+08 | 43.81
 2021-04-10 21:00:07+08 | 85.05
 2021-04-10 21:00:08+08 |  59.8
 2021-04-10 21:00:09+08 | 25.86
(10 rows)

2 有序集聚合函数(Ordered-Set Aggregate Functions)

2.1 计算连续百分率

你可以使用以下有序集聚合函数来计算连续百分率。

percentile_cont(fractions) WITHIN GROUP (ORDER BY sort_expression)

示例及返回结果如下。

ymatrix=# SELECT tag_id,
    percentile_cont(0.2) WITHIN GROUP
     (ORDER BY read) AS read,
    percentile_cont(0.3) WITHIN GROUP (ORDER BY write) AS write
    FROM disk
    GROUP BY tag_id
    ORDER BY tag_id;

 tag_id | read  | write
--------+-------+-------
      1 | 19.87 | 29.86
      2 | 19.95 | 29.88
      3 | 20.06 | 29.93
(3 rows)

参数表示百分比,如果传 0.5 相当于计算中位数。

3. 通用表表达式(Comman Table Expression)

通用表表达式即 CTE,你可以使用它定义构建一个临时的视图,从而使大型查询语句简化。它通过 WITH 关键字实现。在使用之前,可通过以下表格理解 CTE 与 CREATE VIEW 语句的区别。

CTE CREATE VIEW
表述 并非独立SQL语句,而是语句的一部分,即表达式 独立SQL语句
应用范围 建立的临时视图只用于所属查询语句 建立的视图可用于所有查询语句

如下 CTE 计算了所有设备中读速度平均值的最大值和最小值:

WITH avg_read (tag_id, avg_read) AS (
    SELECT tag_id, AVG(read) AS read FROM disk GROUP BY tag_id
) SELECT MAX(avg_read), MIN(avg_read) FROM avg_read;

4. 时序函数

YMatrix 提供的时序组件 matrixts 中还提供了时序场景中经常使用的时序函数,首先要创建该组件:

stats=# CREATE EXTENSION matrixts;

4.1 time_bucket

time_bucket 可以计算出给定时间段的平均值。

参数名 说明 数据类型 可缺省
period 时间窗口大小 int16;int32;int64;interval
timestamp 需要被转换的列 int16;int32;int64;date;timestamp;timestamptz

如下 SQL 计算了 tag_id 为 1 的磁盘,在 2021-4-10 21:00:00 到 22:00:00 之间每 5 分钟的平均读写速度:

ymatrix=# SELECT time_bucket('5 minutes', time) AS five_min,
    AVG(read) as read,
    AVG(write) as write
    FROM disk
    WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 22:00:00'::timestamp
    AND tag_id = 1
    GROUP BY five_min
    ORDER BY five_min;

        five_min        |        read         |        write
------------------------+--------------------+--------------------
 2021-04-10 21:00:00+08 | 48.614599999999996 |  49.48656666666666
 2021-04-10 21:05:00+08 |  50.73533333333335 | 49.992566666666654
 2021-04-10 21:10:00+08 |   51.6102333333333 |  49.99359999999999
 2021-04-10 21:15:00+08 |  49.29116666666669 |  53.89146666666666
 2021-04-10 21:20:00+08 |  49.67863333333332 |  50.47406666666665
 2021-04-10 21:25:00+08 |  51.09013333333332 | 47.766733333333335
 2021-04-10 21:30:00+08 |  49.55949999999999 | 50.440766666666654
 2021-04-10 21:35:00+08 |  48.86253333333333 |  50.57290000000001
 2021-04-10 21:40:00+08 | 51.061299999999974 | 47.028766666666684
 2021-04-10 21:45:00+08 |  52.10353333333333 | 49.861466666666665
 2021-04-10 21:50:00+08 | 51.780566666666694 |            51.4159
 2021-04-10 21:55:00+08 |  51.83549999999998 | 49.124366666666674
 2021-04-10 22:00:00+08 |              93.96 |              91.07
(13 rows)

4.2 time_bucket_gapfill

当时间段中有数据缺失,需要进行数据清洗,可以使用 time_bucket_gapfill 函数为缺失数据做填充,使数据按指定时间间隔在时序上平滑分布,从而便于分析。有两种填充策略:locf (last observation carried forward) 和 interpolate。

  • locf:用聚合组中之前出现的值填充
  • interpolate:对缺失的值做线性插值填充

假设示例 disk 表中数据情况如下(只适用于 4.2 节):

ymatrix=# SELECT * FROM disk ORDER BY tag_id;
          time          | tag_id | read | write
------------------------+--------+------+-------
 2021-04-10 21:00:00+08 |      1 |  3.4 |   4.6
 2021-04-10 21:50:00+08 |      1 |    4 |   2.7
 2021-04-10 21:40:00+08 |      1 |  8.4 |    12
 2021-04-10 21:20:00+08 |      1 |  2.9 |     6
 2021-04-10 21:30:00+08 |      1 |    9 |  10.2
 2021-04-10 21:10:00+08 |      1 |  5.2 |   6.6
 2021-04-10 22:00:00+08 |      1 |   10 |     7
(7 rows)

使用 time_bucket_gapfill 函数对 tag_id 为 1 的磁盘, 2021-4-10 21:00:00 到 22:00:00 之间每隔 5 分钟进行数据填充,得到结果。

SELECT time_bucket_gapfill('5 minutes', time) AS five_min,
    locf(AVG(read)) as locf_read,
    interpolate(AVG(read)) as interpolate_read
    FROM disk
    WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 22:00:00'::timestamp
    AND tag_id = 1
    GROUP BY five_min
    ORDER BY five_min;

        five_min        | locf_read | interpolate_read
------------------------+-----------+------------------
 2021-04-10 21:00:00+08 |       3.4 |              3.4
 2021-04-10 21:05:00+08 |       3.4 |              4.3
 2021-04-10 21:10:00+08 |       5.2 |              5.2
 2021-04-10 21:15:00+08 |       5.2 |             4.05
 2021-04-10 21:20:00+08 |       2.9 |              2.9
 2021-04-10 21:25:00+08 |       2.9 |             5.95
 2021-04-10 21:30:00+08 |         9 |                9
 2021-04-10 21:35:00+08 |         9 |              8.7
 2021-04-10 21:40:00+08 |       8.4 |              8.4
 2021-04-10 21:45:00+08 |       8.4 |              6.2
 2021-04-10 21:50:00+08 |         4 |                4
 2021-04-10 21:55:00+08 |         4 |                7
 2021-04-10 22:00:00+08 |        10 |               10
(13 rows)  

4.3 first/last

first 返回时间最早的值:

ymatrix=# SELECT tag_id,
    first(read, time) AS read,
    first(write, time) AS write
    FROM disk
    GROUP BY tag_id
    ORDER BY tag_id;

 tag_id | read  | write
--------+-------+-------
      1 | 11.51 | 86.61
      2 | 50.07 |  25.9
      3 | 83.72 |  10.5
(3 rows)

last 返回时间最晚的值:

ymatrix=# SELECT tag_id,
    last(read, time) AS read,
    last(write, time) AS write
    FROM disk
    GROUP BY tag_id
    ORDER BY tag_id;

 tag_id | read  | write
--------+-------+-------
      1 |  5.32 |  4.96
      2 |  5.73 | 34.73
      3 | 49.03 | 86.02
(3 rows)

4.4 last_not_null_value

last_not_null_value 相当于在 last 基础上增加了 not null 的过滤,返回最后一个非空的值:

ymatrix=# SELECT last_not_null_value(read, time)
    FROM disk WHERE tag_id = 1;
 last_not_null_value
---------------------
                 3.1
(1 row)

4.5 last_not_null

last_not_null 和 last_not_null_value 相比,不仅返回值,时间也会返回。返回的类型是字符串,格式为 '["value", "time"]':

ymatrix=# SELECT last_not_null(read, time)
    FROM disk WHERE tag_id = 1;
              last_not_null
-----------------------------------------
 ["3.1","2021-11-05 17:32:51.754457+08"]
(1 row)