物联网时序场景下的典型查询

本文档给出时序场景下的典型查询语句示例。由于多数场景的查询在实际应用过程中都具有融合性,因此本文并非严格分类。

注意!
了解一个场景的查询需求,是构建准确数据模型的关键之一。


1 单设备基本查询

1.1 单设备点查

查询单个设备某个指标或多个指标在某个时间点的值,最常见的查询是某个设备某些指标的最新值或非空最新值,如某辆车行驶中的实时车速:

=# SELECT timestamp, car_id, value FROM car
    WHERE car_id = '2' 
    ORDER BY timestamp DESC
    LIMIT 1;

1.2 单设备明细查询

查询某个设备在某个时间区间的明细数据,可能涉及单个指标也可能是多个指标。通常为进一步的详细分析,如售后服务中的故障原因分析:

=# SELECT * FROM analysis
    WHERE device_id = '2' 
    AND timestamp >= '2023-10-08 00:00:00' 
    AND timestamp <= '2023-10-08 09:00:00';

1.3 单设备聚集查询

查询某个设备的某些指标在某个时间区间的聚集值,譬如最大值、最小值、累加和、平均值等:

=# SELECT count(1),sum(c1),avg(c1),max(c2),min(c2) FROM metrics
    WHERE device_id = '1'
    AND timestamp >= '2023-10-08 00:00:00' 
    AND timestamp <= '2023-10-08 09:00:00';


2 多设备基本查询

2.1 多设备点查

查询多个设备在某个时间点的指标值,可能涉及单个指标也可能是多个指标,如组合信号查询。

=# SELECT timestamp, device_id, value FROM metrics
    WHERE device_id = '2' OR device_id = '3'
    AND timestamp = '2023-10-08 00:00:00'
    ORDER BY device_id, timestamp DESC;

提示 该查询将返回满足条件的数据,首先按照 device_id 列升序排序,如果有多个相同 device_id 的记录,则将它们再按照 timestamp 列降序排序。如果你想要两列均按降序返回,则需将 SQL 修改为 ORDER BY device_id DESC, timestamp DESC

2.2 多设备明细查询

查询多个设备在某个时间区间的明细数据,可能涉及单个指标也可能是多个指标,多用于数据现象发现。如根因分析、新品追踪等。一般面向研发人员:

=# SELECT d.device_id, d.timestamp, d.value, s.device_name, l.location_name
FROM device_data AS d
JOIN device_status AS s ON d.device_id = s.device_id
JOIN device_location AS l ON d.device_id = l.device_id
WHERE d.device_id IN ('device1', 'device2', 'device3')
  AND d.timestamp >= '2023-11-01 00:00:00'
  AND d.timestamp <= '2023-11-10 23:59:59'
ORDER BY d.device_id, d.timestamp;

这是一个根因分析的例子,其中假设有以下几个表:

  • device_data:包含设备数据的表,具有 device_idtimestampvalue 等列
  • device_status:包含设备状态的表,具有 device_iddevice_name 等列
  • device_location:包含设备位置信息的表,具有 device_idlocation_name 等列

关键点

  • 此查询联合以上三个表来获取设备数据、设备状态和设备位置信息。
    • 查询中的 JOIN 语句用于将相关表连接起来,通过设备 ID 进行关联。
    • WHERE 子句用于筛选指定设备 ID 和时间范围内的数据。

2.3 多设备聚集查询

查询多个设备的某些指标在某个时间区间的聚集值,譬如最大值、最小值、累加和、平均值等,并按照设备分组,如各区域网络状况检查:

=# SELECT l.location_name, COUNT(d.device_id) AS total_devices, AVG(d.value) AS average_value
FROM device_data AS d
JOIN device_location AS l ON d.device_id = l.device_id
WHERE l.location_name IN ('location1', 'location2', 'location3')
  AND d.timestamp >= '2023-11-01 00:00:00'
  AND d.timestamp <= '2023-11-10 23:59:59'
GROUP BY l.location_name;

这个是一个各区域网络状况检查的例子,其中假设有以下两个表:

  • device_data:包含设备数据的表,具有 device_idtimestampvalue 等列。
  • device_location:包含设备位置信息的表,具有 device_idlocation_name 等列。

关键点

  • 以上查询联合以上两个表来获取设备数据和设备位置信息。
    • 查询中的 JOIN 语句用于将相关表连接起来,通过设备 ID 进行关联。
    • WHERE 子句用于筛选指定的区域和时间范围内的数据。
    • GROUP BY 子句用于按区域进行分组。
    • COUNTAVG 聚集函数分别用于计算设备数量和平均值。

3 高级查询


3.1 时空特定查询

first() 函数、last() 函数相关的查询等,有时还会涉及到与空间数据相结合的查询,如开始油量、结束油量等。

使用 YMatrix 的时序函数首先需要创建扩展:

=# CREATE EXTENSION matrixts;

扩展为数据库级别,一个数据库创建一次即可,无需重复创建。


3.1.1 查询最早值

返回指定列中第一个记录的值:

=# SELECT id,first(c1, ts) AS c,
    FROM t
    GROUP BY id
    ORDER BY id;


3.1.2 查询最晚值

返回指定列中最后一个记录的值:

=# SELECT device_id, last(value, timestamp) AS last_value
    FROM t
    GROUP BY device_id
    ORDER BY device_id;


3.1.3 计算给定时间段的平均值

=# SELECT time_bucket('5 s', timestamp) AS five_second, count(*) FROM t
    GROUP BY five_second;

上述 SQL 意味着 t 表每 5s 会进行一次 count(*) 查询。

关键点

  • 使用 GROUP BY 语句将原始数据按更宽广的时间间隔分组,并统计出每组数据的关键特征信息的相关查询,称之为降采样。降采样不仅可以降低存储开销,还能保留关键数据特征,便于分析数据的历史趋势,预测未来趋势等。


3.1.4 非空最新值

=# SELECT 
    device_id,
    max(timestamp) as max_timestamp,
    last_not_null_value(value1, timestamp) as lastvalue1,
    last_not_null_value(value2, timestamp) as lastvalue2,
    last_not_null_value(value3, timestamp) as lastvalue3
   FROM t
     GROUP BY device_id;

=# SELECT 
    device_id,
    max(timestamp) as max_timestamp,
    last_not_null(value1, timestamp) as lastvalue1,
    last_not_null(value2, timestamp) as lastvalue2,
    last_not_null(value3, timestamp) as lastvalue3
   FROM t
     GROUP BY device_id;

关键点

  • last_not_null()last_not_null_value() 函数。
  • last_not_null()last_not_null_value() 相比,不仅返回值,时间也会返回。返回的类型是字符串,格式为 [<value>, <time>]


3.1.5 差值计算

通过差值计算判断指标变化情况,如股票是否正常等:

=# SELECT
    device_id,
    timestamp, 
    value,
    lag(value) OVER(PARTITION BY device_id ORDER BY timestamp),
    lead(value) OVER(PARTITION BY device_id ORDER BY timestamp),
    lead(value) OVER(PARTITION BY device_id ORDER BY timestamp) - value AS lead_value 
FROM t;

关键点

  • 窗口函数 lag() 用于获取当前行的前一行值。
  • 窗口函数 lead() 用于获取当前行的后一行值。
  • 通过在 PARTITION BY 子句中指定 device_id,我们将窗口函数应用于每个设备 ID 分区内的行。
  • 使用 lead(value) - value 计算下一条记录的值与当前记录的差值,并将其命名为 lead_value


3.2 异常数据清洗

=# SELECT time_bucket_gapfill('45 minutes', timestamp) AS five_min,
          locf(AVG(value)) As locf_value,
          interpolate(AVG(value)) AS interpolate_value
   FROM t
       WHERE device_id = '1'
       AND timestamp >= '2021-12-01 00:00:00'
       AND timestamp < '2021-12-02 06:00:00'
   GROUP BY five_min
   ORDER BY five_min;

关键点

  • 上述查询使用了 time_bucket_gapfill() 函数以及后向(LOCF)和线性插值函数对 t 表中缺失值进行填充。
    • time_bucket_gapfill('45 minutes', timestamp):这个函数将时间戳 timestamp 按照指定的时间间隔(45分钟)进行桶化,并填充缺失的时间桶。这样可以确保结果中的时间桶连续且没有缺失。
    • locf(AVG(c1)):这个函数使用 LOCF 方法,对每个时间桶内的 value 值进行填充。LOCF(Last Observation Carried Forward)方法使用当前时间桶之前最后一个非缺失值来填充缺失值。
    • interpolate(AVG(c1)):这个函数使用线性插值方法,对每个时间桶内的 value 值进行填充。线性插值方法使用当前时间桶之前和之后的非缺失值之间的线性函数来估计缺失值。


3.3 异常数据监测

3.3.1 累加和、累计平均值和中位数计算

可通过累积平均值、中位数异常变化进行异常数据检测:

-- 累加和
=# SELECT device_id, timestamp, value,
          SUM(value) OVER(PARTITION BY device_id, timestamp::date ORDER BY timestamp) AS accumulation_value
   FROM t;

-- 累积平均
=# SELECT device_id, timestamp, value,
       AVG(value) OVER(PARTITION BY device_id, timestamp::date ORDER BY timestamp) AS accumulation_value
   FROM t;

-- 中位数
=# SELECT device_id,
       percentile_cont(0.5) WITHIN GROUP (ORDER BY value) AS accumulation_value
   FROM t
   GROUP BY device_id
   ORDER BY device_id;

3.3.2 增量计算

增量可以计算对于一个单调序列增幅或降幅,也可以简单的用来计算与前一条数据的变化,通常用于异常数据检测:

示例 1

该示例计算了 tag_id1 的磁盘,在 2021-4-10 21:00:0021:01:00 期间磁盘读的变化值:

=# SELECT
      time,
      (
       CASE WHEN lag(read) OVER (ORDER BY time) IS NULL THEN NULL
       ELSE round(read - lag(read) OVER (ORDER BY time))
       END
      ) AS read
   FROM disk
      WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:01:00'::timestamp
      AND tag_id = 1
      ORDER BY time;

关键点

  • 利用窗口函数 lag() 获取前一行的值。
  • 正数值为相比上一秒的增长量,负数值为相比上一秒的下降量。

示例 2

该示例利用 sales 表计算了每天的销售额增量:

=# SELECT
      date,
      sales_amount,
      sales_amount - LAG(sales_amount) OVER (ORDER BY date) AS sales_increment
   FROM
      sales
   ORDER BY
      date;

关键点

  • 通过将前一天的销售额减去当前天的销售额,计算出每天的销售额增量。

3.3.3 增速特征计算

示例 1

在增量的基础上,再除以时间间隔,就可以得到增速:

=# SELECT
    time,
    (
     CASE WHEN lag(read) OVER (ORDER BY time) IS NULL THEN NULL
     ELSE round(read - lag(read) OVER (ORDER BY time))
     END
    ) / extract(epoch from time - lag(time) OVER (ORDER BY time)) AS read_rate,
    extract(epoch from time - lag(time) OVER (ORDER BY time)) AS "time lag"
    FROM disk
    WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:01:00'::timestamp
    AND tag_id = 1
    ORDER BY time;

关键点

  • 增量/时间间隔=增速


3.4 指标跳变差值查询

此查询可计算出每次跳变前后的时间、设备号、跳变前后的指标值,如查询股票指标流水,回溯股票交易状态等:

=# SELECT stock_id, timestamp, value, lag_value, lead_value, lag_diff_value, lead_diff_value FROM (
    SELECT stock_id, timestamp, value,
        lag(value) OVER (PARTITION BY stock_id ORDER BY timestamp) AS lag_value,  --value 列的上一条数值
        lead(value) OVER (PARTITION BY stock_id ORDER BY timestamp) AS lead_value,  --value 列的下一条数值
        value - lag(value) OVER (PARTITION BY stock_id ORDER BY timestamp) AS lag_diff_value,
        value - lead(value) OVER (PARTITION BY stock_id ORDER BY timestamp) AS lead_diff_value
    FROM t
       WHERE 1=1
       AND stock_id in ('1')
       AND timestamp >= '2021-12-01 00:00:00'
       AND timestamp < '2021-12-02 23:59:59'::timestamp
       AND value IS NOT NULL
   ) ht 
   WHERE abs(lag_diff_c1) > 3 OR abs(lead_diff_c1) > 3
   ORDER BY stock_id, timestamp;

关键点

  • 查询中的子查询使用了窗口函数 lag()lead(),在每个 stock_id 分区内按照 timestamp 列的顺序获取当前行的前一行和后一行的值。
    • lag_value 列是 value 列的前一行数值。
    • lead_value 列是 value 列的后一行数值。
    • lag_diff_value 列是当前行的 value 值与前一行的 value 值之间的差异。
    • lead_diff_value 列是当前行的 value 值与后一行的 value 值之间的差异。
  • 最外层的查询从子查询结果中选择满足差异(的绝对值)大于 3 的行。

注意!
跳变检测指的是指定设备在指定时间范围内,找到指定指标发生的跳变。跳变指相邻时刻值发生剧烈改变超过某一个阈值。


3.5 监控大屏

3.5.1 分位数函数

使用分位数函数计算监控大屏实时数据,例如用 9 分位数反映某只股票趋势:

=# SELECT stock_id,
          percentile_cont(0.9) WITHIN GROUP (ORDER BY value) AS value_9
    FROM t
    GROUP BY stock_id
    ORDER BY stock_id;

3.5.2 累计分布函数(CDF)

使用累计函数,你可以计算股票价格或指标在给定时间段内的累积分布。这有助于评估价格或指标的相对位置,以及确定股票的价格区间和统计特征。例如用于实时监控大屏上统计数据分布情况的展示。

=# SELECT stock_id, timestamp, price, 
          cume_dist() OVER(PARTITION BY stock_id, timestamp::date ORDER BY timestamp) AS cume_dist
   FROM t;

关键点

  • cume_dist() 函数可以计算累积分布值:累积分布值 = (在当前行之前或者平级的分区行数) / 分区行总数


3.6 即席分析

时序场景中使用行列转换实现数据模型转换,多用于即席分析。

3.6.1 行转列

=# SELECT name,
          max(CASE WHEN attribute='age' THEN value ELSE 0 END) AS age,
          max(CASE WHEN attribute='height' THEN value ELSE 0 END) AS height,
          max(CASE WHEN attribute='weight' THEN value ELSE 0 END) AS weight,
          max(CASE WHEN attribute='shoe_size' THEN value ELSE 0 END) AS shoe_size
   FROM t
   GROUP BY name
   ORDER BY age DESC;

t 原结构示例:

+------+----------+-------+
| name | attribute | value |
+------+----------+-------+
| John | age      | 30    |
| John | height   | 175   |
| John | weight   | 70    |
| John | shoe_size| 9.5   |
| Mary | age      | 25    |
| Mary | height   | 160   |
| Mary | weight   | 55    |
| Mary | shoe_size| 8     |
+------+----------+-------+

行转列结果示例:

+------+-----+-------+--------+-----------+
| name | age | height| weight | shoe_size |
+------+-----+-------+--------+-----------+
| John | 30  | 175   | 70     | 9.5       |
| Mary | 25  | 160   | 55     | 8         |
+------+-----+-------+--------+-----------+

关键点

  • 这种行转列的查询方式在需要将行数据转换为列数据进行比较或展示时非常有用。通过使用条件语句将不同属性值转换为对应的列,并使用聚集函数计算每个属性的最大值(或其他聚集操作),可以方便地将数据进行透视和展示。

3.6.2 列转行

=# SELECT currenttimestamp, 
          deviceid, 
          devicetemplatecode,
          statisticstype,
          (b.rec).key AS key, 
          (b.rec).value AS value 
   FROM
     (SELECT currenttimestamp, 
             deviceid, 
             devicetemplatecode,
             statisticstype,
             jsonb_each_text(row_to_json(t.*)::jsonb-'currenttimestamp'-'deviceid'-'devicetemplatecode'-'statisticstype') AS rec  
     FROM t
   ) b
   WHERE (b.rec).value IS NOT NULL;

t 原结构示例:

+---------------------+----------+-------------------+----------------+--------+--------+--------+
| currenttimestamp    | deviceid | devicetemplatecode| statisticstype | key1   | key2   | key3   |
+---------------------+----------+-------------------+----------------+--------+--------+--------+
| 2023-11-13 08:30:45 | 1234567  | template1         | type1          | value1 | value2 | value3 |
+---------------------+----------+-------------------+----------------+--------+--------+--------+

列转行结果示例:

+---------------------+----------+-------------------+----------------+------+-------+
| currenttimestamp    | deviceid | devicetemplatecode| statisticstype | key  | value |
+---------------------+----------+-------------------+----------------+------+-------+
| 2023-11-13 08:30:45 | 123456   | template1         | type1          | key1 | value1|
| 2023-11-13 08:30:45 | 123456   | template1         | type1          | key2 | value2|
| 2023-11-13 08:30:45 | 123456   | template1         | type1          | key3 | value3|
| ...                 | ...      | ...               | ...            | ...  | ...   |
+---------------------+----------+-------------------+----------------+------+-------+

关键点

  • 通过使用 row_to_json() 函数将每行数据转换为 JSON 对象,然后使用 jsonb_each_text() 函数将 JSON 对象转换为键-值对的形式,实现了列转行的操作。


3.7 指标实时标签

显示某种机器/账户/人的状态:

=# SELECT first_name,
          last_name,
          salary,
    CASE
     WHEN salary >= 80000 THEN 'senior'
     WHEN salary >= 60000 THEN 'intermediate'
    ELSE 'junior'
    END AS employee_level
   FROM employees;

关键点

  • 使用 CASE WHEN 子句查询复合指标:
    • 如果 salary 大于等于 80000,则 employee_level 列的值将为 senior
    • 如果 salary 大于等于 60000 且小于 80000,则 employee_level 列的值将为 intermediate
    • 其他情况下,即 salary 小于 60000,则 employee_level 列的值将为 junior


3.8 峰值检测

指定股票在指定时间范围内指定指标的 TOP10 峰值,以及峰值出现的时间等:

=# SELECT  stock_id, timestamp, value FROM (
    SELECT stock_id, timestamp, value,
           row_number() OVER (PARTITION BY stock_id ORDER BY value DESC) AS rn
    FROM t_test
      WHERE stock_id ='1'
      AND timestamp >= '2021-12-01 00:00:00'
      AND timestamp < '2021-12-03 00:00:00'
   ) AS derived_table
   WHERE rn <= 10;

关键点

  • 查询语句中使用了子查询和窗口函数 row_number(),对表 t_test 进行了筛选和排序操作,并展示了 value 最高的前十行。


3.9 溯源查询

用于溯源查询、JSONB 数据类型的指标遍历等:

=# WITH RECURSIVE json_recursive AS (
    SELECT id,
           'person' AS key,
           data->'person' AS value
    FROM my_table
    UNION ALL
    SELECT j.id,
           k AS key,
           v AS value
    FROM json_recursive j,
         jsonb_each(j.value) AS kv(k, v)
    WHERE jsonb_typeof(j.value) = 'object'
   )
   SELECT id, key, value
   FROM json_recursive;

关键点

  • 查询语句中使用了递归查询 (WITH RECURSIVE) 和两个子查询 (SELECT 语句)。
    • 递归查询的起始部分是第一个子查询,它选取了表 my_table 中的 id 列、固定的键名 person 和对应的值作为初始结果集。
    • 递归查询的迭代部分是第二个子查询,它通过连接 json_recursive 表与 jsonb_each() 函数的结果进行迭代。在每次迭代中,将当前的 JSON 值 (j.value) 使用 jsonb_each() 函数拆解成键-值对,并将拆解后的键 (k) 和值 (v) 作为迭代结果。同时,通过 WHERE 子句中的条件 jsonb_typeof(j.value) = 'object',仅选择 JSON 值类型为对象的记录,以保证递归终止条件。
  • 最终查询结果从 json_recursive 表中选择 idkeyvalue 列,以获取完整的溯源信息。


3.10 机器学习

机器学习可用于预测和分类。

例如,根据 1月 1日 至 10日的数据,计算线型回归模型,预测 11日的指标:

=# SELECT '2021-12-11 00:00:01' AS timestamp,
    extract(epoch FROM '2021-12-11 00:00:01'::timestamp) * slope + intercept AS c1_value
   FROM (
    SELECT regr_slope(value, extract(epoch from timestamp)) AS slope,
         regr_intercept(value, extract(epoch from timestamp)) AS intercept
    FROM t_test
    WHERE vin = '1'
        AND timestamp >= '2021-12-01 00:00:00'
        AND timestamp < '2021-12-11 00:00:00'
   ) AS liner_regression;

关键点

  • 该查询语句中使用了两个子查询和线性回归函数 regr_slope()regr_intercept(),对表 t_test 进行了线性回归计算。它们可以用于计算线性回归模型的斜率和截距,在 SQL 中进行简单的机器学习任务。
    • 子查询根据条件选择了 vin 值为 1timestamp 大于等于 2021-12-01 00:00:00,且小于 2021-12-11 00:00:00 的记录,并计算了 value 列与 timestamp 列的斜率和截距。
    • 然后,在外层查询中,固定了 timestamp 列的值为 2021-12-11 00:00:01,并使用斜率和截距计算了对应的 value 值,并将其命名为 c1_value