This chapter explains how to write data from Spark to YMatrix using the spark-ymatrix-connector. It is intended for Spark developers, data platform engineers, and data integration specialists who want to use the YMatrix database effectively.
| Software | Version Information |
|---|---|
| Spark | 3.4.1 |
| Scala | 2.12.17 |
| Java | 1.8 |
| Connector | 3.1.0 |
The spark-ymatrix-connector is a Spark DataSource V2 Connector designed to write data from Spark into YMatrix.
There is only one entry point you need to remember:
.format("its-ymatrix")
As long as your data exists within Spark and can be represented as a DataFrame, Spark table, or temporary view, it can be written to YMatrix via this connector.
Typical data sources include:
parquet, orc, csv, or json.

This document focuses on practical implementation rather than abstract concepts. It teaches two key skills: how to include the connector jar package in Spark, and how to write Spark data into YMatrix tables.

This connector is suitable for the following scenarios:
| Scenario | Support Status | Description |
|---|---|---|
| Spark DataFrame to YMatrix | Supported | The most common offline import method. |
| Spark Table to Single YMatrix Table | Supported | Suitable for importing from Spark SQL tables or temporary views. |
| Filtering, Field Mapping, and Renaming Before Write | Supported | Perform transformations within Spark before writing. |
| Batch Import of Large Tables by Range | Supported | Progressively import based on business primary keys or watermarks. |
| Write via Spark SQL | Supported | Achieved through a temporary sink view + INSERT INTO. |
| Post-Write Readback Verification | Supported | Directly read back results from YMatrix using the connector. |
This connector handles data transmission from Spark to YMatrix. It does not manage the following business semantics:
These capabilities must be handled by your Spark job logic or external scheduling workflows.
First, examine the minimal working model:
spark.table("db.source_table")
.write
.format("its-ymatrix")
.option("url", "jdbc:postgresql://host:5432/db")
.option("user", "database_user")
.option("password", "yourpassword")
.option("dbschema", "public")
.option("dbtable", "target_table")
.mode("append")
.save()
This code performs the following steps: it reads the source table into a DataFrame, selects the connector via .format("its-ymatrix"), supplies the connection options, sets the write mode explicitly, and triggers the write with .save().

The most commonly used parameters are listed below:
| Parameter | Required | Description |
|---|---|---|
| url | Yes | YMatrix JDBC URL. |
| user | Yes | Username. |
| password | Yes | Password. |
| dbschema | Recommended | Target schema. |
| dbtable | Yes | Target table name. |
| mode | Yes | Must explicitly specify append or overwrite. |
| distributedby | Recommended (auto-create) | Specifies the distribution key when auto-creating tables. |
| network.timeout | No | Recommended for network-sensitive environments. |
| server.timeout | No | Recommended for large-volume transfers. |
| server.port | No | Useful for fixing ports during network debugging. |
| truncate | No | Use with overwrite; helps preserve table structure. |
This section covers a single topic: how to correctly include the connector's jar file in Spark.
The built jar path in the current repository is:
/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
Two methods are described below.
This method is suitable if you already have formal Spark job code and need to integrate the connector into your application.
Verify that the jar file exists:
ls -lh /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
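Optionally, you can also confirm that the jar registers a Spark DataSource implementation. This check is a suggestion rather than a documented requirement of the connector; it relies only on the standard DataSource V2 service-registration mechanism:

```shell
# Print the DataSourceRegister service entry inside the jar; it should name
# the connector's DataSource implementation class.
unzip -p /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar \
  META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
```

If this file is missing or empty, Spark will not be able to resolve the its-ymatrix format name.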
Below is a minimal runnable Spark application example:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object SparkWriteToYMatrixApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("SparkWriteToYMatrixApp")
.getOrCreate()
val srcDf = spark.table("dm.orders_daily")
.select(
col("order_id"),
col("user_id"),
col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
col("created_at").cast("timestamp").as("created_at")
)
.repartition(4, col("order_id"))
srcDf.write
.format("its-ymatrix")
.option("url", "jdbc:postgresql://ymatrix-master-host:5432/your_database")
.option("user", "database_user")
.option("password", "yourpassword")
.option("dbschema", "public")
.option("dbtable", "orders_daily")
.option("distributedby", "order_id")
.option("network.timeout", "120s")
.option("server.timeout", "120s")
.mode("append")
.save()
spark.stop()
}
}
If your business job is packaged separately, include the connector using --jars during submission:
export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export APP_JAR=/path/to/your-spark-app.jar
export SPARK_LOCAL_IP=172.16.100.32
spark-submit \
--class SparkWriteToYMatrixApp \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--conf spark.sql.shuffle.partitions=4 \
--jars ${CONNECTOR_JAR} \
${APP_JAR}
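Because the connector serves data to YMatrix over gpfdist, the segments must be able to reach the Spark driver. Before the first run, it can help to probe that path from the YMatrix side. The IP below is the illustrative SPARK_LOCAL_IP used in this guide, and 43000 is only meaningful if you fix server.port to that value as shown later:

```shell
# Run on a YMatrix segment or master host:
# verify the Spark driver's gpfdist address/port is reachable.
nc -vz 172.16.100.32 43000
```

If the probe fails, fix the network path (routing, firewall) before debugging anything at the Spark level.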
Check at least these three points:

- The connector jar is actually passed via --jars.
- The job does not fail at startup with "Failed to find data source: its-ymatrix".
- No gpfdist errors occur during the .save() phase.

This method is ideal for debugging, troubleshooting, demos, and one-off import tasks.
Command-line invocation typically uses either spark-shell or spark-submit.
Using spark-shell is the best approach for verifying the data pipeline for the first time.
Start spark-shell:
export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export YMATRIX_URL=jdbc:postgresql://ymatrix-master-host:5432/your_database
export YMATRIX_USER=database_user
export YMATRIX_PASSWORD=yourpassword
export YMATRIX_SCHEMA=public
export SPARK_LOCAL_IP=172.16.100.32
spark-shell \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--conf spark.sql.shuffle.partitions=4 \
--jars ${CONNECTOR_JAR}
Once inside spark-shell, execute the following:
import spark.implicits._
import org.apache.spark.sql.functions._
val srcDf = Seq(
(1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
(2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00"),
(3L, "u003", BigDecimal("99.99"), "2026-03-23 10:10:00")
).toDF("order_id", "user_id", "amount", "created_at")
.withColumn("created_at", col("created_at").cast("timestamp"))
srcDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_demo")
.option("distributedby", "order_id")
.option("server.port", "43000")
.mode("overwrite")
.save()
Immediately perform a readback verification:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select * from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo order by order_id"
)
.load()
.show(false)
If you prefer not to enter an interactive shell manually, you can write the logic into a program and submit it directly using spark-submit.
The command structure is essentially the same as in "Programmatic Invocation." The key point remains the same: include the connector jar via --jars.

A minimal command template is shown below:
spark-submit \
--class SparkWriteToYMatrixApp \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--jars /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar \
/path/to/your-spark-app.jar
We recommend selecting a method based on the following criteria:

- For first-time verification, debugging, and interactive exploration, use spark-shell.
- For formal, repeatable production jobs, use spark-submit.

This section is the core of the guide. All examples below assume you have already included the connector jar in Spark.
This is the most common formal development approach, suitable when source data already exists as a Spark table or temporary view.
import org.apache.spark.sql.functions.col
val sourceTable = "dm.orders_daily"
val targetTable = "orders_daily"
val sourceDf = spark.table(sourceTable)
.select(
col("order_id"),
col("user_id"),
col("shop_id"),
col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
col("status"),
col("created_at").cast("timestamp").as("created_at")
)
.repartition(4, col("order_id"))
sourceDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", targetTable)
.option("distributedby", "order_id")
.option("network.timeout", "120s")
.option("server.timeout", "120s")
.mode("append")
.save()
Use this approach when:
If the target table fields do not exactly match the source table, it is recommended to perform mapping within Spark before writing.
import org.apache.spark.sql.functions._
val mappedDf = spark.table("ods.orders_raw")
.where(col("dt") === lit("2026-03-23"))
.where(col("is_deleted") === lit(0))
.select(
col("id").as("biz_order_id"),
col("buyer").as("buyer_id"),
col("seller").as("seller_id"),
col("amount").cast("decimal(18,2)").as("pay_amount"),
to_timestamp(col("pay_time")).as("pay_time")
)
mappedDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_mapped")
.option("distributedby", "biz_order_id")
.mode("overwrite")
.save()
Use this approach when:
In development environments, it is often necessary to "clear old data and re-import" without rebuilding the table.
spark.table("dm.orders_demo")
.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_demo")
.option("truncate", "true")
.mode("overwrite")
.save()
Use this approach when:
When importing a large table for the first time, avoid pushing the entire table at once. A safer approach is to progress by interval based on a monotonic field.
import org.apache.spark.sql.functions._
val sourceTable = "dwd.iot_orders"
val lowerBound = 1L
val upperBound = 100000L
val batchDf = spark.table(sourceTable)
.where(col("ingest_id").between(lowerBound, upperBound))
.select(
col("ingest_id"),
col("event_id"),
col("device_id"),
col("tenant_id"),
col("event_time").cast("timestamp"),
col("payload")
)
.repartition(4, col("ingest_id"))
batchDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "iot_orders_batch_demo")
.option("distributedby", "ingest_id")
.option("server.port", "43001")
.mode("append")
.save()
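The single-batch pattern above can be wrapped in a simple driver loop that advances the watermark interval by interval. A minimal sketch, assuming the same source table and numeric ingest_id watermark; the maxId value is illustrative and would normally be read from the source table:

```scala
import org.apache.spark.sql.functions.col

val batchSize = 100000L
val maxId     = 1000000L // illustrative; in practice query max(ingest_id) from the source

var lower = 1L
while (lower <= maxId) {
  val upper = math.min(lower + batchSize - 1, maxId)

  // Write one watermark interval per iteration, same options as the single-batch example.
  spark.table("dwd.iot_orders")
    .where(col("ingest_id").between(lower, upper))
    .repartition(4, col("ingest_id"))
    .write
    .format("its-ymatrix")
    .option("url", sys.env("YMATRIX_URL"))
    .option("user", sys.env("YMATRIX_USER"))
    .option("password", sys.env("YMATRIX_PASSWORD"))
    .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
    .option("dbtable", "iot_orders_batch_demo")
    .option("distributedby", "ingest_id")
    .mode("append")
    .save()

  lower = upper + 1
}
```

Because each iteration is an independent append, a failed batch can be retried or resumed from its lower bound without touching intervals that already succeeded.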
Use this approach when:
If your workflow is SQL-oriented rather than DataFrame API-based, you can use a temporary sink view.
First, create the target table in YMatrix:
create table public.orders_sql_sink (
order_id bigint,
user_id text,
amount decimal(18,2),
created_at timestamp
)
distributed by (order_id);
Then, execute the following in Spark:
import spark.implicits._
import org.apache.spark.sql.functions._
Seq(
(1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
(2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00")
).toDF("order_id", "user_id", "amount", "created_at")
.withColumn("created_at", col("created_at").cast("timestamp"))
.createOrReplaceTempView("spark_orders_src")
spark.sql("DROP VIEW IF EXISTS ymatrix_orders_sink")
spark.sql(
s"""
|CREATE TEMPORARY VIEW ymatrix_orders_sink
|USING com.itsumma.gpconnector.GreenplumDataSource
|OPTIONS (
| url '${sys.env("YMATRIX_URL")}',
| user '${sys.env("YMATRIX_USER")}',
| password '${sys.env("YMATRIX_PASSWORD")}',
| dbschema '${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}',
| dbtable 'orders_sql_sink',
| network.timeout '120s',
| server.timeout '120s',
| dbmessages 'WARN'
|)
|""".stripMargin
)
spark.sql(
"""
|INSERT INTO TABLE ymatrix_orders_sink
|SELECT /*+ REPARTITION(4, order_id) */
| order_id,
| user_id,
| amount,
| created_at
|FROM spark_orders_src
|""".stripMargin
)
Key points for this approach:

- The sink is declared once as a temporary view; all writes then go through a standard INSERT statement.

After writing, it is recommended to perform at least a basic verification.
Row Count Verification:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select count(*)::bigint as cnt from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
)
.load()
.show(false)
Primary Key or Watermark Range Verification:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select min(order_id) as min_id, max(order_id) as max_id from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
)
.load()
.show(false)
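The readback queries above can be turned into an automated check by comparing the YMatrix row count against the source DataFrame count in the driver. A minimal sketch, following the table and schema names used in the examples and assuming count(*)::bigint maps to a Spark long column:

```scala
// Expected count from the Spark-side source table.
val expected = spark.table("dm.orders_demo").count()

// Actual count read back from YMatrix through the connector.
val actual = spark.read
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option(
    "dbtable",
    s"select count(*)::bigint as cnt from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
  )
  .load()
  .first()
  .getLong(0)

require(actual == expected, s"row count mismatch: wrote $expected, read back $actual")
```

Failing fast here keeps a partial or duplicated write from propagating silently into downstream jobs.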
It is recommended to perform at least these three types of verification:
You must explicitly specify the write mode:
- mode("append") appends rows to the target table.
- mode("overwrite") replaces the existing data in the target table.

Do not omit the write mode.
While the connector can automatically create tables in some scenarios if the target table does not exist, it is recommended for production environments to pre-create core tables and manage them explicitly:
If errors relate to gpfdist connections, prioritize checking:

- SPARK_LOCAL_IP is set to an address accessible by the YMatrix segments.
- server.port is fixed when required for debugging or firewall configuration.

repartition primarily affects write concurrency.
General recommendations:

- Use repartition for large-table imports.

Before formal import, it is recommended to verify the following field types carefully:
- decimal
- timestamp
- date

We recommend progressing in the following order:

1. Confirm the connector jar loads correctly.
2. Verify connection parameters with a small write in spark-shell.
3. Resolve any network (gpfdist) connectivity issues.
4. Check the target table structure and field types.
5. Tune partitioning and batching for full-scale performance.

This sequence makes it easiest to isolate and troubleshoot issues related to jars, parameters, networks, structures, and performance.