一个常见的时序场景
YMatrix 适用于各种规模设备的物联网时序场景。本教程以一个具体的车联网时序场景为例,说明在 YMatrix 中加载、处理和查询时序数据的方法。
1 时序数据集介绍 -- 某城市出租车行程数据集
某城市拥有超过 800 万人口和 20 万辆出租车,市政管理部门采集并公开了出租车每次的行程信息,包括上下车时间、上下车地点、乘车人数、费用和付费方式等。借助这些信息,可以分析出什么呢?答案是:出租车的利用率,甚至整体交通状况!借此,你可以改善城市管理,并提高居民和游客的生活体验。本教程提供了一个月的数据压缩包,点击这里开启你的城市交通管理之旅(提取码 1x4u)。
2 元数据和表模式
你采集到的数据中,有一项标明付费方式,可能的付费方式有现金、信用卡、免付费、有争议、未知和无效,我们称之为静态属性。通过创建一张 payment_types
表来保存这些信息,以便后续查询时可以和这些元数据关联。与“付费方式”相关联的属性信息规模小且存在更新需求,因此指定默认存储引擎 HEAP 即可。一般情况下,无需特别指定,也将默认加载 HEAP 引擎。
=# CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING HEAP;
使用 IF NOT EXISTS
语句,可以避免重复创建相同名称的表时报错。
=# INSERT INTO payment_types VALUES
(1, '信用卡'),
(2, '现金'),
(3, '免付费'),
(4, '有争议'),
(5, '未知'),
(6, '无效行程');
还有一项标明费率类型,包括标准费率、1 号机场、2 号机场、特殊区域、协商价、多人乘车等。同样,你可以创建一张静态属性表 rate_codes
来记录这些信息,同样搭载默认的 HEAP 存储引擎:
=# CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING HEAP;
=# INSERT INTO rate_codes VALUES
(1, '标准费率'),
(2, '1号机场'),
(3, '2号机场'),
(4, '特殊区域'),
(5, '协商价'),
(6, '团体');
接下来你就可以创建一张时序数据表来保存具体的行程数据。有一些字段含义略作说明,pickup_datetime
/ dropoff_datetime
分别表示上车时间点和下车时间点,pickup_longitude
/ pickup_latitude
表示上车地点的经纬度值,dropoff_longitude
/ dropoff_latitude
表示下车地点的经纬度值,passenger_count
表示乘客数量,trip_distance
是旅程的距离(单位为英里),total_amount
表示乘车费用,最后一个字段 trip_duration
是一个在数据加载时生成的字段,记录了乘车时长(单位为分钟)。
在搭乘出租车的时序场景下,trip
表中的元数据为因设备随时间变化而产生的时序数据。由于时序场景下数据源具有多样性和多变性,对于时序数据写入和存储提出了较高要求,而更新与删除数据的需求则较低。因此 MARS3 为最佳选择,其时序数据写入、存储、查询性能优化显著。
MARS3 表依赖 matrixts
扩展,所以在建表前你需要首先在使用该存储引擎的库中创建扩展,如果已经创建过,无需重复创建,跳过此步即可:
=# CREATE EXTENSION matrixts;
建表时应使用 USING MARS3
语句来指定存储引擎,用 WITH
语句设置相应参数。其中 compresstype
表示压缩算法,支持 zstd
、zlib
、lz4
,默认值为 lz4
;compresslevel
表示压缩级别。值越小压缩速度越快;值越大压缩比越高;值适中,压缩速度与压缩比相对平衡。不同的算法有效值范围都不同:
- zstd:1-19
- zlib:1-9
- lz4:1-20。默认值为 1。
=# CREATE TABLE IF NOT EXISTS trip ( vendor_id text, pickup_datetime timestamp without time zone, dropoff_datetime timestamp without time zone, passenger_count int, trip_distance numeric, pickup_longitude numeric, pickup_latitude numeric, rate_code_id int, store_and_fwd_flag text, dropoff_longitude numeric, dropoff_latitude numeric, payment_type int, fare_amount numeric, extra numeric, mta_tax numeric, tip_amount numeric, tolls_amount numeric, improvement_surcharge numeric, total_amount numeric, trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED ) USING MARS3 WITH (compresstype='lz4', compresslevel=1) DISTRIBUTED BY (vendor_id) ORDER BY (vendor_id, pickup_datetime) PARTITION BY RANGE (pickup_datetime) ( START (date '2016-01-01') INCLUSIVE END (date '2016-02-01') EXCLUSIVE EVERY (INTERVAL '1 day') );
DISTRIBUTED BY
语句的存在意味着你将对trip
表中的数据进行数据分桶,按照vendor_id
列实现哈希分布,使得相同值的数据在同一个节点上。ORDER BY
语句使得每个节点上的全部数据按照(vendor_id, pickup_datetime)
排序键进行排序,使得数据有序存储。PARTITION BY
语句则是数据分区,从 2016 年 1 月 1 日起(包含本日),到 2016 年 2 月 1 日前(不包含本日),以一天为一次间隔创建 31 个分区表。trip
表自动按天分区,方便按时间段的查询进行快速裁剪,也有利于以后针对过期数据的快速处理。
注意!
SQL 语法规定须先指定DISTRIBUTED BY
,再指定PARTITION BY
,实际执行则是先做DISTRIBUTED BY
将数据分发到对应的节点上,然后再指定的节点上执行PARTITION BY
将数据插入到对应的子分区表中。
3 数据加载
你可以找到从上述链接下载的 yellow_tripdata_2016-01.csv
文件路径,然后使用 mxgate
命令加载数据。在 tail
后指定实际的文件路径,通过 --db-master-host
参数指定实际的节点名或者 IP,例如:
$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
mxgate 主要参数说明如下:
--db-database postgres // 指定数据库名
--db-master-host mdw // 主节点主机名或者IP
--db-master-port 5432 // 数据库端口
--db-user mxadmin // 数据库用户名
--time-format raw // 原始格式,不进行转换
--target trip // 要导入的表名
--parallel 256 // 并行数
--delimiter ',' // 分隔符
更多 mxgate 相关信息请见 MatrixGate
4 数据分析
YMatrix 提供了 time_bucket
函数,支持按照任意时间区间的分段计算。使用前,同样需要在数据库上安装 matrixts
扩展来初始化时序组件,无需重复创建。
=# CREATE EXTENSION matrixts;
然后你就可以通过下面的 SQL 语句统计出每天总共有多少行程了:
=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
from trip
GROUP BY day
ORDER BY day;
如果你想要了解 2016 年 1 月 2 号一天中每个小时分别有多少人乘车,可以用下面的 SQL:
=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
通过 max
和 min
函数可以快速得知,在当前的数据集中,最远的行程距离是 485.9 英里。如果想要进一步了解在 10、10-50、50-100、100-200、200 以上不同行程距离区段的总行程数,也只需要一条 SQL 语句完成:
=# SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range;
执行后看到这样的输出,就大功告成了:
distance_range | num_of_trips
----------------+--------------
10 | 10308767
50 | 586200
100 | 379
200 | 58
500 | 9