时序基本教程

MatrixDB适用于各种规模设备的物联网时序应用场景。本教程以一个具体的数据集为例,说明在MatrixDB中加载、处理和查询时序数据的方法。

1 数据集介绍 -- 某城市出租车行程数据集

某城市拥有超过800万人口和20万辆出租车,市政管理部门采集并公开了出租车每次的行程信息,包括上下车时间、上下车地点、乘车人数、费用和付费方式等,借助这些信息,可以分析出租车的利用率和整体交通状况,改进城市管理,改善居民和游客的生活体验。本教程提供了一个月的数据压缩包,可以在这里下载(提取码1x4u)。

2 元数据和表模式

在采集的数据中,有一项标明付费方式,可能的付费方式有现金、信用卡、免付费、有争议、未知和无效。通过创建一张payment_types表来保存这些信息,以便后续查询时可以和这些元数据关联。

create table if not exists payment_types (
    payment_type int,
    description text
);

insert into payment_types values
(1, '信用卡'),
(2, '现金'),
(3, '免付费'),
(4, '有争议'),
(5, '未知'),
(6, '无效行程');

还有一项标明费率类型,包括标准费率、1号机场、2号机场、特殊区域、协商价、多人乘车等。可以创建一张rate_codes表来记录这些类型:

create table if not exists rate_codes (
    rate_code   int,
    description text
);

insert into rate_codes values
(1, '标准费率'),
(2, '1号机场'),
(3, '2号机场'),
(4, '特殊区域'),
(5, '协商价'),
(6, '团体');

接下来就可以创建一张数据库表来保存具体的行程数据。有一些字段含义略作说明,pickup_datetime/dropoff_datetime分别表示上车时间点和下车时间点,pickup_longitude /pickup_latitude表示上车地点的经纬度值,dropoff_longitude/dropoff_latitude表示下车地点的经纬度值,passenger_count表示乘客数量,trip_distance是旅程的距离(单位为英里),total_amount表示乘车费用,最后一个字段trip_duration是一个在数据加载时生成的字段,记录了乘车时长(单位为分钟)。

create table if not exists trip (
  vendor_id text,
  pickup_datetime timestamp without time zone,
  dropoff_datetime timestamp without time zone,
  passenger_count int,
  trip_distance numeric,
  pickup_longitude numeric,
  pickup_latitude numeric,
  rate_code_id int,
  store_and_fwd_flag text,
  dropoff_longitude numeric,
  dropoff_latitude numeric,
  payment_type int,
  fare_amount numeric,
  extra numeric,
  mta_tax numeric,
  tip_amount numeric,
  tolls_amount numeric,
  improvement_surcharge numeric,
  total_amount numeric,
  trip_duration numeric generated always as (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
) 
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
   END (date '2016-02-01') EXCLUSIVE
   EVERY (INTERVAL '1 day') );

trip表会自动按天分区,方便按时间段的查询进行快速裁剪,也有利于以后针对过期数据的快速处理。

3 数据加载

找到从上述链接下载的数据文件包解压后的yellow_tripdata_2016-01.csv文件路径,使用mxgate命令加载数据。请在tail后指定实际的文件路径,通过--db-master-host参数指定实际的节点名或者IP,例如:

 tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database mxdb --db-master-host sdw3 --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256  --delimiter ','  --exclude-columns trip_duration 

mxgate主要参数说明如下:

--db-database mxdb     指定数据库名    
--db-master-host sdw3  主节点主机名或者IP
--db-master-port 5432  数据库端口
--db-user mxdb         数据库用户名
--time-format raw      原始格式,不进行转换
--target trip          要导入的表名
--parallel 256         并行数
--delimiter ','        分隔符

mxgate原理与更详细的使用方法请参考MatrixGate

4 数据处理

和许多传感器数据类似,因为各种复杂原因,该数据集中包含一些明显错误的或者无效的数据。借助MatrixDB提供的丰富SQL能力,可以快速检测并清除这些无效错误数据。 一种错误情况是下车时间早于或者等于上车时间,可以使用SELECT查看这种情况并用DELETE语句删除:

select count(*) 
from trip 
where dropoff_datetime <= pickup_datetime;

delete from trip 
where dropoff_datetime <= pickup_datetime;

还有一种情况是汽车的平均速度大于每小时300英里,尤其在旅程或者时间较长时,这种情况明显不合理,可以使用如下的语句查看并清除:

select trip_distance, trip_duration 
from trip 
where trip_distance > trip_duration* (300/60) and trip_distance > 100;

delete from trip 
where trip_distance > trip_duration* (300/60) and trip_distance > 100;

5 数据分析

MatrixDB提供了time_bucket函数,支持按照任意时间区间的分段计算。使用前,需要在数据库上安装MatrixTS Extension来初始化时序组件:

 CREATE EXTENSION matrixts;

完成后,可以通过下面的SQL语句统计出每天有多少行程:

select time_bucket('24 hours', pickup_datetime) as day, count(*) 
from trip 
group by day 
order by day;

如果想要了解2016年1月2号一天中每个小时,分别有多少人乘车,可以用下面的SQL:

SELECT time_bucket('1 hour', pickup_datetime) as hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' and pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;

通过max和min函数可以快速得知,在当前的数据集中,最远的行程距离是485.9英里。如果想要进一步了解在10、10-50、50-100、100-200、200以上不同行程距离区段的总行程数,也只需要一条SQL语句完成:

select distance_range, count(*) as num_of_trips  
from
(
select 
case
    when trip_distance <= 10 then 10
    when trip_distance > 10 AND trip_distance <= 50 then 50
    when trip_distance > 50 AND trip_distance <= 100 then 100 
    when trip_distance > 100 AND trip_distance <= 200 then 200
    when trip_distance > 200 then 500 
    end as distance_range  
from trip
) as temp
group by distance_range;

执行后可以看到这样的输出:

 distance_range | num_of_trips
----------------+--------------
             10 |     10308767
             50 |       586200
            100 |          379
            200 |           58
            500 |            9