Writing Data from Spark to YMatrix

This chapter describes how to write data from Spark to YMatrix using the spark-ymatrix-connector. It is intended for Spark developers, data platform engineers, and data integration specialists who need to load data into the YMatrix database.

Software Information

Software    Version
Spark       3.4.1
Scala       2.12.17
Java        1.8
Connector   3.1.0

1. Overview

The spark-ymatrix-connector is a Spark DataSource V2 Connector designed to write data from Spark into YMatrix.

There is only one entry point you need to remember:

.format("its-ymatrix")

As long as your data exists within Spark and can be represented as a DataFrame, Spark table, or temporary view, it can be written to YMatrix via this connector.

Typical data sources include:

  • Spark SQL tables
  • File data in formats such as parquet, orc, csv, or json
  • Data imported into Spark via JDBC
  • Upstream table formats like Hive, Iceberg, or Delta
  • Test data constructed directly in code

This document focuses on practical implementation rather than abstract concepts. It teaches two key skills:

  1. How to integrate the jar package into Spark.
  2. How to actually write data into YMatrix.

2. Scope and Boundaries

2.1 Applicable Scenarios

This connector is suitable for the following scenarios:

  • Spark DataFrame to YMatrix: supported. The most common offline import method.
  • Spark table to a single YMatrix table: supported. Suitable for importing from Spark SQL tables or temporary views.
  • Filtering, field mapping, and renaming before write: supported. Perform transformations within Spark before writing.
  • Batch import of large tables by range: supported. Progressively import based on business primary keys or watermarks.
  • Write via Spark SQL: supported. Achieved through a temporary sink view + INSERT INTO.
  • Post-write readback verification: supported. Read results back from YMatrix directly through the connector.

2.2 Capability Boundaries

This connector handles data transmission from Spark to YMatrix. It does not manage the following business semantics:

  • Automatic deduplication
  • Automatic checkpoint resumption
  • Automatic incremental identification
  • Automatic schema evolution governance
  • Automatic data backfill orchestration

These capabilities must be handled by your Spark job logic or external scheduling workflows.
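For instance, because the connector performs no deduplication, dropping duplicates is the job's responsibility before the write. The sketch below shows the keep-the-latest-record-per-key idea in plain Scala (the OrderRow case class and dedupKeepLatest helper are illustrative, not part of the connector); in a real Spark job the same logic is usually expressed with dropDuplicates or a row_number() window over the business key before calling .save():

```scala
// Illustrative record type; not part of the connector.
case class OrderRow(orderId: Long, payAmount: BigDecimal, updatedAt: Long)

// Keep only the newest version of each business key.
def dedupKeepLatest(rows: Seq[OrderRow]): Seq[OrderRow] =
  rows
    .groupBy(_.orderId)            // group by the business key
    .values
    .map(_.maxBy(_.updatedAt))     // keep the latest record per key
    .toSeq
    .sortBy(_.orderId)

val raw = Seq(
  OrderRow(1L, BigDecimal("10.00"), updatedAt = 100L),
  OrderRow(1L, BigDecimal("12.00"), updatedAt = 200L),   // newer version of order 1
  OrderRow(2L, BigDecimal("5.00"),  updatedAt = 150L)
)
val deduped = dedupKeepLatest(raw)   // orders 1 (latest version) and 2
```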

3. Minimal Working Code Example

Start with the smallest possible write:

spark.table("db.source_table")
  .write
  .format("its-ymatrix")
  .option("url", "jdbc:postgresql://host:5432/db")
  .option("user", "database_user")
  .option("password", "yourpassword")
  .option("dbschema", "public")
  .option("dbtable", "target_table")
  .mode("append")
  .save()

This code performs the following steps:

  1. Retrieves a dataset from Spark.
  2. Specifies the write target as its-ymatrix.
  3. Provides the connector with the target database address, credentials, schema, and table name.
  4. Defines the write mode.
  5. Executes .save().

The most commonly used parameters are listed below:

  • url (required): YMatrix JDBC URL.
  • user (required): username.
  • password (required): password.
  • dbschema (recommended): target schema.
  • dbtable (required): target table name.
  • mode (required): the write mode, set via .mode(); must explicitly be append or overwrite.
  • distributedby (recommended for auto-created tables): specifies the distribution key when the connector auto-creates the table.
  • network.timeout (optional): recommended for network-sensitive environments.
  • server.timeout (optional): recommended for large-volume transfers.
  • server.port (optional): useful for fixing the port during network debugging.
  • truncate (optional): use with overwrite to clear data while preserving the table structure.
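As a defensive pattern, the required options can be validated before the write is attempted, so that a missing credential fails fast with a clear message rather than a connector error mid-job. This is a plain-Scala sketch; buildYMatrixOptions is a hypothetical helper, not a connector API:

```scala
// Hypothetical helper: assemble connector options and fail fast if a
// required key is missing, before any Spark write is attempted.
def buildYMatrixOptions(opts: Map[String, String]): Map[String, String] = {
  val required = Seq("url", "user", "password", "dbtable")
  val missing = required.filterNot(opts.contains)
  require(missing.isEmpty, s"missing required connector options: ${missing.mkString(", ")}")
  opts
}

val options = buildYMatrixOptions(Map(
  "url"      -> "jdbc:postgresql://host:5432/db",
  "user"     -> "database_user",
  "password" -> "yourpassword",
  "dbschema" -> "public",       // recommended
  "dbtable"  -> "target_table"
))
// In a real job: df.write.format("its-ymatrix").options(options).mode("append").save()
```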

4. Importing and Using the JAR Package

This section covers a single topic: how to correctly include the connector's jar file in Spark.

The built jar path in the current repository is:

/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar

Two methods are described below.

4.1 Method 1: Programmatic Invocation

This method is suitable if you already have formal Spark job code and need to integrate the connector into your application.

Step 1: Prepare the JAR File

Verify that the jar file exists:

ls -lh /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar

Step 2: Write the Spark Application

Below is a minimal runnable Spark application example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SparkWriteToYMatrixApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkWriteToYMatrixApp")
      .getOrCreate()

    val srcDf = spark.table("dm.orders_daily")
      .select(
        col("order_id"),
        col("user_id"),
        col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
        col("created_at").cast("timestamp").as("created_at")
      )
      .repartition(4, col("order_id"))

    srcDf.write
      .format("its-ymatrix")
      .option("url", "jdbc:postgresql://ymatrix-master-host:5432/your_database")
      .option("user", "database_user")
      .option("password", "yourpassword")
      .option("dbschema", "public")
      .option("dbtable", "orders_daily")
      .option("distributedby", "order_id")
      .option("network.timeout", "120s")
      .option("server.timeout", "120s")
      .mode("append")
      .save()

    spark.stop()
  }
}

Step 3: Include the JAR During Submission

If your business job is packaged separately, include the connector using --jars during submission:

export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export APP_JAR=/path/to/your-spark-app.jar
export SPARK_LOCAL_IP=172.16.100.32

spark-submit \
  --class SparkWriteToYMatrixApp \
  --master local[4] \
  --driver-memory 8g \
  --conf spark.driver.host=${SPARK_LOCAL_IP} \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.local.ip=${SPARK_LOCAL_IP} \
  --conf spark.sql.shuffle.partitions=4 \
  --jars ${CONNECTOR_JAR} \
  ${APP_JAR}

Step 4: Verify Successful Integration

Check at least these three points:

  1. The program does not report Failed to find data source: its-ymatrix.
  2. No connection or gpfdist errors occur during the .save() phase.
  3. Data is visible in YMatrix after the write completes.

4.2 Method 2: Command-Line Invocation

This method is ideal for debugging, troubleshooting, demos, and one-off import tasks.

Command-line invocation typically uses either spark-shell or spark-submit.

4.2.1 Direct Write via spark-shell

This is the best approach for verifying the data pipeline for the first time.

Start spark-shell:

export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export YMATRIX_URL=jdbc:postgresql://ymatrix-master-host:5432/your_database
export YMATRIX_USER=database_user
export YMATRIX_PASSWORD=yourpassword
export YMATRIX_SCHEMA=public
export SPARK_LOCAL_IP=172.16.100.32

spark-shell \
  --master local[4] \
  --driver-memory 8g \
  --conf spark.driver.host=${SPARK_LOCAL_IP} \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.local.ip=${SPARK_LOCAL_IP} \
  --conf spark.sql.shuffle.partitions=4 \
  --jars ${CONNECTOR_JAR}

Once inside spark-shell, execute the following:

import spark.implicits._
import org.apache.spark.sql.functions._

val srcDf = Seq(
  (1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
  (2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00"),
  (3L, "u003", BigDecimal("99.99"), "2026-03-23 10:10:00")
).toDF("order_id", "user_id", "amount", "created_at")
  .withColumn("created_at", col("created_at").cast("timestamp"))

srcDf.write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", "orders_demo")
  .option("distributedby", "order_id")
  .option("server.port", "43000")
  .mode("overwrite")
  .save()

Immediately perform a readback verification:

spark.read
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option(
    "dbtable",
    s"select * from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo order by order_id"
  )
  .load()
  .show(false)

4.2.2 Run Scripts or Programs via spark-submit

If you prefer not to enter an interactive shell manually, you can write the logic into a program and submit it directly using spark-submit.

The command structure is essentially the same as "Programmatic Invocation." The key points remain:

  • Your business logic resides in your application package.
  • The connector is included as an external dependency via --jars.

A minimal command template is shown below:

spark-submit \
  --class SparkWriteToYMatrixApp \
  --master local[4] \
  --driver-memory 8g \
  --conf spark.driver.host=${SPARK_LOCAL_IP} \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.local.ip=${SPARK_LOCAL_IP} \
  --jars /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar \
  /path/to/your-spark-app.jar

4.3 Choosing Between Methods

We recommend selecting a method based on the following criteria:

  • First-time pipeline validation: Use spark-shell.
  • Existing formal job code: Use programmatic invocation + spark-submit.
  • Debugging network or parameter issues: Prioritize spark-shell.
  • Production batch jobs: Use spark-submit.

5. Development Examples

This section is the core of the guide. All examples below assume you have already included the connector jar in Spark.

5.1 Example 1: Writing from a Spark Table to YMatrix

This is the most common formal development approach, suitable when source data already exists as a Spark table or temporary view.

import org.apache.spark.sql.functions.col

val sourceTable = "dm.orders_daily"
val targetTable = "orders_daily"

val sourceDf = spark.table(sourceTable)
  .select(
    col("order_id"),
    col("user_id"),
    col("shop_id"),
    col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
    col("status"),
    col("created_at").cast("timestamp").as("created_at")
  )
  .repartition(4, col("order_id"))

sourceDf.write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", targetTable)
  .option("distributedby", "order_id")
  .option("network.timeout", "120s")
  .option("server.timeout", "120s")
  .mode("append")
  .save()

Use this approach when:

  • You have a standard Spark SQL table.
  • You want to explicitly define the field list.
  • You need to control partition counts before writing.

5.2 Example 2: Filtering and Field Mapping Before Write

If the target table fields do not exactly match the source table, it is recommended to perform mapping within Spark before writing.

import org.apache.spark.sql.functions._

val mappedDf = spark.table("ods.orders_raw")
  .where(col("dt") === lit("2026-03-23"))
  .where(col("is_deleted") === lit(0))
  .select(
    col("id").as("biz_order_id"),
    col("buyer").as("buyer_id"),
    col("seller").as("seller_id"),
    col("amount").cast("decimal(18,2)").as("pay_amount"),
    to_timestamp(col("pay_time")).as("pay_time")
  )

mappedDf.write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", "orders_mapped")
  .option("distributedby", "biz_order_id")
  .mode("overwrite")
  .save()

Use this approach when:

  • You need to filter out dirty data.
  • You need to rename fields.
  • You need to explicitly cast critical types such as amounts or timestamps.

5.3 Example 3: Overwriting Data While Preserving Table Structure

In development environments, it is often necessary to "clear old data and re-import" without rebuilding the table.

spark.table("dm.orders_demo")
  .write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", "orders_demo")
  .option("truncate", "true")
  .mode("overwrite")
  .save()

Use this approach when:

  • Conducting development debugging.
  • Re-running tests in a test environment repeatedly.
  • You want to retain the table structure but re-import data.

5.4 Example 4: Batch Importing Large Tables by Range

When importing a large table for the first time, avoid pushing the entire table at once. A safer approach is to import interval by interval based on a monotonically increasing field.

import org.apache.spark.sql.functions._

val sourceTable = "dwd.iot_orders"
val lowerBound = 1L
val upperBound = 100000L

val batchDf = spark.table(sourceTable)
  .where(col("ingest_id").between(lowerBound, upperBound))
  .select(
    col("ingest_id"),
    col("event_id"),
    col("device_id"),
    col("tenant_id"),
    col("event_time").cast("timestamp"),
    col("payload")
  )
  .repartition(4, col("ingest_id"))

batchDf.write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", "iot_orders_batch_demo")
  .option("distributedby", "ingest_id")
  .option("server.port", "43001")
  .mode("append")
  .save()

Use this approach when:

  • Performing initial migration of a large table.
  • Executing a phased (canary-style) import.
  • Backfilling data in batches.
  • Manually controlling the volume of each batch.
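The example above writes a single range; a full import iterates over consecutive ranges. The sketch below computes inclusive (lower, upper) batch bounds in plain Scala (batchRanges is an illustrative helper, not a connector API); each pair would then parameterize the between filter and write shown above:

```scala
// Illustrative helper: split the inclusive id range [minId, maxId] into
// batches of at most batchSize ids; each (lo, hi) pair drives one write.
def batchRanges(minId: Long, maxId: Long, batchSize: Long): Seq[(Long, Long)] =
  (minId to maxId by batchSize).map { lo =>
    (lo, math.min(lo + batchSize - 1, maxId))
  }

val batches = batchRanges(1L, 250000L, 100000L)
// batches == Seq((1,100000), (100001,200000), (200001,250000))

// In a real job, each pair replaces lowerBound/upperBound in the
// between(...) filter of the example above:
// batches.foreach { case (lo, hi) => /* filter + write one batch */ }
```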

5.5 Example 5: Writing via Spark SQL

If your workflow is SQL-oriented rather than DataFrame API-based, you can use a temporary sink view.

First, create the target table in YMatrix:

create table public.orders_sql_sink (
  order_id bigint,
  user_id text,
  amount decimal(18,2),
  created_at timestamp
)
distributed by (order_id);

Then, execute the following in Spark:

import spark.implicits._
import org.apache.spark.sql.functions._

Seq(
  (1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
  (2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00")
).toDF("order_id", "user_id", "amount", "created_at")
  .withColumn("created_at", col("created_at").cast("timestamp"))
  .createOrReplaceTempView("spark_orders_src")

spark.sql("DROP VIEW IF EXISTS ymatrix_orders_sink")

spark.sql(
  s"""
     |CREATE TEMPORARY VIEW ymatrix_orders_sink
     |USING com.itsumma.gpconnector.GreenplumDataSource
     |OPTIONS (
     |  url '${sys.env("YMATRIX_URL")}',
     |  user '${sys.env("YMATRIX_USER")}',
     |  password '${sys.env("YMATRIX_PASSWORD")}',
     |  dbschema '${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}',
     |  dbtable 'orders_sql_sink',
     |  network.timeout '120s',
     |  server.timeout '120s',
     |  dbmessages 'WARN'
     |)
     |""".stripMargin
)

spark.sql(
  """
    |INSERT INTO TABLE ymatrix_orders_sink
    |SELECT /*+ REPARTITION(4, order_id) */
    |  order_id,
    |  user_id,
    |  amount,
    |  created_at
    |FROM spark_orders_src
    |""".stripMargin
)

Key points for this approach:

  • Best suited for SQL-style jobs.
  • The target table must exist beforehand.
  • Explicitly list fields in the INSERT statement.

5.6 Example 6: Post-Write Verification

After writing, it is recommended to perform at least a basic verification.

Row Count Verification:

spark.read
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option(
    "dbtable",
    s"select count(*)::bigint as cnt from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
  )
  .load()
  .show(false)

Primary Key or Watermark Range Verification:

spark.read
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option(
    "dbtable",
    s"select min(order_id) as min_id, max(order_id) as max_id from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
  )
  .load()
  .show(false)

It is recommended to perform at least these three types of verification:

  1. Row count verification.
  2. Primary key or watermark range verification.
  3. Sample data verification.
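One way to make these checks mechanical is to compare simple statistics gathered from the source side and from the readback. The sketch below is plain Scala with hypothetical names (TableStats, verifyWrite); in practice the numbers would come from the count and min/max queries shown above:

```scala
// Hypothetical helper types for mechanizing post-write verification.
case class TableStats(rowCount: Long, minId: Long, maxId: Long)

// Return a list of human-readable problems; an empty list means the check passed.
def verifyWrite(source: TableStats, target: TableStats): Seq[String] = {
  var problems = Seq.empty[String]
  if (source.rowCount != target.rowCount)
    problems :+= s"row count mismatch: source=${source.rowCount}, target=${target.rowCount}"
  if (source.minId != target.minId || source.maxId != target.maxId)
    problems :+= s"key range mismatch: source=[${source.minId}, ${source.maxId}], " +
      s"target=[${target.minId}, ${target.maxId}]"
  problems
}

// Stats that would come from the count and min/max readback queries.
val src = TableStats(rowCount = 3L, minId = 1L, maxId = 3L)
val tgt = TableStats(rowCount = 3L, minId = 1L, maxId = 3L)
val issues = verifyWrite(src, tgt)   // empty Seq: verification passed
```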

6. Common Issues and Precautions

6.1 Explicit Mode Specification Required

You must explicitly specify the write mode:

  • mode("append")
  • mode("overwrite")

Do not omit the write mode.

6.2 Automatic Table Creation

The connector can automatically create the target table in some scenarios when it does not exist. In production environments, however, it is recommended to pre-create core tables and explicitly manage:

  • Column data types
  • Distribution strategies
  • Storage attributes

6.3 Debugging Network Issues

If errors relate to gpfdist connections, prioritize checking:

  • Whether SPARK_LOCAL_IP is an address accessible by YMatrix segments.
  • Whether ports on the hosts running the Spark Driver or Executors are open.
  • Whether a fixed server.port is required for debugging.

6.4 Understanding Partition Counts

repartition primarily affects write concurrency.

General recommendations:

  • Use fewer partitions for small-table debugging.
  • Include the distribution key in repartition for large-table imports.
  • Ensure the partition count does not significantly exceed the number of YMatrix primary segments.
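These recommendations can be folded into a small rule of thumb. The heuristic below is a plain-Scala sketch (writePartitions is a hypothetical helper, and the cap-at-segment-count rule is only a starting point, not a hard law): it caps the requested partition count at the number of primary segments and floors it at 1 for small debug runs:

```scala
// Hypothetical heuristic: cap the write partition count at the number of
// YMatrix primary segments, with a floor of 1 for tiny debug datasets.
def writePartitions(requested: Int, primarySegments: Int): Int =
  math.max(1, math.min(requested, primarySegments))

writePartitions(16, 4)   // 4: capped at the segment count
writePartitions(0, 4)    // 1: floored for small debug runs
```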

6.5 Critical Types to Verify

Before formal import, it is recommended to verify the following field types carefully:

  • decimal
  • timestamp
  • date
  • JSON, geometry, and other extended types
  • Very wide string fields

6.6 Recommended Onboarding Sequence

We recommend progressing in the following order:

  1. Run the minimal 3-row sample using spark-shell.
  2. Perform a single-table import using the actual source table.
  3. Verify field mapping and type compatibility.
  4. Execute large-table batch imports or bulk imports.

This sequence makes it easiest to isolate and troubleshoot issues related to jars, parameters, networks, structures, and performance.