一个常见的时序场景

YMatrix 适用于各种规模设备的物联网时序场景。本教程以一个具体的车联网时序场景为例,说明在 YMatrix 中加载、处理和查询时序数据的方法。

1 时序数据集介绍 -- 某城市出租车行程数据集

某城市拥有超过 800 万人口和 20 万辆出租车,市政管理部门采集并公开了出租车每次的行程信息,包括上下车时间、上下车地点、乘车人数、费用和付费方式等。借助这些信息,可以分析出什么呢?答案是:出租车的利用率,甚至整体交通状况!借此,你可以改善城市管理,并提高居民和游客的生活体验。本教程提供了一个月的数据压缩包,点击这里开启你的城市交通管理之旅(提取码 1x4u)。

2 元数据和表模式

你采集到的数据中,有一项标明付费方式,可能的付费方式有现金、信用卡、免付费、有争议、未知和无效,我们称之为静态属性。通过创建一张 payment_types 表来保存这些信息,以便后续查询时可以和这些元数据关联。与“付费方式”相关联的属性信息规模小且存在更新需求,因此指定默认存储引擎 HEAP 即可。一般情况下,无需特别指定,也将默认加载 HEAP 引擎。

CREATE TABLE IF NOT EXISTS payment_types (
    payment_type int,
    description text
)
USING heap;

使用 IF NOT EXISTS 语句,可以避免重复创建相同名称的表时报错。

INSERT INTO payment_types VALUES
(1, '信用卡'),
(2, '现金'),
(3, '免付费'),
(4, '有争议'),
(5, '未知'),
(6, '无效行程');

还有一项标明费率类型,包括标准费率、1 号机场、2 号机场、特殊区域、协商价、多人乘车等。同样,你可以创建一张静态属性表 rate_codes 来记录这些信息,同样搭载默认的 HEAP 存储引擎:

CREATE TABLE IF NOT EXISTS rate_codes (
    rate_code   int,
    description text
)
USING heap;

INSERT INTO rate_codes VALUES
(1, '标准费率'),
(2, '1号机场'),
(3, '2号机场'),
(4, '特殊区域'),
(5, '协商价'),
(6, '团体');

接下来你就可以创建一张时序数据表来保存具体的行程数据。有一些字段含义略作说明,pickup_datetime/dropoff_datetime 分别表示上车时间点和下车时间点,pickup_longitude /pickup_latitude 表示上车地点的经纬度值,dropoff_longitude/dropoff_latitude 表示下车地点的经纬度值,passenger_count 表示乘客数量,trip_distance 是旅程的距离(单位为英里),total_amount 表示乘车费用,最后一个字段 trip_duration 是一个在数据加载时生成的字段,记录了乘车时长(单位为分钟)。

在搭乘出租车的时序场景下,trip 表中的元数据为因设备随时间变化而产生的时序数据。由于时序场景下数据源具有多样性和多变性,对于时序数据写入和存储提出了较高要求,而更新与删除数据的需求则较低。因此你完全可以使用 YMatrix 中的 MARS 系列存储引擎。MARS2 为最佳选择,其时序数据写入、存储、查询性能优化显著。

MARS2 表依赖 matrixts 扩展,所以在建表前你需要首先在使用该存储引擎的库中创建扩展,如果已经创建过,无需重复创建,跳过此步即可:

CREATE EXTENSION matrixts;

建表时应使用 USING mars2 语句来指定存储引擎,用 WITH 语句设置相应参数。其中 compresstype 表示压缩算法,支持 zstd、zlib、lz4,默认值为 lz4;compresslevel 表示压缩级别。值越小压缩速度越快;值越大压缩比越高;值适中,压缩速度与压缩比相对平衡。不同的算法有效值范围都不同: zstd:1-19 zlib:1-9 lz4:1-20。默认值为 1。

CREATE TABLE IF NOT EXISTS trip (
  vendor_id text,
  pickup_datetime timestamp without time zone,
  dropoff_datetime timestamp without time zone,
  passenger_count int,
  trip_distance numeric,
  pickup_longitude numeric,
  pickup_latitude numeric,
  rate_code_id int,
  store_and_fwd_flag text,
  dropoff_longitude numeric,
  dropoff_latitude numeric,
  payment_type int,
  fare_amount numeric,
  extra numeric,
  mta_tax numeric,
  tip_amount numeric,
  tolls_amount numeric,
  improvement_surcharge numeric,
  total_amount numeric,
  trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
) 
USING mars2
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
   END (date '2016-02-01') EXCLUSIVE
   EVERY (INTERVAL '1 day') );

DISTRIBUTED BY 语句的存在意味着你将对 trip 表中的数据进行数据分桶,按照 vendor_id 列实现哈希分布,使得相同值的数据在同一个节点上。PARTITION BY 语句则是数据分区,从 2016 年 1 月 1 日起(包含本日),到 2016 年 2 月 1 日前(不包含本日),以一天为一次间隔创建 31 个分区表。 trip 表自动按天分区,方便按时间段的查询进行快速裁剪,也有利于以后针对过期数据的快速处理。

注意!
SQL 语法规定须先指定 DISTRIBUTED BY,再指定 PARTITION BY,实际执行则是先做 DISTRIBUTED BY 将数据分发到对应的节点上,然后再指定的节点上执行 PARTITION BY 将数据插入到对应的子分区表中。

MARS2 表在创建后必须额外创建一个 mars2_btree 类型的索引,这样才能进行正常的数据读写。索引主要有几个作用:

1、索引本身的功能,用于快速的查找。
2、指定 MARS2 内部动态排序的排序键。
更多 MARS 相关信息请见 存储引擎

CREATE INDEX idx_trip ON trip USING mars2_btree (vendor_id, pickup_datetime);

3 数据加载

你可以找到从上述链接下载的 yellow_tripdata_2016-01.csv 文件路径,然后使用 mxgate 命令加载数据。在 tail 后指定实际的文件路径,通过 --db-master-host 参数指定实际的节点名或者 IP,例如:

 tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256  --delimiter ','  --exclude-columns trip_duration 

mxgate 主要参数说明如下:

--db-database postgres     指定数据库名    
--db-master-host mdw  主节点主机名或者IP
--db-master-port 5432  数据库端口
--db-user mxadmin         数据库用户名
--time-format raw      原始格式,不进行转换
--target trip          要导入的表名
--parallel 256         并行数
--delimiter ','        分隔符

更多 mxgate 相关信息请见 MatrixGate

4 数据分析

YMatrix 提供了 time_bucket 函数,支持按照任意时间区间的分段计算。使用前,同样需要在数据库上安装 MatrixTS 扩展来初始化时序组件,无需重复创建。

 CREATE EXTENSION matrixts;

然后你就可以通过下面的 SQL 语句统计出每天总共有多少行程了:

SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*) 
from trip 
GROUP BY day 
ORDER BY day;

如果你想要了解 2016 年 1 月 2 号一天中每个小时分别有多少人乘车,可以用下面的 SQL:

SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;

通过 max 和 min 函数可以快速得知,在当前的数据集中,最远的行程距离是 485.9 英里。如果想要进一步了解在 10、10-50、50-100、100-200、200 以上不同行程距离区段的总行程数,也只需要一条 SQL 语句完成:

SELECT distance_range, count(*) AS num_of_trips  
FROM
(
SELECT 
CASE
    WHEN trip_distance <= 10 THEN 10
    WHEN trip_distance >  10 AND trip_distance <= 50 THEN 50
    WHEN trip_distance >  50 AND trip_distance <= 100 THEN 100 
    WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
    WHEN trip_distance > 200 THEN 500 
    END AS distance_range  
FROM trip
) AS temp
GROUP BY distance_range;

执行后看到这样的输出,就大功告成了:

 distance_range | num_of_trips
----------------+--------------
             10 |     10308767
             50 |       586200
            100 |          379
            200 |           58
            500 |            9