В этой главе описан метод записи данных из Spark в YMatrix с использованием spark-ymatrix-connector. Руководство предназначено для разработчиков Spark, инженеров платформ обработки данных и специалистов по интеграции данных для эффективного использования базы данных YMatrix.
| Программное обеспечение | Версия |
|---|---|
| Spark | 3.4.1 |
| Scala | 2.12.17 |
| Java | 1.8 |
| Connector | 3.1.0 |
spark-ymatrix-connector — это коннектор Spark DataSource V2, предназначенный для записи данных из Spark в YMatrix.
Вам нужно запомнить только одну точку входа:
.format("its-ymatrix")
Любые данные, существующие в Spark и представимые как DataFrame, таблица Spark или временный view, могут быть записаны в YMatrix через этот коннектор.
Типичные источники данных включают:
parquet, orc, csv или jsonДокумент ориентирован на практическую реализацию, а не на абстрактные концепции. Он обучает двум ключевым навыкам:
jar-пакет в Spark.Коннектор подходит для следующих сценариев:
| Сценарий | Поддержка | Описание |
|---|---|---|
| Spark DataFrame → YMatrix | Поддерживается | Наиболее распространённый метод офлайн-импорта. |
| Таблица Spark → одна таблица YMatrix | Поддерживается | Подходит для импорта из таблиц Spark SQL или временных view. |
| Фильтрация, отображение полей и переименование перед записью | Поддерживается | Преобразования выполняются в Spark до записи. |
| Пакетный импорт больших таблиц по диапазону | Поддерживается | Последовательный импорт на основе бизнес-ключей или водяных меток. |
| Запись через Spark SQL | Поддерживается | Реализуется через временный sink-view + INSERT INTO. |
| Проверка после записи | Поддерживается | Результаты можно сразу прочитать из YMatrix с помощью коннектора. |
Коннектор отвечает только за передачу данных из Spark в YMatrix. Он не управляет следующими бизнес-семантиками:
Эти функции должны обрабатываться логикой вашего Spark-задания или внешними workflow-системами планирования.
Сначала рассмотрим минимальную рабочую модель:
spark.table("db.source_table")
.write
.format("its-ymatrix")
.option("url", "jdbc:postgresql://host:5432/db")
.option("user", "database_user")
.option("password", "yourpassword")
.option("dbschema", "public")
.option("dbtable", "target_table")
.mode("append")
.save()
Этот код выполняет следующие шаги:
its-ymatrix..save().Наиболее часто используемые параметры перечислены ниже:
| Параметр | Обязательный | Описание |
|---|---|---|
url |
Да | JDBC-URL YMatrix. |
user |
Да | Имя пользователя. |
password |
Да | Пароль. |
dbschema |
Рекомендуется | Целевая схема. |
dbtable |
Да | Имя целевой таблицы. |
mode |
Да | Необходимо явно указать append или overwrite. |
distributedby |
Рекомендуется (при авто-создании) | Укажите ключ распределения при автоматическом создании таблицы. |
network.timeout |
Нет | Рекомендуется для сетевых сред с высокой чувствительностью. |
server.timeout |
Нет | Рекомендуется для передачи больших объёмов данных. |
server.port |
Нет | Полезен для фиксации портов при сетевой отладке. |
truncate |
Нет | Используется с overwrite; помогает сохранить структуру таблицы. |
Этот раздел посвящён одной теме: как правильно подключить jar-файл коннектора в Spark.
Путь к собранному jar в текущем репозитории:
/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
Ниже описаны два метода.
Этот метод подходит, если у вас уже есть формальный код Spark-задания и необходимо интегрировать коннектор в приложение.
Убедитесь, что файл существует:
ls -lh /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
Ниже приведён минимальный исполняемый пример приложения Spark:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object SparkWriteToYMatrixApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("SparkWriteToYMatrixApp")
.getOrCreate()
val srcDf = spark.table("dm.orders_daily")
.select(
col("order_id"),
col("user_id"),
col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
col("created_at").cast("timestamp").as("created_at")
)
.repartition(4, col("order_id"))
srcDf.write
.format("its-ymatrix")
.option("url", "jdbc:postgresql://ymatrix-master-host:5432/your_database")
.option("user", "database_user")
.option("password", "yourpassword")
.option("dbschema", "public")
.option("dbtable", "orders_daily")
.option("distributedby", "order_id")
.option("network.timeout", "120s")
.option("server.timeout", "120s")
.mode("append")
.save()
spark.stop()
}
}
Если ваше бизнес-задание упаковано отдельно, подключите коннектор с помощью --jars при отправке:
export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export APP_JAR=/path/to/your-spark-app.jar
export SPARK_LOCAL_IP=172.16.100.32
spark-submit \
--class SparkWriteToYMatrixApp \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--conf spark.sql.shuffle.partitions=4 \
--jars ${CONNECTOR_JAR} \
${APP_JAR}
Проверьте как минимум три момента:
Failed to find data source: its-ymatrix..save() не возникает ошибок подключения или gpfdist.Этот метод идеален для отладки, диагностики, демонстраций и разовых задач импорта.
Вызов из командной строки обычно использует либо spark-shell, либо spark-submit.
spark-shellЭто лучший подход для первоначальной проверки конвейера данных.
Запустите spark-shell:
export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export YMATRIX_URL=jdbc:postgresql://ymatrix-master-host:5432/your_database
export YMATRIX_USER=database_user
export YMATRIX_PASSWORD=yourpassword
export YMATRIX_SCHEMA=public
export SPARK_LOCAL_IP=172.16.100.32
spark-shell \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--conf spark.sql.shuffle.partitions=4 \
--jars ${CONNECTOR_JAR}
После входа в spark-shell выполните следующее:
import spark.implicits._
import org.apache.spark.sql.functions._
val srcDf = Seq(
(1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
(2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00"),
(3L, "u003", BigDecimal("99.99"), "2026-03-23 10:10:00")
).toDF("order_id", "user_id", "amount", "created_at")
.withColumn("created_at", col("created_at").cast("timestamp"))
srcDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_demo")
.option("distributedby", "order_id")
.option("server.port", "43000")
.mode("overwrite")
.save()
Сразу выполните проверку чтения:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select * from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo order by order_id"
)
.load()
.show(false)
spark-submitЕсли вы предпочитаете не использовать интерактивную оболочку, можно записать логику в программу и отправить её напрямую через spark-submit.
Структура команды практически совпадает с «Программным вызовом». Ключевые моменты остаются прежними:
--jars.Ниже приведён минимальный шаблон команды:
spark-submit \
--class SparkWriteToYMatrixApp \
--master local[4] \
--driver-memory 8g \
--conf spark.driver.host=${SPARK_LOCAL_IP} \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.local.ip=${SPARK_LOCAL_IP} \
--jars /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar \
/path/to/your-spark-app.jar
Рекомендуем выбирать метод по следующим критериям:
spark-shell.spark-submit.spark-shell.spark-submit.Этот раздел является основным в руководстве. Все приведённые ниже примеры предполагают, что вы уже подключили jar коннектора в Spark.
Это наиболее распространённый формальный подход, подходящий, когда исходные данные уже существуют как таблица Spark или временный view.
import org.apache.spark.sql.functions.col
val sourceTable = "dm.orders_daily"
val targetTable = "orders_daily"
val sourceDf = spark.table(sourceTable)
.select(
col("order_id"),
col("user_id"),
col("shop_id"),
col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
col("status"),
col("created_at").cast("timestamp").as("created_at")
)
.repartition(4, col("order_id"))
sourceDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", targetTable)
.option("distributedby", "order_id")
.option("network.timeout", "120s")
.option("server.timeout", "120s")
.mode("append")
.save()
Используйте этот подход, когда:
Если поля целевой таблицы не совпадают точно с исходной таблицей, рекомендуется выполнить отображение в Spark до записи.
import org.apache.spark.sql.functions._
val mappedDf = spark.table("ods.orders_raw")
.where(col("dt") === lit("2026-03-23"))
.where(col("is_deleted") === lit(0))
.select(
col("id").as("biz_order_id"),
col("buyer").as("buyer_id"),
col("seller").as("seller_id"),
col("amount").cast("decimal(18,2)").as("pay_amount"),
to_timestamp(col("pay_time")).as("pay_time")
)
mappedDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_mapped")
.option("distributedby", "biz_order_id")
.mode("overwrite")
.save()
Используйте этот подход, когда:
В средах разработки часто требуется «очистить старые данные и повторно импортировать», не пересоздавая таблицу.
spark.table("dm.orders_demo")
.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "orders_demo")
.option("truncate", "true")
.mode("overwrite")
.save()
Используйте этот подход, когда:
При первоначальном импорте большой таблицы избегайте одновременной передачи всей таблицы. Более безопасный подход — поэтапный импорт по интервалам на основе монотонного поля.
import org.apache.spark.sql.functions._
val sourceTable = "dwd.iot_orders"
val lowerBound = 1L
val upperBound = 100000L
val batchDf = spark.table(sourceTable)
.where(col("ingest_id").between(lowerBound, upperBound))
.select(
col("ingest_id"),
col("event_id"),
col("device_id"),
col("tenant_id"),
col("event_time").cast("timestamp"),
col("payload")
)
.repartition(4, col("ingest_id"))
batchDf.write
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
.option("dbtable", "iot_orders_batch_demo")
.option("distributedby", "ingest_id")
.option("server.port", "43001")
.mode("append")
.save()
Используйте этот подход, когда:
Если ваш workflow ориентирован на SQL, а не на DataFrame API, можно использовать временный sink-view.
Сначала создайте целевую таблицу в YMatrix:
create table public.orders_sql_sink (
order_id bigint,
user_id text,
amount decimal(18,2),
created_at timestamp
)
distributed by (order_id);
Затем выполните следующее в Spark:
import spark.implicits._
import org.apache.spark.sql.functions._
Seq(
(1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
(2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00")
).toDF("order_id", "user_id", "amount", "created_at")
.withColumn("created_at", col("created_at").cast("timestamp"))
.createOrReplaceTempView("spark_orders_src")
spark.sql("DROP VIEW IF EXISTS ymatrix_orders_sink")
spark.sql(
s"""
|CREATE TEMPORARY VIEW ymatrix_orders_sink
|USING com.itsumma.gpconnector.GreenplumDataSource
|OPTIONS (
| url '${sys.env("YMATRIX_URL")}',
| user '${sys.env("YMATRIX_USER")}',
| password '${sys.env("YMATRIX_PASSWORD")}',
| dbschema '${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}',
| dbtable 'orders_sql_sink',
| network.timeout '120s',
| server.timeout '120s',
| dbmessages 'WARN'
|)
|""".stripMargin
)
spark.sql(
"""
|INSERT INTO TABLE ymatrix_orders_sink
|SELECT /*+ REPARTITION(4, order_id) */
| order_id,
| user_id,
| amount,
| created_at
|FROM spark_orders_src
|""".stripMargin
)
Ключевые моменты этого подхода:
INSERT должны быть перечислены явно.После записи рекомендуется выполнить хотя бы базовую проверку.
Проверка количества строк:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select count(*)::bigint as cnt from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
)
.load()
.show(false)
Проверка диапазона первичного ключа или водяной метки:
spark.read
.format("its-ymatrix")
.option("url", sys.env("YMATRIX_URL"))
.option("user", sys.env("YMATRIX_USER"))
.option("password", sys.env("YMATRIX_PASSWORD"))
.option(
"dbtable",
s"select min(order_id) as min_id, max(order_id) as max_id from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
)
.load()
.show(false)
Рекомендуется выполнять как минимум три типа проверок:
Вы должны явно указать режим записи:
mode("append")mode("overwrite")Не пропускайте указание режима записи.
Хотя коннектор может автоматически создавать таблицы в некоторых сценариях, если целевая таблица не существует, для производственных сред рекомендуется заранее создавать основные таблицы и управлять ими явно:
Если возникают ошибки, связанные с подключениями gpfdist, в первую очередь проверьте:
SPARK_LOCAL_IP адресом, доступным для segments YMatrix.server.port для отладки.repartition в основном влияет на параллелизм записи.
Общие рекомендации:
repartition при импорте больших таблиц.Перед формальным импортом рекомендуется тщательно проверить следующие типы полей:
decimaltimestampdateРекомендуем следовать в следующем порядке:
spark-shell.Такая последовательность максимально упрощает выявление и устранение проблем, связанных с jar-файлами, параметрами, сетью, структурой и производительностью.