本章节将介绍使用 spark-ymatrix-connector 将 Spark 中的数据写入 YMatrix 的方法。适用于 Spark 开发人员、数据平台开发人员、数据集成开发人员,以更好的使用 YMatrix 数据库。
| 软件 | 版本信息 |
|---|---|
| Spark | 3.4.1 |
| Scala | 2.12.17 |
| Java | 1.8 |
| Connector | 3.1.0 |
spark-ymatrix-connector 是一个 Spark DataSource V2 Connector,用来把 Spark 中的数据写入 YMatrix。
你真正需要记住的入口只有一个:
.format("its-ymatrix")
只要数据已经进入 Spark,并且可以表示为 DataFrame、Spark 表或临时视图,就可以通过这个 connector 写入 YMatrix。
典型来源包括:
parquet、orc、csv、json 等文件数据本文不把重点放在抽象概念上,而是直接教你两件事:
jar 包带进 Spark本 connector 适合以下场景:
| 场景 | 支持情况 | 说明 |
|---|---|---|
| Spark DataFrame 写入 YMatrix | 支持 | 最常见的离线导入方式 |
| Spark 表到 YMatrix 单表导入 | 支持 | 适合从 Spark SQL 表或临时视图导入 |
| 写入前做过滤、字段映射、重命名 | 支持 | 在 Spark 内完成转换后再写入 |
| 按范围分批导入大表 | 支持 | 可基于业务主键或水位字段分批推进 |
| Spark SQL 方式写入 | 支持 | 通过临时 sink view + INSERT INTO 实现 |
| 写后回读校验 | 支持 | 可直接通过 connector 从 YMatrix 回读结果 |
本 connector 负责的是 Spark 到 YMatrix 的数据传输,不负责以下业务语义:
这些能力需要由你的 Spark 作业逻辑或外围调度流程负责。
先看最小可用模型:
spark.table("db.source_table")
.write
.format("its-ymatrix")
.option("url", "jdbc:postgresql://host:5432/db")
.option("user", "database_user")
.option("password", "yourpassword")
.option("dbschema", "public")
.option("dbtable", "target_table")
.mode("append")
.save()
这段代码表达的意思很简单:
its-ymatrix.save()最常用参数如下:
| 参数 | 是否必填 | 说明 |
|---|---|---|
url |
是 | YMatrix JDBC 地址 |
user |
是 | 用户名 |
password |
是 | 密码 |
dbschema |
建议填写 | 目标 schema |
dbtable |
是 | 目标表名 |
mode |
是 | 必须显式指定 append 或 overwrite |
distributedby |
自动建表时建议填写 | 自动建表时指定分布键 |
network.timeout |
否 | 网络敏感环境建议显式设置 |
server.timeout |
否 | 大批量传输建议显式设置 |
server.port |
否 | 联调网络问题时可固定端口 |
truncate |
否 | 配合 overwrite 使用,尽量保留表结构 |
这一节只讲一件事:怎么把 connector 的 jar 包正确带进 Spark。
当前仓库里已经构建出的 jar 路径是:
/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
下面分两种方式讲。
这种方式适合你已经有正式 Spark 作业代码,需要把 connector 集成进程序。
确认 jar 文件存在:
ls -lh /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
下面是一份最小可运行的 Spark 应用示例:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object SparkWriteToYMatrixApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("SparkWriteToYMatrixApp")
.getOrCreate()
val srcDf = spark.table("dm.orders_daily")
.select(
col("order_id"),
col("user_id"),
col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
col("created_at").cast("timestamp").as("created_at")
)
.repartition(4, col("order_id"))
srcDf.write
.format("its-ymatrix")
.option("url", "jdbc:postgresql://ymatrix-master-host:5432/your_database")
.option("user", "database_user")
.option("password", "yourpassword")
.option("dbschema", "public")
.option("dbtable", "orders_daily")
.option("distributedby", "order_id")
.option("network.timeout", "120s")
.option("server.timeout", "120s")
.mode("append")
.save()
spark.stop()
}
}
如果你的业务作业本身已经打成一个单独的应用包,提交时把 connector 用 --jars 带上:
export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export APP_JAR=/path/to/your-spark-app.jar
export SPARK_LOCAL_IP=172.16.100.32
spark-submit \
--class SparkWriteToYMatrixApp \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--conf spark.sql.shuffle.partitions=4 \
--jars ${CONNECTOR_JAR} \
${APP_JAR}
至少看这三点:
Failed to find data source: its-ymatrix.save() 阶段没有报连接或 gpfdist 错误这种方式适合联调、排障、Demo 演示和一次性导入任务。
命令行调用最常见有两种:
spark-shellspark-submitspark-shell 直接写入这是最适合第一次验证链路的方式。
先启动 spark-shell:
export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export YMATRIX_URL=jdbc:postgresql://ymatrix-master-host:5432/your_database
export YMATRIX_USER=database_user
export YMATRIX_PASSWORD=yourpassword
export YMATRIX_SCHEMA=public
export SPARK_LOCAL_IP=172.16.100.32
spark-shell \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--conf spark.sql.shuffle.partitions=4 \
--jars ${CONNECTOR_JAR}
进入 spark-shell 后,直接执行:
import spark.implicits._
import org.apache.spark.sql.functions._
val srcDf = Seq(
(1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
(2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00"),
(3L, "u003", BigDecimal("99.99"), "2026-03-23 10:10:00")
).toDF("order_id", "user_id", "amount", "created_at")
.withColumn("created_at", col("created_at").cast("timestamp"))
srcDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_demo")
.option("distributedby", "order_id")
.option("server.port", "43000")
.mode("overwrite")
.save()
然后马上做一次回读校验:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select * from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo order by order_id"
)
.load()
.show(false)
spark-submit 跑脚本或程序如果你不想手工进交互式 shell,可以把逻辑写进程序后直接 spark-submit。
命令形式和“程序调用”本质一样,关键点仍然是:
--jars 带进去的外部依赖最小命令模板如下:
spark-submit \
--class SparkWriteToYMatrixApp \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--jars /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar \
/path/to/your-spark-app.jar
建议直接按下面选:
spark-shellspark-submitspark-shellspark-submit本节是本文重点。下面的示例都默认你已经把 connector jar 带进 Spark 了。
这是最常见的正式开发方式,适合源数据已经是 Spark 表或临时视图的场景。
import org.apache.spark.sql.functions.col
val sourceTable = "dm.orders_daily"
val targetTable = "orders_daily"
val sourceDf = spark.table(sourceTable)
.select(
col("order_id"),
col("user_id"),
col("shop_id"),
col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
col("status"),
col("created_at").cast("timestamp").as("created_at")
)
.repartition(4, col("order_id"))
sourceDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", targetTable)
.option("distributedby", "order_id")
.option("network.timeout", "120s")
.option("server.timeout", "120s")
.mode("append")
.save()
适合什么时候用:
如果目标表字段与源表字段不完全一致,推荐在 Spark 内先做映射,再写入。
import org.apache.spark.sql.functions._
val mappedDf = spark.table("ods.orders_raw")
.where(col("dt") === lit("2026-03-23"))
.where(col("is_deleted") === lit(0))
.select(
col("id").as("biz_order_id"),
col("buyer").as("buyer_id"),
col("seller").as("seller_id"),
col("amount").cast("decimal(18,2)").as("pay_amount"),
to_timestamp(col("pay_time")).as("pay_time")
)
mappedDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_mapped")
.option("distributedby", "biz_order_id")
.mode("overwrite")
.save()
适合什么时候用:
cast 金额、时间等关键类型开发环境经常需要“清空旧数据重新导入”,但不想重建表。
spark.table("dm.orders_demo")
.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_demo")
.option("truncate", "true")
.mode("overwrite")
.save()
适合什么时候用:
大表第一次导入时,不建议直接整表全量压上去。更稳妥的方式是基于单调字段按区间推进。
import org.apache.spark.sql.functions._
val sourceTable = "dwd.iot_orders"
val lowerBound = 1L
val upperBound = 100000L
val batchDf = spark.table(sourceTable)
.where(col("ingest_id").between(lowerBound, upperBound))
.select(
col("ingest_id"),
col("event_id"),
col("device_id"),
col("tenant_id"),
col("event_time").cast("timestamp"),
col("payload")
)
.repartition(4, col("ingest_id"))
batchDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "iot_orders_batch_demo")
.option("distributedby", "ingest_id")
.option("server.port", "43001")
.mode("append")
.save()
适合什么时候用:
如果你的主流程偏 SQL 风格,而不是 DataFrame API,可以使用临时 sink view。
先在 YMatrix 中创建目标表:
create table public.orders_sql_sink (
order_id bigint,
user_id text,
amount decimal(18,2),
created_at timestamp
)
distributed by (order_id);
然后在 Spark 中执行:
import spark.implicits._
import org.apache.spark.sql.functions._
Seq(
(1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
(2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00")
).toDF("order_id", "user_id", "amount", "created_at")
.withColumn("created_at", col("created_at").cast("timestamp"))
.createOrReplaceTempView("spark_orders_src")
spark.sql("DROP VIEW IF EXISTS ymatrix_orders_sink")
spark.sql(
s"""
|CREATE TEMPORARY VIEW ymatrix_orders_sink
|USING com.itsumma.gpconnector.GreenplumDataSource
|OPTIONS (
| url '${sys.env("YMATRIX_URL")}',
| user '${sys.env("YMATRIX_USER")}',
| password '${sys.env("YMATRIX_PASSWORD")}',
| dbschema '${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}',
| dbtable 'orders_sql_sink',
| network.timeout '120s',
| server.timeout '120s',
| dbmessages 'WARN'
|)
|""".stripMargin
)
spark.sql(
"""
|INSERT INTO TABLE ymatrix_orders_sink
|SELECT /*+ REPARTITION(4, order_id) */
| order_id,
| user_id,
| amount,
| created_at
|FROM spark_orders_src
|""".stripMargin
)
这个方式的重点:
INSERT 侧建议显式列出字段写入完成后,建议至少做一次基础校验。
行数校验:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select count(*)::bigint as cnt from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
)
.load()
.show(false)
主键或水位范围校验:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select min(order_id) as min_id, max(order_id) as max_id from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
)
.load()
.show(false)
推荐至少做三类校验:
写入时必须明确使用:
mode("append")mode("overwrite")不要省略写入模式。
如果目标表不存在,connector 可以在部分场景下自动建表;但正式生产环境仍建议对核心表采用预建表方式,显式管理:
如果报错与 gpfdist 连接有关,优先排查:
SPARK_LOCAL_IP 是否为 YMatrix segment 可访问地址server.port 便于联调repartition 主要影响写入并发。
经验上建议:
repartition正式导入前,建议重点验证以下字段类型:
decimaltimestampdate建议按下面顺序推进:
spark-shell 跑最小 3 行样例这样最容易把 jar、参数、网络、结构和性能问题拆开定位。