数据写入工具 MatrixGate
本文档介绍了数据高速写入工具 MatrixGate 的工作原理、基本用法、参数信息、主要功能及一些其他编程语言接入 MatrixGate 的代码示例。 此文档中介绍的主要功能有:
MatrixGate(简称 mxgate)是高性能流式数据加载工具,位于 YMatrix 安装目录下的 bin/mxgate。MatrixGate 当前支持通过 SDK 或 HTTP 和 STDIN API 接口加载数据。数据格式支持 TEXT 和 CSV。
1 MatrixGate 工作原理
MatrixGate 加载数据的逻辑如下图所示,1)数据采集系统采集设备数据或者接收由设备发送来的数据 2) 采集系统以并发微批的模式向 MatrixGate 的服务进程 mxgate 持续发送数据 3)mxgate 进程和 YMatrix 的 主节点(Master)进程高效通信,沟通事务和控制信息 4)数据直接发送到数据节点(Segment),并行高速写入。
2 MatrixGate 基本用法
-
指定目标数据库和目标表,生成 mxgate 配置文件
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
上述参数将生成一个配置文件 mxgate.conf,允许用户对 testtable 和 testtable2 的加载做些个性化定制,同时也可以使用全局缺省设置往其他表中加载数据。
-
根据需要修改 mxgate 配置文件,如配置数据分隔符等,选择默认配置可忽略此步骤。可以在该配置文件中看到与 testtable 和 testtable2 对应的设置如下:
[[job.target]] # delimiter = "|" # exclude-columns = [] # format = "text" name = "job_text_to_public.testtable" # null-as = "" table = "public.testtable" # time-format = "unix-second" # use-auto-increment = true [[job.target]] # delimiter = "|" # exclude-columns = [] # format = "text" name = "job_text_to_public.testtable2" # null-as = "" table = "public.testtable2" # time-format = "unix-second" # use-auto-increment = true
如果 testtable 的分隔符是 @,而 testtable2 的分隔符是 %,可以把上述配置修改为:
[[job.target]] delimiter = "@" # exclude-columns = [] # format = "text" name = "job_text_to_public.testtable" # null-as = "" table = "public.testtable" # time-format = "unix-second" # use-auto-increment = true [[job.target]] delimiter = "%" # exclude-columns = [] # format = "text" name = "job_text_to_public.testtable2" # null-as = "" table = "public.testtable2" # time-format = "unix-second" # use-auto-increment = true
mxgate 默认监听 8086 端口接收数据,可以在 mxgate.conf 中看到 source.http 下的 http-port 子项设置为 8086,如果需要,可以改为其他端口:
[source] ## Source plugin is the data entrance to MatrixGate ## Types restricted to: http source = "http" [source.http] ## Port of http push # http-port = 8086 ## Maximum request body size (after gzip) ## The server rejects requests with bodies exceeding this limit. # max-body-bytes = 4194304 ## The maximum number of concurrent HTTP connections to the server ## The server response with 503 after exceed this limit. # max-concurrency = 40000
-
启动 mxgate,加载配置文件,连接 demo 数据库,准备接收数据加载请求
mxgate start --config mxgate.conf
-
查看后台服务状态
mxgate status
-
终止后台服务
mxgate stop
当遇到超时或者其他问题需要强制停止时,可以这样执行:
mxgate stop --force
3 MatrixGate 命令行参数详解
参数名 | 参数值 | 参数含义 |
---|---|---|
[general]类别 | ||
--job-interval-get | 默认 false | 获取写入事务的工作时间 |
--job-list | 默认false | 获取所有的写入事务信息 |
--job-state | 默认false | 获取所有的写入事务状态 |
--stream-prepared-get | 默认 false | 获取写入事务中写入连接的数量 |
--stream-status-get | 默认 false | 获取写入事务中写入连接的状态 |
--stream-prepared-cli int | 默认 0 | 写入事务中活跃写入连接的数量 |
--pause | 默认false | 中断写入 |
--resume | 默认false | 恢复写入 |
[database]类别 | ||
--db-database | 默认 postgres | MatrixGate 连接 YMatrix 数据库名 |
--db-master-host | 默认本机主机名 | MatrixGate 连接 YMatrix 主机名 |
--db-master-port | 默认 5432 | MatrixGate 连接 YMatrix 主机端口号 |
--db-user | 默认当前系统用户名 | MatrixGate 连接 YMatrix 用户名 注意! 该用户必须具有创建外部表的权限,如果使用的是非超级权限用户,请使用如下命令增加权限: alter user {username} CREATEEXTTABLE; |
--db-password | 默认为空 | MatrixGate 连接 YMatrix 用户密码 |
--db-max-conn | 默认10 | MatrixGate 连接 YMatrix 最大连接数 |
[job]类别 | ||
--allow-dynamic | 默认 false | 当指定 --allow-dynamic=true 时,允许根据 POST 的数据内容(第一行),动态适配插入的目标表。此选项应仅用于 MatrixGate 启动时目标表名尚未确定的场景。如果固定插入某个已知的目标表,推荐用 --target 显式指定表名 |
--delimiter | 默认为 | | 指定用于分隔文件每一行(行)中各列的字符 |
--error-handling | 默认为 'accurate' | 遇到格式错误行的处理方式 'accurate':错误数据不入库并记录错误日志,该批次其他数据不受影响 'legacy':该批次整体失败 |
--exclude-columns | 默认为空 | 数据加载默认提供的列数量和列顺序需要与表定义一致,当数据加载仅提供部分列时,--exclude-columns 用于标记排除的列名,其他列仍需要保证顺序与表定义一致。 提示:如果已经开启 --use-auto-increment 跳过自增字段,则不需要在此列出这些自增字段。这个参数只需要标记出其他还需要排除的列名即可 |
--format | 默认 text | 指定源数据的数据格式 text 或 CSV。text 速度最快,但不支持字符类型中出现换行。CSV 格式适用性更广,对字符类型的列必须用双引号。 |
--null-as | 默认空字符串 | 指定表示空值的字符串。 默认值为无引号的空字符串。当数据表中的列约束为非空 NOT NULL,且数据内容上该列给出了空值,将会导致加载报错。提示:如需要使用 \N 为空值,需要对反斜杠进行转义,如:--null-as \N |
--time-format | 默认 unix-second | 指定时间戳单位:unix-second | unix-ms|unix-nano | raw。\n MatrixGate默认将每行数据的第一列当作时间戳的 Unix 表示,自动将其转化为数据库时间格式。如果时间戳不在第一列,或者用户已经自行转换为数据库格式,则应使用 raw,这样 MatrixGate 不会做时间类型转换。 |
--upsert-key | 默认为空 | 进行 UPSERT 的键名,可以指定多个。 需要做 UPSERT 的表,必须建立 UNIQUE 约束,且参数中要指定所有约束键。 |
--deduplicate-key | 默认为空 | 用法和 UPSERT 类似,区别是只更新空值,如果旧值非空,新值丢弃。 和 --upsert-key 参数互斥,只能选一种。 |
--use-auto-increment | 默认 true | 当 target 表中含有自增字段时,是否在加载数据中跳过自增字段赋值而使用系统默认自增值 |
--target | schemaName.tableName | 指定目标的表名,schemaName 可以省略,默认为 public。允许指定多个目标表,使用方法是 "--target 表1 --target 表2 …"。当不提供此参数时,可以额外指定 --allow-dynamic 参数来允许动态适配表名。 |
[misc]类别 | ||
--log-archive-hours | 默认为 72 | 日志目录下,超过一定时间未发生改变的 MatrixGate 日志文件,被自动压缩 |
--log-compress | 默认为 true | 是否开启 log 自动压缩的全局开关 |
--log-dir | 默认为 /home/mxadmin/gpAdminLogs | 日志目录 |
--log-max-archive-files | 默认为 0 | 最多保留多少个压缩的 log 文件,超过这个数量,则最老的日志文件会被删除。0 为不删除 |
--log-remove-after-days | 默认为 0 | 被压缩后的 log 文件,再经过多少天被自动删除。0 为不删除 |
--log-rotate-size-mb | 默认为 100 | 当前 log 文件超过一定大小则自动切换到一个新文件,旧文件则立即压缩 |
[source]类别 | ||
--source | 默认 http | MatrixGate 数据来源,支持 http、stdin、kafka、transfer |
[source]类别 | [http] | |
--http-port | 默认 8086 | MatrixGate 用户提交数据的HTTP接口 |
--max-body-bytes | 默认 4194304 | 每个 HTTP 包体大小上限 |
--max-concurrency | 默认 40000 | HTTP 最大并发连接数 |
--request-timeout | 默认 0 | 请求超时时间,默认 0,无限等待。当设置大于 0 的值,会在等待毫秒单位的设置时间后超时并返回 HTTP408。 |
--disable-keep-alive | 默认 false | MatrixGate 在每次 HTTP 请求后强制断开连接 |
--http-debug | 默认false | 输出附加 HTTP 诊断信息 |
[source]类别 | [transfer] | |
--src-host | 源库 Master 的 IP 地址 | |
--src-port | 源库 Master 的端口号 | |
--src-user | 连接源库的用户名(建议使用 Superuser) | |
--src-password | 连接密码 | |
--src-schema | 源表的模式名 | |
--src-table | 源表的表名 | |
--src-sql | 进行迁移数据过滤的 SQL | |
--compress | 源数据库 Segment 主机到本数据的传输方法: 空白字符串“”,代表不压缩,明文传输 gzip:使用 gzip 压缩,需要源数据库的 Segment 主机上必须安装有 gzip 这个 Linux 命令 lz4:使用 lz4 压缩,需要源数据库的 Segment 主机上必须安装有 lz4 这个 Linux 命令 推荐 lz4 > gzip > 不压缩 |
|
--port-base | 传输中会占用一批端口,端口的范围为 9129~ | |
--local-ip | 必须用源库可以连接到本机的 IP 地址 | |
[writer]类别 | ||
--interval | 默认 100 毫秒 | MatrixGate 执行批量数据加载时间周期 |
--stream-prepared | 默认 10 | 插入工作进程并行度 |
--use-gzip | 默认 'auto' | MatrixGate 向 Segment 发送数据时是否开启压缩,可配置参数为 auto/yes/no |
--max-seg-conn | 默认 128 | 外部表从 MatrixGate 拉取数据时启动的segment数量,调大该参数会增加网络连接资源 |
--timing | 默认 false | 开启该参数后,MatrixGate 在记录日志时会为每条 INSERT 增加耗时信息 |
--insert-timeout | 默认 0 | MatrixGate 执行 INSERT 语句超时时间,默认为 0,无限等待。 设置大于 0 的值后会在等待毫秒单位配置时间后超时 |
其他 | ||
--help | 显示用法和参数列表 |
4 MatrixGate API
MatrixGate 对外提供 HTTP API,支持各种编程语言通过 HTTP 接口将数据导入到 YMatrix 数据库中。
MatrixGate HTTP 协议格式
协议类型 | 协议格式 | 用法及示例 |
---|---|---|
URL | http://mxgate-host:port | 指定 mxgate 连接地址 |
PATH | / | 当前支持/,忽略/后面任何 PATH |
HTTP Method | POST | 当前支持 POST 方式加载数据 |
HTTP Header | Content-Encoding: gzip | 当前支持 gzip 对 HTTP Body 内容压缩 |
Content-Type: text/plain | 当前支持 text/plain | |
HTTP Body | SchemaName.TableName Timestamp|ID]|C1|C2|..|Cn |
Body 格式第一行为数据加载的目标表,SchemeName 可省略,默认为 Public,TableName 为必须项,第二行开始是时序数据行,每行对应目标表的一行,列之间使用 | 分隔符,行之间使用 \n 分隔符。每行第一个字段为时间戳,格式为 UNIX 时间戳精确到秒,参见 --time-format 的说明。每行第二个字段为 TagID,整型。每行第三个字段到最后一个字段为与目标表对应的列。 建议目标表的 DDL 定义也遵循( Timestamp,TagID,C1,C2,…,Cn)的列顺序 |
MatrixGate HTTP 响应码
响应码 | 响应码含义 | 备注 |
---|---|---|
200 | StatusOK | 部分数据格式错误,响应 Body 里会包含错误的行以错误信息,如:At line: 2 missing data for column "c3" |
204 | StatusNoContent | 数据成功加载到 MatrixGate |
400 | StatusBadRequest | 数据请求错误,如 POST BODY 格式错误、目标表不存在、数据压缩格式与 HTTP 请求头不符等 |
405 | StatusMethodNotAllowed | HTTP 非 POST 请求 |
408 | StatusTimeout | 请求超时 |
500 | StatusIntervalServerError | 数据库端错误,数据加载失败,响应 Body 内包含详细错误信息 |
503 | StatusServiceUnavailable | MatrixGate 拒绝请求,如超过最大连接数,或 MatrixGate 正在关闭等 |
5 MatrixGate HTTP API 命令行示例
-
在 demo 数据库中创建表 testtable
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT) DISTRIBUTED BY (tagid);
-
编辑数据加载文件 data.txt,内容如下
public.testtable 1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
-
启动 mxgate,指定生成好的配置文件 mxgate.conf
mxgate --config mxgate.conf
-
发送 HTTP 请求加载数据
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
-
连接数据库查询数据是否加载成功
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
------------+------------------------+-------+-----+-----+-----
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
6 编程语言连接 MatrixGate
6.1 JAVA 连接 MatrixGate
6.1.1 MatrixGate SDK Java
SDK(Software Development Kit)指软件开发工具包,它可将开发者从非业务逻辑的功能开发中解放出来,极大地提升开发效率和体验,增加易用性。
- 通过 Maven 和 Gradle 引入 SDK 依赖
你可以通过使用 Maven 远程仓库、使用 Maven 工具导入本地 jar 包、使用 Gradle 远程仓库、使用 Gradle 工具导入本地 jar 包四种方式完成 SDK 的引入。注意!
四种方式任选其一即可。推荐使用 Maven 或 Gradle 远程仓库引入 SDK,高效又便捷。
- 在你的 JAVA 工程的 pom.xml 文件中配置如下依赖,以调用 Maven 远程仓库自动下载 SDK 包。
<dependency> <groupId>cn.ymatrix</groupId> <artifactId>mxgate-sdk-java</artifactId> <version>1.0.17</version> </dependency>
- 使用 Maven 导入本地 jar 包。
首先,点击下载 SDK 最新版本 v1.0.17,按照如下图示操作。
随后,在 pom.xml 文件中配置如下内容:
<!-- 在 pom.xml 文件中配置 -->
<dependencies>
...
<dependency>
<groupId>cn.ymatrix</groupId>
<artifactId>mxgate-sdk-java</artifactId>
<version>1.0</version>
<scope>system</scope>
<!-- systemPath中填入 jar 包的绝对路径, 这里 ${project.basedir} 指的是当前 project 文件夹路径 -->
<systemPath>${project.basedir}/lib/mxgate-sdk-java-1.0.jar</systemPath>
</dependency>
...
</dependencies>
最后,为了保障 jar 包的使用(不会出现 class not found 相关报错),还需要在 pom.xml 中增加以下内容,使得 target 文件夹包含所需依赖。
<plugins>
...
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<!-- 指定 jdk 版本,与本地一致即可, 若为1.8 则填入8即可-->
<source>8</source>
<target>8</target>
<compilerArguments>
<!-- 指定 local jar 所在目录,下例中在project下lib目录存放local jar -->
<extdirs>${project.basedir}/lib</extdirs>
</compilerArguments>>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<archive>
<manifest>
<!-- 在生成jar文件时向MANIFEST.MF中添加 classpath -->
<addClasspath>true</addClasspath>
<!-- 定义上述 classpath 的前缀, 需要与下面 maven-dependency-plugin 的 outputDirectory 一致 -->
<classpathPrefix>lib</classpathPrefix>
<mainClass>Main.Main</mainClass>
</manifest>
<manifestEntries>
<!-- 把 jar 加入到 MANIFEST.MF -->
<Class-Path>lib/mxgate-sdk-java-1.0.jar</Class-Path>
</manifestEntries>>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<!-- 在 maven 产物target目录下生成lib目录,并把所有远程和本地的jar拷贝到该目录下 -->
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
...
</plugins>
完成。
- 使用 Gradle 远程仓库引入 SDK 依赖。
repositories { mavenCentral() }
dependencies { implementation 'cn.ymatrix:mxgate-sdk-java:1.0.17' }
- 使用 Gradle 导入本地 jar 包。
假设 jar 包存放在本地 libs 路径下,使用以下代码导入本地 jar 包:
repositories { flatDir { dirs 'libs' } }
dependencies { implementation name: 'cn.ymatrix:mxgate-sdk-java:1.0.17' }
2. 启动 mxgate
YMatrix 支持通过 gRPC、HTTP 两种方式写入数据。具体说明如下:
- 启动 mxgate ( gRPC Source )
> ***注意!***
需要使用支持 gRPC 的 YMatrix 版本,即 4.6.1 及之后。
编写配置文件 mxgate_grpc.conf 并启动 mxgate。示例代码如下:
创建 mxgate config 文件
mxgate config \ --source grpc \ --db-database demo \ --target public.test_table_transfer \ --time-format raw \ --grpc-port 8087 \ --format csv \
mxgate_grpc.conf
启动 mxgate
mxgate start --config mxgate_grpc.conf
如示例代码所示,使用 SDK 向 mxgate 写入数据,需要指定 mxgate 的 source 参数为 grpc,format 为 csv。同时由于 SDK 需要知道将数据写入到哪里,需要在配置文件中指定 grpc-port 端口号,示例中为 8087。
- 启动 mxgate(HTTP Source)
> ***注意!***
通过 HTTP 启动 mxgate 依然需要指明 mxgate 的 gRPC 端口号,因为 SDK 从 mxgate 获取数据库表元信息的方式依然是 gRPC,只是写入数据的方式切换为 HTTP。
示例代码如下:
创建 mxgate config 文件。
mxgate config \ --source http \ --db-database demo \ --target public.test_table_transfer \ --time-format raw \ --grpc-port 8087 \ --format csv \
mxgate_grpc.conf
启动 mxgate
mxgate start --config mxgate_grpc.conf
3. 使用 SDK 将数据发送给 mxgate
在此步骤中将介绍异步、同步两种发送方式,根据需求选择即可。
示例用表结构如下:
Partitioned table "public.test_table_transfer"
Column | Type | Collation | Nullable | Default --------+-----------------------------+-----------+----------+--------- ts | timestamp without time zone | | | tag | integer | | not null | c1 | double precision | | | c2 | double precision | | | c3 | double precision | | | c4 | double precision | | | c5 | text | | | c6 | text | | | c7 | text | | | c8 | text | | | Partition key: RANGE (ts)
- 使用 SDK 异步发送数据
异步发送即将 Tuple append 到 SDK 内部的一个队列中,然后通过异步的 HTTP 请求发送数据。
首先需要启动 MxBuilder ,该 Builder 是单例模式,全局唯一。
> ***注意!***
在一个工程里只需要 Build 一次。 在初始化 MxBuilder 前需进行相关配,具体配置信息请见此部分末段介绍。
MxBuilder builder = MxBuilder.newBuilder() .withDropAll(true) // 用于测试,如果 == true,则不会发送数据给 mxgate,直接 drop。 .withCacheCapacity(100000) // 用于暂存 tuples 微批的 queue的大小 .withCacheEnqueueTimeout(2000) // 若queue满,tuples 存入 queue 的等待超时时间。若超时,会抛出IllegalStateException .withConcurrency(10) // 同时向 mxgate 写入数据的线程数量。 .withRequestTimeoutMillis(3000) // 每个线程每次数据写入请求的超时时间(毫秒)。 .withMaxRetryAttempts(3) // 每个线程每次写入请求遇到问题后的重试次数。 .withRetryWaitDurationMillis(3000) // 每次重试的时间间隔(当前的实现,每次重试的时间间隔是固定的)。 .withRequestType(RequestType.WithHTTP) // SDK 支持通过 HTTP 和 gRPC 两种方式向 mxgate post 数据,对应的配置为:RequestType.WithHTTP,RequestType.WithGRPC .withCircuitBreaker() // 使用内置熔断器。若失败率或者慢请求率达到阈值,则会开启,开启后持续30秒,暂停向 mxgate 发送数据,亦无法 append tuple .withMinimumNumberOfCalls(1) // 熔断器生效的最小请求数(要求 >= 1,默认 10) .withSlidingWindowSize(10) // 用以计算失败率的滑动窗口大小(要求 >= 1,默认 100) .withFailureRateThreshold(60.0f) // 失败率阈值(要求 >0 且 <= 100),若失败率达到阈值则会开启熔断器 .withSlowCallDurationThresholdMillis(1000) // 慢请求时长阈值(毫秒),超过该时长则认为是慢请求(注意该时长应小于请求超时时间) .withSlowCallRateThreshold(80.0f)// 慢请求阈值,若慢请求率达到阈值则会开启熔断器 .build();
Builder 的 connect 方法接收四个参数:
(1). mxgate 进程提供特定服务所在的 hostname (IP 地址)及端口号,用于接收数据及数据库表元信息。
例如,对于 HTTP 方式为 “http://localhost:8086/”;
而对于 gRPC 方式,则为 “localhost:8087”
(2). schema。
(3). table name。
(4). callback,如果 builder 能够成功连接到 mxgate,则 onSuccess 方法会回调,同时会返回一个 MxClient 实例用于数据写入,如果连接失败,则 onFailure 会回调,failureMsg 会说明失败的原因。
builder.connect("http://localhost:8086/", "localhost:8087", "public", "test_table", new ConnectionListener() { @Override public void onSuccess(MXClient client) { sendData(client); }
@Override
public void onFailure(String failureMsg) {
}
}【)????】;
通过 ConnectionListener 的 onSuccess() 回调函数,你可以拿到 MxClient 实例,随后通过 MxClient 提供的 API,即可实现向 mxgate 写入数据的功能。
> ***注意!***
MxClient 是线程安全的,但是如果在多线程并发使用,为了确保最佳性能,最好是每一个线程中有一个单独的 MxClient,也就是通过 MxBuilder 多次 connect 返回多个 MxClient 实例。
private void sendData(MxClient client) { if (client != null) { // MxClient 会累积一批 Tuple 作为一个微批一并发送给 mxgate, // 该 API 会设置每个微批累积的等待时间,默认2000 millisecond, // 也就是每个2s会尝试发送一批数据到 mxgate,如果该2s的时间段内 // 没有数据写入,则不会发送。 client.withIntervalToFlushMillis(5000); // 设置累积多少个 Tuple 字节作为一个微批进行发送,即使时间没有达到 // 设置的 flush interval,累积的微批字节数达到了,也会发送。 // flush interval 的时间达到了,没有累积够设定的字节数,也会发送。 client.withEnoughBytesToFlush(500); // MxClient 可以注册一个 DataPostListener,每批数据发送的成功和失败 // 都会在 onSuccess 和 onFailure 中回调。用户可以知道哪些 Tuple 写入成功了 // 哪些 Tuple 写入失败了。 client.registerDataPostListener(new DataPostListener() { @Override public void onSuccess(Result result) { System.out.println(CUSTOMER_LOG_TAG + "Send tuples success: " + result.getMsg()); System.out.println(CUSTOMER_LOG_TAG + "Succeed lines onSuccess callback " + result.getSuccessLines()); }
@Override
public void onFailure(Result result) {
// result.getErrorTuplesMap() 包含错误的行和错误原因的键值对 Tuple -> String
// Key 是错误的行, value 是错误的原因;
for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
for (Column c : entry.getKey().getColumns()) {
l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
}
}
System.out.println(result.getSuccessLines());
}
});
通过 MxClinet 的 generateEmptyTuple API 来返回一个空的 Tuple 对象用于赋值。
Tuple tuple1 = client.generateEmptyTuple(); Tuple tuple2 = client.generateEmptyTuple();
以 Key -> Value的方式给空的 Tuple 赋值。Key 是对应数据库表的列名;Value 是该字段对应的值。如果有某些字段值允许为 null 或者有默认值,则可以不 add 该字段的值,SDK 会自动填充默认值或者空值。
tuple1.addColumn("ts", "2022-05-18 16:30:06"); tuple1.addColumn("tag", 102020030); tuple1.addColumn("c1", 1.1); tuple1.addColumn("c2", 2.2); tuple1.addColumn("c3", 3.3); tuple1.addColumn("c4", 4.4); tuple1.addColumn("c5", "中文字符测试-1"); tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6"); tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7"); tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");
tuple2.addColumn("ts", "2022-05-18 16:30:06");
tuple2.addColumn("tag", 102020030);
tuple2.addColumn("c1", 1.1);
tuple2.addColumn("c2", 2.2);
tuple2.addColumn("c3", 3.3);
tuple2.addColumn("c4", 4.4);
tuple2.addColumn("c5", "中文字符测试-2");
tuple2.addColumn("c6", "lTxFCVLwcDTKbNbjau_c26");
tuple2.addColumn("c7", "lTxFCVLwcDTKbNbjau_c27");
tuple2.addColumn("c8", "lTxFCVLwcDTKbNbjau_c28");
最后,将赋值完毕的 Tuple append 到 MxClient。
client.appendTuples(tuple1, tuple2); 【}???】
> ***注意!***
MxClient 提供多个 API,可以一次 Append 一个 Tuple,也可以一次 Append 多个 Tuple,或者一个 Tuples List。如: `client.appendTuple();` ,`client.appendTupleList();`;也可以调用 MxClient 的 flush() 方法,手动发送数据,无论此时写入了多少个 Tuple 的数据,如 `client.flush();`。
- 使用 MxClient 同步发送数据
同步发送即同步的发送 Tuple 到 mxgate ,以便于进行后续的处理(如提交 Kafka offset)。
> ***注意!***
目前只支持 HTTP 方式。
Tuple tuple1 = client.generateEmptyTuple(); tuple1.addColumn("ts", "2022-05-18 16:30:06"); tuple1.addColumn("tag", 102020030); tuple1.addColumn("c1", 1.1); tuple1.addColumn("c2", 2.2); tuple1.addColumn("c3", 3.3); tuple1.addColumn("c4", 4.4); tuple1.addColumn("c5", "lTxFCVLwcDTKbNbjau_c5"); tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6"); tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7"); tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");
手动 trigger tuples flush。
appendTupleBlocking 会返回 boolean 类型的返回值含义如下表:
| true | fasle |
| -- | -- |
| MxClient 中已经写满了设定的 bytes size,可以发送请求 | 设定的 bytes size 还没写满 |
try { if (client.appendTupleBlocking(tuple1)) { l.info("append tuples enough");
client.flushBlocking();
}
}
如果如下 flushBlocking 抛出异常,说明整批 Tuple 都没能写入到 mxgate,调用方可以针对该异常做相应的处理。
catch (AllTuplesFailException e) { l.error("Tuples fail and catch the exception return.", e); for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) { l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue()); for (Column c : entry.getKey().getColumns()) { l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue()); } } }
如果如下 flushBlocking 抛出异常,则说明有部分 Tuple 没能写入到 mxgate,调用方可以针对该异常做相应的处理。
catch (PartiallyTuplesFailException e) { for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) { l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue()); for (Column c : entry.getKey().getColumns()) { l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue()); } } }
#### 6.1.2 MatrixGate HTTP API Java 示例
import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL;
public class MxgateExample { public static void main(String[] args) throws Exception { MxgateExample http = new MxgateExample(); http.sendingPostRequest(); }
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate监听在localhost的8086端口
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// 数据有中文时,可以通过postJsonData.getBytes("UTF-8")编码
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
<a name="python"><br/></a>
### 6.2 MatrixGate HTTP API Python 示例
import http.client
class MxgateExample(object): def init(self):
mxgate监听在localhost的8086端口
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if name == 'main': gate_post = MxgateExample() gate_post.sending_post_request()
<a name="C#"><br/></a>
### 6.3 MatrixGate HTTP API C# 示例
> 建议开发代码使用 C# Core 开发环境
using System; using System.IO; using System.Net; using System.Text;
namespace HttpPostTest { class Program { static void Main(string[] args) { var url = "http://10.13.2.177:8086/"; var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){ string result = ""; HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url); req.Method = "POST"; req.ContentType = "text/plain";
#region 添加Post 参数
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
//获取响应内容
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
} }
> 如果遇到 error when serving connection ***** body size exceeds the given limit 这个问题,调大 mxgate.conf 下的 max-body-bytes
### 6.4 MatrixGate HTTP API Golang 示例
package main
import ( "bytes" "net/http" )
func PostDataToServer(URL string) error {
data := public.testtable 1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Deal with the response body.
return nil
}
// Deal with the response body.
return nil
}
func main() { err := PostDataToServer("http://127.0.0.1:8086") if err != nil{ panic(err) }
}
## 7 MatrixGate 加载特殊类型
### 7.1 MatrixGate 加载 CSV 文件示例
- 在 demo 数据库中创建表 csvtable
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT) DISTRIBUTED BY (tagid);
- 编辑数据加载文件 data.csv,内容如下
1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
- 启动 mxgate,指定 source 参数为 stdin,目标表为已经存在的 csvtable,加载并行度为 2
mxgated \ --source stdin \ --db-database demo \ --db-master-host 127.0.0.1 \ --db-master-port 5432 \ --db-user mxadmin \ --time-format unix-nano \ --delimiter "|" \ --target csvtable \ --parallel 2 < data.csv
- 连接数据库查询数据是否加载成功
demo=# SELECT * FROM csvtable ; time | tagid | c1 | c2 | c3 ------------------------+-------+-----+-----+----- 2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303 2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302 2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
### 7.2 MatrixGate 加载 JSON 字段示例
### 7.2.1 JSON
- 创建表
create table json_test(id int, j json);
- 创建数据文件
`~/json.csv`
1|"{""a"":10, ""b"":""xyz""}"
- 加载
这里使用 stdin 模式为例,其他模式都一样。
关键在 `--format csv`
mxgated \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_test < ~/json.csv
- 查看加载数据
postgres=# select * from json_test; id | j ----+----------------------- 1 | {"a":10, "b":"xyz"} (1 row)
### 7.2.2 JSON 数组
- 创建表
create table json_array_test(id int, j _json);
- 创建数据文件
`~/json_array.csv`
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
- 加载
mxgate \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_array_test < ~/json_array.csv
- 验证
postgres=# select * from json_array_test ; id | j ----+--------------------------------------------- 1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"} (1 row)
> ***注意!***
因为 JSON 列包含引号等特殊字符,所以 mxgate 的 --format 参数必须为 CSV。
## 8 观测 mxgate 运行指标
watch 是 mxgate 的一个子命令,用一系列指标描述 mxgate daemon 运行情况。
watch 有两种模式:
- 实时观测模式, 用类似 sar 的格式每隔 3 秒将 gate 的各项指标打印在控制台。
- 历史观测模式,可以指定任意时间段,任意时间周期(例如昨天的每个小时,上个月的每一天,去年的每一个月)统计导入速度。
### 8.1 实时观测
mxgate watch
会每三秒收集 mxgate 的运行指标,输出结果如下
Time WCount ICount WSpeed/s ISpeed/s WBandWidth MB/S BlocakItems
2022-04-28 15:20:58 14478858 14527011 2598081 2627887 2395 0 2022-04-28 15:21:01 22231035 22633254 2584059 2702081 2222 0 2022-04-28 15:21:04 30494310 30500874 2754425 2622540 3551 0 2022-04-28 15:21:07 38004210 38032956 2503300 2510694 2862 0 2022-04-28 15:21:10 46188696 46298223 2728162 2755089 2227 0 ...
可通过 --info 参数来获取上述各个指标的说明
mxgate watch --info
默认情况下只会输出速度指标,可通过 --watch-latency 参数来观测时间指标用于分析问题
mxgate watch --watch-latency
### 8.2 历史数据观测
mxgate watch --history
会计算截至当前时间为止,24小时内的每小时的平均速度,输出结果如下
TIME RANGE | SPEED/S | BANDWIDTH MB/S | BLOCK ITEMS
2022-04-28 16:00:00-2022-04-28 17:00:00 | 2208010 | 1254.48 | 0 2022-04-28 17:00:00-2022-04-28 18:00:00 | 1157920 | 1327.00 | 0 2022-04-28 18:00:00-2022-04-28 19:00:00 | 2228666 | 2162.32 | 0 2022-04-28 19:00:00-2022-04-28 20:00:00 | 1371092 | 2881.30 | 0 2022-04-28 20:00:00-2022-04-28 21:00:00 | 1575320 | 2608.20 | 0
其中 SPEED/S,BANDWIDTH MB/S 代表导入的条目速度与导入带宽(MB/s 为单位),
BLOCK ITEMS 代表阻塞在 mxgate 的数据量,当数据库消费速度跟不上数据源(http,kafka 等)生产速度时,这个值会上升
可以添加 `--watch-start`,`--watch-end`,`--watch-duration` 参数来控制观测历史数据的时间区间与周期
例如
mxgate watch --history --watch-start '2022-03-27 00:00:00' --watch-end '2022-04-27 00:00:00' --watch-duration '168h'
可以获得从 3 月 27 日到 4 月 27 日每周(每 168h)平均导入速度
其中 `--watch-duration` 支持 `h` `m` `s` 三种单位
## 9 不停机更新并行写入相关参数
mxgate 支持在不停止运行的情况下,修改并行加载的相关参数:"interval"、“stream-prepared”。 "interval" 代表的是完成从 mxgate 写入数据到数据库表的每个写入连接的工作时间,“stream-prepared” 代表的是活跃的写入连接的数量。在 mxgate 逻辑中,同一个数据库表同一时间只有一个写入连接执行写入数据的事务,因此每个数据库表都需要有多个连接在不同时间区间去不停地执行它自身的写入事务,从而保证高速率、高效率地写入数据。在此过程中,你可以通过 “interval” 这个参数来实现对于每个写入连接的工作时间的调整,从而改善数据的写入速率,针对性提高加载性能。具体用法示例如下:
- mxgate set --stream-prepared-cli 3
设置每个表的写入连接数量为 3

- mxgate get --stream-prepared-get 获取每个表活跃的写入连接数量

- mxgate set --job-interval 200 设置所有表的写入连接的时间间隔为 200ms

- mxgate get --job-interval-get 获取所有表的写入连接当前的时间间隔

> ***注意!***
对于以上参数,如果你想设置或获取某个特定表的写入连接数量或工作时间,那么在上述命令后加 “--job <名称>” 即可。每个事务(job)对应一张数据库表,job 参数结构由模式名与表名组成,意思就是如果你的特定表叫做 “test_table”,模式名为 “public”,那么完成指定就需要在已有命令后加上 “--job public.test_table” 就是对的。
## 10 不停机更新表结构
在数据加载的过程中,你突然发现在变化的时序数据源驱动下,之前设定的表结构对于现在场景不再适用了,然后你就有了修改表结构的需求,这是 mxgate 可以满足的。此节将会说明,mxgate 如何在不停机的情况下,执行暂停数据写入,重载修改后的数据库表元信息,恢复数据写入等系列操作。具体步骤如下:
* 首先,使用命令 “mxgate pause -X” 中断所有表的写入连接,为修改数据库表结构作准备。其中 “-X” 参数是必要的,它会帮助中断 mxgate 和数据库之间的连接。如果不中断连接,是无法对数据库表进行修改的。除了 “-X” 之外,运用 “-S” 参数可以令 pause 命会同步等待所有连接中断完成再返回。

* 其次,中断对应表的所有写入连接之后,即可以对数据库对应的数据库表执行修改结构的操作,如增加若干列,删除若干列,删除已有表,重新创建“同名”的新表。
> ***注意!***
重建的表结构可以不同,但“表名”必须保持一致。
* 最后,使用命令 “mxgate resume -R” 恢复所有表的写入连接,重载数据表的元信息。其中 “-R” 参数是必需的,“resume” 与 “-R” 将组合完成重载操作。

* 特别地,多个 mxgate 进程同时运行的情况,需要 “-p” 参数表示对应 mxgate 进程的进程号,上述所有命令都是同理。

> ***注意!***
执行重载命令的前提是 mxgate 的对应的表的所有写入连接必须先被暂停,否则会出现如下报错:
