一个常见的时序场景
YMatrix 适用于各种规模设备的物联网时序场景。本教程以一个具体的车联网时序场景为例,说明在 YMatrix 中加载、处理和查询时序数据的方法。
1 时序数据集介绍 -- 某城市出租车行程数据集
某城市拥有超过 800 万人口和 20 万辆出租车,市政管理部门采集并公开了出租车每次的行程信息,包括上下车时间、上下车地点、乘车人数、费用和付费方式等。借助这些信息,可以分析出什么呢?答案是:出租车的利用率,甚至整体交通状况!借此,你可以改善城市管理,并提高居民和游客的生活体验。本教程提供了一个月的数据压缩包,点击这里开启你的城市交通管理之旅(提取码 1x4u)。
2 元数据和表模式
你采集到的数据中,有一项标明付费方式,可能的付费方式有现金、信用卡、免付费、有争议、未知和无效,我们称之为静态属性。通过创建一张 payment_types 表来保存这些信息,以便后续查询时可以和这些元数据关联。与“付费方式”相关联的属性信息规模小且存在更新需求,因此指定默认存储引擎 HEAP 即可。一般情况下,无需特别指定,也将默认加载 HEAP 引擎。
CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING HEAP;
使用 IF NOT EXISTS 语句,可以避免重复创建相同名称的表时报错。
INSERT INTO payment_types VALUES
(1, '信用卡'),
(2, '现金'),
(3, '免付费'),
(4, '有争议'),
(5, '未知'),
(6, '无效行程');
还有一项标明费率类型,包括标准费率、1 号机场、2 号机场、特殊区域、协商价、多人乘车等。同样,你可以创建一张静态属性表 rate_codes 来记录这些信息,同样搭载默认的 HEAP 存储引擎:
CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING HEAP;
INSERT INTO rate_codes VALUES
(1, '标准费率'),
(2, '1号机场'),
(3, '2号机场'),
(4, '特殊区域'),
(5, '协商价'),
(6, '团体');
接下来你就可以创建一张时序数据表来保存具体的行程数据。有一些字段含义略作说明,pickup_datetime/dropoff_datetime 分别表示上车时间点和下车时间点,pickup_longitude /pickup_latitude 表示上车地点的经纬度值,dropoff_longitude/dropoff_latitude 表示下车地点的经纬度值,passenger_count 表示乘客数量,trip_distance 是旅程的距离(单位为英里),total_amount 表示乘车费用,最后一个字段 trip_duration 是一个在数据加载时生成的字段,记录了乘车时长(单位为分钟)。
在搭乘出租车的时序场景下,trip 表中的元数据为因设备随时间变化而产生的时序数据。由于时序场景下数据源具有多样性和多变性,对于时序数据写入和存储提出了较高要求,而更新与删除数据的需求则较低。因此 MARS2 为最佳选择,其时序数据写入、存储、查询性能优化显著。
MARS2 表依赖 matrixts
扩展,所以在建表前你需要首先在使用该存储引擎的库中创建扩展,如果已经创建过,无需重复创建,跳过此步即可:
CREATE EXTENSION matrixts;
建表时应使用 USING MARS2 语句来指定存储引擎,用 WITH
语句设置相应参数。其中 compresstype
表示压缩算法,支持 zstd
、zlib
、lz4
,默认值为 lz4
;compresslevel
表示压缩级别。值越小压缩速度越快;值越大压缩比越高;值适中,压缩速度与压缩比相对平衡。不同的算法有效值范围都不同:
- zstd:1-19
- zlib:1-9
- lz4:1-20。默认值为 1。
CREATE TABLE IF NOT EXISTS trip ( vendor_id text, pickup_datetime timestamp without time zone, dropoff_datetime timestamp without time zone, passenger_count int, trip_distance numeric, pickup_longitude numeric, pickup_latitude numeric, rate_code_id int, store_and_fwd_flag text, dropoff_longitude numeric, dropoff_latitude numeric, payment_type int, fare_amount numeric, extra numeric, mta_tax numeric, tip_amount numeric, tolls_amount numeric, improvement_surcharge numeric, total_amount numeric, trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED ) USING MARS2 WITH (compresstype='lz4', compresslevel=1) DISTRIBUTED BY (vendor_id) PARTITION BY RANGE (pickup_datetime) ( START (date '2016-01-01') INCLUSIVE END (date '2016-02-01') EXCLUSIVE EVERY (INTERVAL '1 day') );
DISTRIBUTED BY 语句的存在意味着你将对 trip 表中的数据进行数据分桶,按照 vendor_id 列实现哈希分布,使得相同值的数据在同一个节点上。PARTITION BY 语句则是数据分区,从 2016 年 1 月 1 日起(包含本日),到 2016 年 2 月 1 日前(不包含本日),以一天为一次间隔创建 31 个分区表。 trip 表自动按天分区,方便按时间段的查询进行快速裁剪,也有利于以后针对过期数据的快速处理。
注意!
SQL 语法规定须先指定 DISTRIBUTED BY,再指定 PARTITION BY,实际执行则是先做 DISTRIBUTED BY 将数据分发到对应的节点上,然后再指定的节点上执行 PARTITION BY 将数据插入到对应的子分区表中。
MARS2 表在创建后必须额外创建一个 mars2_btree
类型的索引,这样才能进行正常的数据读写。索引主要有几个作用:
1、索引本身的功能,用于快速的查找。
2、指定 MARS2 内部动态排序的排序键。
更多 MARS2 相关信息请见 存储引擎
CREATE INDEX idx_trip ON trip USING mars2_btree (vendor_id, pickup_datetime);
3 数据加载
你可以找到从上述链接下载的 yellow_tripdata_2016-01.csv 文件路径,然后使用 mxgate 命令加载数据。在 tail 后指定实际的文件路径,通过 --db-master-host 参数指定实际的节点名或者 IP,例如:
tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
mxgate 主要参数说明如下:
--db-database postgres 指定数据库名
--db-master-host mdw 主节点主机名或者IP
--db-master-port 5432 数据库端口
--db-user mxadmin 数据库用户名
--time-format raw 原始格式,不进行转换
--target trip 要导入的表名
--parallel 256 并行数
--delimiter ',' 分隔符
更多 mxgate 相关信息请见 MatrixGate
4 数据分析
YMatrix 提供了 time_bucket 函数,支持按照任意时间区间的分段计算。使用前,同样需要在数据库上安装 MatrixTS 扩展来初始化时序组件,无需重复创建。
CREATE EXTENSION matrixts;
然后你就可以通过下面的 SQL 语句统计出每天总共有多少行程了:
SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
from trip
GROUP BY day
ORDER BY day;
如果你想要了解 2016 年 1 月 2 号一天中每个小时分别有多少人乘车,可以用下面的 SQL:
SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
通过 max 和 min 函数可以快速得知,在当前的数据集中,最远的行程距离是 485.9 英里。如果想要进一步了解在 10、10-50、50-100、100-200、200 以上不同行程距离区段的总行程数,也只需要一条 SQL 语句完成:
SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range;
执行后看到这样的输出,就大功告成了:
distance_range | num_of_trips
----------------+--------------
10 | 10308767
50 | 586200
100 | 379
200 | 58
500 | 9