400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
400-800-0824
info@ymatrix.cn
YMatrix 文档
关于 YMatrix
标准集群部署
数据写入
数据迁移
数据查询
运维监控
参考指南
工具指南
数据类型
存储引擎
执行引擎
流计算引擎
灾难恢复
系统配置参数
索引
扩展
SQL 参考
常见问题(FAQ)
YMatrix 适用于各种规模设备的物联网时序场景。本教程以一个具体的车联网时序场景为例,说明在 YMatrix 中加载、处理和查询时序数据的方法。
某城市拥有超过 800 万人口和 20 万辆出租车,市政管理部门采集并公开了出租车每次的行程信息,包括上下车时间、上下车地点、乘车人数、费用和付费方式等。借助这些信息,可以分析出什么呢?答案是:出租车的利用率,甚至整体交通状况!借此,你可以改善城市管理,并提高居民和游客的生活体验。本教程提供了一个月的数据压缩包,点击这里开启你的城市交通管理之旅(提取码 1x4u)。
你采集到的数据中,有一项标明付费方式,可能的付费方式有现金、信用卡、免付费、有争议、未知和无效,我们称之为静态属性。通过创建一张 payment_types
表来保存这些信息,以便后续查询时可以和这些元数据关联。与“付费方式”相关联的属性信息规模小且存在更新需求,因此指定默认存储引擎 HEAP 即可。一般情况下,无需特别指定,也将默认加载 HEAP 引擎。
=# CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING HEAP;
使用 IF NOT EXISTS
语句,可以避免重复创建相同名称的表时报错。
=# INSERT INTO payment_types VALUES
(1, '信用卡'),
(2, '现金'),
(3, '免付费'),
(4, '有争议'),
(5, '未知'),
(6, '无效行程');
还有一项标明费率类型,包括标准费率、1 号机场、2 号机场、特殊区域、协商价、多人乘车等。同样,你可以创建一张静态属性表 rate_codes
来记录这些信息,同样搭载默认的 HEAP 存储引擎:
=# CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING HEAP;
=# INSERT INTO rate_codes VALUES
(1, '标准费率'),
(2, '1号机场'),
(3, '2号机场'),
(4, '特殊区域'),
(5, '协商价'),
(6, '团体');
接下来你就可以创建一张时序数据表来保存具体的行程数据。有一些字段含义略作说明,pickup_datetime
/ dropoff_datetime
分别表示上车时间点和下车时间点,pickup_longitude
/ pickup_latitude
表示上车地点的经纬度值,dropoff_longitude
/ dropoff_latitude
表示下车地点的经纬度值,passenger_count
表示乘客数量,trip_distance
是旅程的距离(单位为英里),total_amount
表示乘车费用,最后一个字段 trip_duration
是一个在数据加载时生成的字段,记录了乘车时长(单位为分钟)。
在搭乘出租车的时序场景下,trip
表中的元数据为因设备随时间变化而产生的时序数据。由于时序场景下数据源具有多样性和多变性,对于时序数据写入和存储提出了较高要求,而更新与删除数据的需求则较低。因此 MARS3 为最佳选择,其时序数据写入、存储、查询性能优化显著。
MARS3 表依赖 matrixts
扩展,所以在建表前你需要首先在使用该存储引擎的库中创建扩展,如果已经创建过,无需重复创建,跳过此步即可:
=# CREATE EXTENSION matrixts;
建表时应使用 USING MARS3
语句来指定存储引擎,用 WITH
语句设置相应参数。其中 compresstype
表示压缩算法,支持 zstd
、zlib
、lz4
,默认值为 lz4
;compresslevel
表示压缩级别。值越小压缩速度越快;值越大压缩比越高;值适中,压缩速度与压缩比相对平衡。不同的算法有效值范围都不同:
=# CREATE TABLE IF NOT EXISTS trip (
vendor_id text,
pickup_datetime timestamp without time zone,
dropoff_datetime timestamp without time zone,
passenger_count int,
trip_distance numeric,
pickup_longitude numeric,
pickup_latitude numeric,
rate_code_id int,
store_and_fwd_flag text,
dropoff_longitude numeric,
dropoff_latitude numeric,
payment_type int,
fare_amount numeric,
extra numeric,
mta_tax numeric,
tip_amount numeric,
tolls_amount numeric,
improvement_surcharge numeric,
total_amount numeric,
trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
)
USING MARS3
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
ORDER BY (vendor_id, pickup_datetime)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
END (date '2016-02-01') EXCLUSIVE
EVERY (INTERVAL '1 day') );
DISTRIBUTED BY
语句的存在意味着你将对 trip
表中的数据进行数据分桶,按照 vendor_id
列实现哈希分布,使得相同值的数据在同一个节点上。ORDER BY
语句使得每个节点上的全部数据按照 (vendor_id, pickup_datetime)
排序键进行排序,使得数据有序存储。PARTITION BY
语句则是数据分区,从 2016 年 1 月 1 日起(包含本日),到 2016 年 2 月 1 日前(不包含本日),以一天为一次间隔创建 31 个分区表。 trip
表自动按天分区,方便按时间段的查询进行快速裁剪,也有利于以后针对过期数据的快速处理。 注意!
SQL 语法规定须先指定DISTRIBUTED BY
,再指定PARTITION BY
,实际执行则是先做DISTRIBUTED BY
将数据分发到对应的节点上,然后再指定的节点上执行PARTITION BY
将数据插入到对应的子分区表中。
你可以找到从上述链接下载的 yellow_tripdata_2016-01.csv
文件路径,然后使用 mxgate
命令加载数据。在 tail
后指定实际的文件路径,通过 --db-master-host
参数指定实际的节点名或者 IP,例如:
$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
mxgate 主要参数说明如下:
--db-database postgres // 指定数据库名
--db-master-host mdw // 主节点主机名或者IP
--db-master-port 5432 // 数据库端口
--db-user mxadmin // 数据库用户名
--time-format raw // 原始格式,不进行转换
--target trip // 要导入的表名
--parallel 256 // 并行数
--delimiter ',' // 分隔符
更多 mxgate 相关信息请见 MatrixGate
YMatrix 提供了 time_bucket
函数,支持按照任意时间区间的分段计算。使用前,同样需要在数据库上安装 matrixts
扩展来初始化时序组件,无需重复创建。
=# CREATE EXTENSION matrixts;
然后你就可以通过下面的 SQL 语句统计出每天总共有多少行程了:
=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
from trip
GROUP BY day
ORDER BY day;
如果你想要了解 2016 年 1 月 2 号一天中每个小时分别有多少人乘车,可以用下面的 SQL:
=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
通过 max
和 min
函数可以快速得知,在当前的数据集中,最远的行程距离是 485.9 英里。如果想要进一步了解在 10、10-50、50-100、100-200、200 以上不同行程距离区段的总行程数,也只需要一条 SQL 语句完成:
=# SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range;
执行后看到这样的输出,就大功告成了:
distance_range | num_of_trips
----------------+--------------
10 | 10308767
50 | 586200
100 | 379
200 | 58
500 | 9