Apache NiFi + MatrixDB 20行代码实现数据实时入库!

继上一篇“空间节省50%,时序性能提升5倍,三一重工从Hadoop+Spark到MatrixDB架构变迁实现One for ALL”发布后,这次我们再从 Apache NiFi + MatrixDB 着手,用20行代码轻松实现数据实时入库!

作者信息:李净芝 - 工程车辆事业部研究院大数据工程师

1. 前言

目前,三一重工泵诵云平台的数据接入采用 Nifi + MatrixDB 的方案已正常运行4个月,NiFi 集群由3台配置为内存32g、硬盘1T的服务器组成,平均5分钟消费 Kafka 数据6.4GB,每秒处理数据14784条。

NiFi 本身提供大量组件,用以应付各式各样 ETL 场景,实现了 hdfs、本地文件系统、主流数据库 (mysql/oracle/postgres) 之间数据的流转。 1

什么是 Apache NiFi ?

Apache NiFi,专门用于解决与数据流有关问题的工具,易于使用、功能强大、可靠的数据 ETL 系统。基于 WEB 图形页面,通过组件的拖拽、连接及配置,即可搭建完整的数据流,实时监控数据在各个处理组件之间流转的情况。

什么是 MatrixGate ?

MatrixGate 简称 mxgate,是 MatrixDB 自带的高性能流式数据加载服务器,使用 mxgate 进行数据加载性能要远远高于原生 INSERT 语句,MatrixGate 加载数据的逻辑如下图所示: 2

  1. 数据采集系统采集设备数据或者接收由设备发送来的数据
  2. 采集系统以并发微批的模式向 MatrixGate 的服务进程 mxgate 持续发送数据
  3. mxgate 进程和 MatrixDB 的 master 进程高效通信,沟通事务和控制信息
  4. 数据直接发送到 segment 节点,并行高速写入,不存在 master 单点瓶颈。

三一重工泵诵云平台将二者相结合,实现数据实时入库,且解决标准化的问题。在分享案例之前,先了解一下 NiFi 中一个重要的概念:

FlowFile

  • FlowFile 是 NiFi 的核心概念,是对原始数据记录的抽象,是面向 FBP (Flow-Based Programming) 设计的。
  • FlowFile 是数据记录,由一个指针(指向内容)和属性组成。
  • 属性是 Key / value 键值对,是 Flowfile 的元数据。
  • 内容是原始数据。

在后续的案例中,数据的流转在 NiFi 中就是 Flowfile 的生成与转化。

2. 案例

数据处理思路 3

根据上面的思路,使用 NiFi 搭建数据流的过程如下:

01.

首先,获取数据,可以用 NiFi 自带的 ConsumeKafka_2_0 组件,只需配置 broker、topic,即可消费数据输出到下一组件。 4

02.

上图中的 Sany/JsonTypeJudge 为自定义组件,功能为根据 kafka.key 分发车载泵和泵车的数据,也可以用 NiFi 自带的 RouteOnAttribute 组件。

官方组件实现的分发规则更加的灵活,但是效率要低许多。样例如下: 5 6

03.

分发的数据输出的合并组件,这里使用 NiFi 自带的 MergeContent 组件,合并策略采用桶策略。 7 8

桶策略的含义如下:

  • 每个 FlowFile 都有属性,桶策略首先需要指定合并属性,在上图中,合并属性设置为 kafka.key,也就是设备号。
  • kafka.key 为A的 FlowFile 将进入A桶,kafka.key 为B的 FlowFile 将进入B桶,以此类推,每个 FlowFile 根据自身的属性进入对应的桶。
  • 桶策略还有其他配置,比如桶中的最小/最大文件数、桶中文件的最小/最大 Size、桶的持续时间等等,一旦达到门限值,桶里面的 FlowFiles 会打包合并成一个 FlowFile 输出到下一组件。

9

04.

将合并后的 jsonArray 进行解析,在这里用到的组件为自定义组件,因为原始的 json,key 不固定,而 NiFi 自带的 jsonReader 组件只能用单一的 schema 去读。 10

05.

对分发的实时工况进行标准化。设备上传的数据是由控制协议定义的,随着控制协议以及设备的更新,新老设备对于同一个物理量会存在不同的字段映射,比如转速这个物理量,在1车型中是字段A,在2车型中是字段B,我们希望根据车型的不同,填充A or B 到转速这个物理量。

以下是实现标准化涉及的组件:

  • NiFi 自带的 LookupAttribute 组件,根据前文提到的 kafka.key 这一属性,为每条记录添加 protocol 属性,为后续每条记录输出A还是输出B提供依据。 11 12 13
  • 为每个 FlowFile 添加上 protocol 这个属性后,在使用 NiFi 自带的 ConvertRecord 组件根据 protocol 的值动态地输出A or B,以此达到标准化的效果。 14 15 16

06.

数据入库 NiFi 自带的组件有 PutDatabaseRecord,这个组件能 put 数据到市面上主流的数据库,只需指定 jdbc 的 jar包,以及配置 url,数据库名等,但是入库效率会表较低。

这里结合 mxgate 的 java api,自定义一个组件,用于加载数据到 MatrixDB。 17 PutDatabaseRecord 自定义组件的代码实现如下: 18

3. 小结

在本案例中,需要注意的是,NiFi 为了防止数据丢失会将接入的数据内容作为内容声明保存在本地,可以通过更改 NiFi 配置再重启来改变内容声明留存时间。

前期由于使用默认的留存时间,再加上服务器本身磁盘有限。因此,在很短的时间内出现磁盘满了的情况,通过改变留存时间以及定时清理内容声明,可以解决磁盘爆满的问题,但是带来的问题是,一旦碰到某种原因导致 NiFi 未及时入库数据,而被定时清理了内容声明,那数据就会丢失,还是需要通过对磁盘扩容来解决。

总的来说,“NiFi+Mxgate” 二者的结合完美地解决了数据接入的问题。