一个常见的时序场景

YMatrix 适用于各种规模设备的物联网时序场景。本教程以一个具体的车联网时序场景为例,说明在 YMatrix 中加载、处理和查询时序数据的方法。

1 时序数据集介绍 -- 某城市出租车行程数据集

某城市拥有超过 800 万人口和 20 万辆出租车,市政管理部门采集并公开了出租车每次的行程信息,包括上下车时间、上下车地点、乘车人数、费用和付费方式等。借助这些信息,可以分析出什么呢?答案是:出租车的利用率,甚至整体交通状况!借此,你可以改善城市管理,并提高居民和游客的生活体验。本教程提供了一个月的数据压缩包,点击这里开启你的城市交通管理之旅(提取码 1x4u)。

2 元数据和表模式

你采集到的数据中,有一项标明付费方式,可能的付费方式有现金、信用卡、免付费、有争议、未知和无效,我们称之为静态属性。通过创建一张 payment_types 表来保存这些信息,以便后续查询时可以和这些元数据关联。与“付费方式”相关联的属性信息规模小且存在更新需求,因此指定默认存储引擎 HEAP 即可。一般情况下,无需特别指定,也将默认加载 HEAP 引擎。

=# CREATE TABLE IF NOT EXISTS payment_types (
    payment_type int,
    description text
)
USING HEAP;

使用 IF NOT EXISTS 语句,可以避免重复创建相同名称的表时报错。

=# INSERT INTO payment_types VALUES
(1, '信用卡'),
(2, '现金'),
(3, '免付费'),
(4, '有争议'),
(5, '未知'),
(6, '无效行程');

还有一项标明费率类型,包括标准费率、1 号机场、2 号机场、特殊区域、协商价、多人乘车等。同样,你可以创建一张静态属性表 rate_codes 来记录这些信息,同样搭载默认的 HEAP 存储引擎:

=# CREATE TABLE IF NOT EXISTS rate_codes (
    rate_code   int,
    description text
)
USING HEAP;

=# INSERT INTO rate_codes VALUES
(1, '标准费率'),
(2, '1号机场'),
(3, '2号机场'),
(4, '特殊区域'),
(5, '协商价'),
(6, '团体');

接下来你就可以创建一张时序数据表来保存具体的行程数据。有一些字段含义略作说明,pickup_datetime / dropoff_datetime 分别表示上车时间点和下车时间点,pickup_longitude / pickup_latitude 表示上车地点的经纬度值,dropoff_longitude / dropoff_latitude 表示下车地点的经纬度值,passenger_count 表示乘客数量,trip_distance 是旅程的距离(单位为英里),total_amount 表示乘车费用,最后一个字段 trip_duration 是一个在数据加载时生成的字段,记录了乘车时长(单位为分钟)。

在搭乘出租车的时序场景下,trip 表中的元数据为因设备随时间变化而产生的时序数据。由于时序场景下数据源具有多样性和多变性,对于时序数据写入和存储提出了较高要求,而更新与删除数据的需求则较低。因此 MARS3 为最佳选择,其时序数据写入、存储、查询性能优化显著。

MARS3 表依赖 matrixts 扩展,所以在建表前你需要首先在使用该存储引擎的库中创建扩展,如果已经创建过,无需重复创建,跳过此步即可:

=# CREATE EXTENSION matrixts;

建表时应使用 USING MARS3 语句来指定存储引擎,用 WITH 语句设置相应参数。其中 compresstype 表示压缩算法,支持 zstdzliblz4,默认值为 lz4compresslevel 表示压缩级别。值越小压缩速度越快;值越大压缩比越高;值适中,压缩速度与压缩比相对平衡。不同的算法有效值范围都不同:

  • zstd:1-19
  • zlib:1-9
  • lz4:1-20。默认值为 1。
    =# CREATE TABLE IF NOT EXISTS trip (
    vendor_id text,
    pickup_datetime timestamp without time zone,
    dropoff_datetime timestamp without time zone,
    passenger_count int,
    trip_distance numeric,
    pickup_longitude numeric,
    pickup_latitude numeric,
    rate_code_id int,
    store_and_fwd_flag text,
    dropoff_longitude numeric,
    dropoff_latitude numeric,
    payment_type int,
    fare_amount numeric,
    extra numeric,
    mta_tax numeric,
    tip_amount numeric,
    tolls_amount numeric,
    improvement_surcharge numeric,
    total_amount numeric,
    trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
    ) 
    USING MARS3
    WITH (compresstype='lz4', compresslevel=1)
    DISTRIBUTED BY (vendor_id)
    ORDER BY (vendor_id, pickup_datetime)
    PARTITION BY RANGE (pickup_datetime)
    ( START (date '2016-01-01') INCLUSIVE
     END (date '2016-02-01') EXCLUSIVE
     EVERY (INTERVAL '1 day') );
  • DISTRIBUTED BY 语句的存在意味着你将对 trip 表中的数据进行数据分桶,按照 vendor_id 列实现哈希分布,使得相同值的数据在同一个节点上。
  • ORDER BY 语句使得每个节点上的全部数据按照 (vendor_id, pickup_datetime) 排序键进行排序,使得数据有序存储。
  • PARTITION BY 语句则是数据分区,从 2016 年 1 月 1 日起(包含本日),到 2016 年 2 月 1 日前(不包含本日),以一天为一次间隔创建 31 个分区表。 trip 表自动按天分区,方便按时间段的查询进行快速裁剪,也有利于以后针对过期数据的快速处理。

注意!
SQL 语法规定须先指定 DISTRIBUTED BY,再指定 PARTITION BY,实际执行则是先做 DISTRIBUTED BY 将数据分发到对应的节点上,然后再指定的节点上执行 PARTITION BY 将数据插入到对应的子分区表中。

3 数据加载

你可以找到从上述链接下载的 yellow_tripdata_2016-01.csv 文件路径,然后使用 mxgate 命令加载数据。在 tail 后指定实际的文件路径,通过 --db-master-host 参数指定实际的节点名或者 IP,例如:

$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256  --delimiter ','  --exclude-columns trip_duration 

mxgate 主要参数说明如下:

--db-database postgres     // 指定数据库名    
--db-master-host mdw       // 主节点主机名或者IP
--db-master-port 5432      // 数据库端口
--db-user mxadmin          // 数据库用户名
--time-format raw          // 原始格式,不进行转换
--target trip              // 要导入的表名
--parallel 256             // 并行数
--delimiter ','            // 分隔符

更多 mxgate 相关信息请见 MatrixGate

4 数据分析

YMatrix 提供了 time_bucket 函数,支持按照任意时间区间的分段计算。使用前,同样需要在数据库上安装 matrixts 扩展来初始化时序组件,无需重复创建。

=# CREATE EXTENSION matrixts;

然后你就可以通过下面的 SQL 语句统计出每天总共有多少行程了:

=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*) 
from trip 
GROUP BY day 
ORDER BY day;

如果你想要了解 2016 年 1 月 2 号一天中每个小时分别有多少人乘车,可以用下面的 SQL:

=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;

通过 maxmin 函数可以快速得知,在当前的数据集中,最远的行程距离是 485.9 英里。如果想要进一步了解在 10、10-50、50-100、100-200、200 以上不同行程距离区段的总行程数,也只需要一条 SQL 语句完成:

=# SELECT distance_range, count(*) AS num_of_trips  
FROM
(
SELECT 
CASE
    WHEN trip_distance <= 10 THEN 10
    WHEN trip_distance >  10 AND trip_distance <= 50 THEN 50
    WHEN trip_distance >  50 AND trip_distance <= 100 THEN 100 
    WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
    WHEN trip_distance > 200 THEN 500 
    END AS distance_range  
FROM trip
) AS temp
GROUP BY distance_range;

执行后看到这样的输出,就大功告成了:

 distance_range | num_of_trips
----------------+--------------
             10 |     10308767
             50 |       586200
            100 |          379
            200 |           58
            500 |            9