Запись данных из Spark в YMatrix

В этой главе описан метод записи данных из Spark в YMatrix с использованием spark-ymatrix-connector. Руководство предназначено для разработчиков Spark, инженеров платформ обработки данных и специалистов по интеграции данных для эффективного использования базы данных YMatrix.

Информация о программном обеспечении

Программное обеспечение Версия
Spark 3.4.1
Scala 2.12.17
Java 1.8
Connector 3.1.0

1. Обзор

spark-ymatrix-connector — это коннектор Spark DataSource V2, предназначенный для записи данных из Spark в YMatrix.

Вам нужно запомнить только одну точку входа:

.format("its-ymatrix")

Любые данные, существующие в Spark и представимые как DataFrame, таблица Spark или временный view, могут быть записаны в YMatrix через этот коннектор.

Типичные источники данных включают:

  • Таблицы Spark SQL
  • Файловые данные в форматах parquet, orc, csv или json
  • Данные, импортированные в Spark через JDBC
  • Таблицы верхнего уровня, такие как Hive, Iceberg или Delta
  • Тестовые данные, созданные непосредственно в коде

Документ ориентирован на практическую реализацию, а не на абстрактные концепции. Он обучает двум ключевым навыкам:

  1. Как интегрировать jar-пакет в Spark.
  2. Как фактически записывать данные в YMatrix.

2. Область применения и ограничения

2.1 Поддерживаемые сценарии

Коннектор подходит для следующих сценариев:

Сценарий Поддержка Описание
Spark DataFrame → YMatrix Поддерживается Наиболее распространённый метод офлайн-импорта.
Таблица Spark → одна таблица YMatrix Поддерживается Подходит для импорта из таблиц Spark SQL или временных view.
Фильтрация, отображение полей и переименование перед записью Поддерживается Преобразования выполняются в Spark до записи.
Пакетный импорт больших таблиц по диапазону Поддерживается Последовательный импорт на основе бизнес-ключей или водяных меток.
Запись через Spark SQL Поддерживается Реализуется через временный sink-view + INSERT INTO.
Проверка после записи Поддерживается Результаты можно сразу прочитать из YMatrix с помощью коннектора.

2.2 Границы возможностей

Коннектор отвечает только за передачу данных из Spark в YMatrix. Он не управляет следующими бизнес-семантиками:

  • Автоматическое удаление дубликатов
  • Автоматическое восстановление с контрольной точки
  • Автоматическое определение инкрементальных изменений
  • Автоматическое управление эволюцией схемы
  • Автоматическая оркестрация дозагрузки данных

Эти функции должны обрабатываться логикой вашего Spark-задания или внешними workflow-системами планирования.

3. Минимальный рабочий пример кода

Сначала рассмотрим минимальную рабочую модель:

spark.table("db.source_table")
  .write
  .format("its-ymatrix")
  .option("url", "jdbc:postgresql://host:5432/db")
  .option("user", "database_user")
  .option("password", "yourpassword")
  .option("dbschema", "public")
  .option("dbtable", "target_table")
  .mode("append")
  .save()

Этот код выполняет следующие шаги:

  1. Получает набор данных из Spark.
  2. Указывает цель записи как its-ymatrix.
  3. Передаёт коннектору адрес целевой базы данных, учётные данные, схему и имя таблицы.
  4. Определяет режим записи.
  5. Выполняет .save().

Наиболее часто используемые параметры перечислены ниже:

Параметр Обязательный Описание
url Да JDBC-URL YMatrix.
user Да Имя пользователя.
password Да Пароль.
dbschema Рекомендуется Целевая схема.
dbtable Да Имя целевой таблицы.
mode Да Необходимо явно указать append или overwrite.
distributedby Рекомендуется (при авто-создании) Укажите ключ распределения при автоматическом создании таблицы.
network.timeout Нет Рекомендуется для сетевых сред с высокой чувствительностью.
server.timeout Нет Рекомендуется для передачи больших объёмов данных.
server.port Нет Полезен для фиксации портов при сетевой отладке.
truncate Нет Используется с overwrite; помогает сохранить структуру таблицы.

4. Подключение и использование JAR-пакета

Этот раздел посвящён одной теме: как правильно подключить jar-файл коннектора в Spark.

Путь к собранному jar в текущем репозитории:

/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar

Ниже описаны два метода.

4.1 Метод 1: Программный вызов

Этот метод подходит, если у вас уже есть формальный код Spark-задания и необходимо интегрировать коннектор в приложение.

Шаг 1: Подготовка JAR-файла

Убедитесь, что файл существует:

ls -lh /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar

Шаг 2: Написание приложения Spark

Ниже приведён минимальный исполняемый пример приложения Spark:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SparkWriteToYMatrixApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkWriteToYMatrixApp")
      .getOrCreate()

    val srcDf = spark.table("dm.orders_daily")
      .select(
        col("order_id"),
        col("user_id"),
        col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
        col("created_at").cast("timestamp").as("created_at")
      )
      .repartition(4, col("order_id"))

    srcDf.write
      .format("its-ymatrix")
      .option("url", "jdbc:postgresql://ymatrix-master-host:5432/your_database")
      .option("user", "database_user")
      .option("password", "yourpassword")
      .option("dbschema", "public")
      .option("dbtable", "orders_daily")
      .option("distributedby", "order_id")
      .option("network.timeout", "120s")
      .option("server.timeout", "120s")
      .mode("append")
      .save()

    spark.stop()
  }
}

Шаг 3: Подключение JAR при отправке задания

Если ваше бизнес-задание упаковано отдельно, подключите коннектор с помощью --jars при отправке:

export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export APP_JAR=/path/to/your-spark-app.jar
export SPARK_LOCAL_IP=172.16.100.32

spark-submit \
  --class SparkWriteToYMatrixApp \
  --master local[4] \
  --driver-memory 8g \
  --conf spark.driver.host=${SPARK_LOCAL_IP} \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.local.ip=${SPARK_LOCAL_IP} \
  --conf spark.sql.shuffle.partitions=4 \
  --jars ${CONNECTOR_JAR} \
  ${APP_JAR}

Шаг 4: Проверка успешной интеграции

Проверьте как минимум три момента:

  1. Программа не выдаёт ошибку Failed to find data source: its-ymatrix.
  2. Во время выполнения .save() не возникает ошибок подключения или gpfdist.
  3. После завершения записи данные видны в YMatrix.

4.2 Метод 2: Вызов из командной строки

Этот метод идеален для отладки, диагностики, демонстраций и разовых задач импорта.

Вызов из командной строки обычно использует либо spark-shell, либо spark-submit.

4.2.1 Прямая запись через spark-shell

Это лучший подход для первоначальной проверки конвейера данных.

Запустите spark-shell:

export CONNECTOR_JAR=/root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar
export YMATRIX_URL=jdbc:postgresql://ymatrix-master-host:5432/your_database
export YMATRIX_USER=database_user
export YMATRIX_PASSWORD=yourpassword
export YMATRIX_SCHEMA=public
export SPARK_LOCAL_IP=172.16.100.32

spark-shell \
  --master local[4] \
  --driver-memory 8g \
  --conf spark.driver.host=${SPARK_LOCAL_IP} \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.local.ip=${SPARK_LOCAL_IP} \
  --conf spark.sql.shuffle.partitions=4 \
  --jars ${CONNECTOR_JAR}

После входа в spark-shell выполните следующее:

import spark.implicits._
import org.apache.spark.sql.functions._

val srcDf = Seq(
  (1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
  (2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00"),
  (3L, "u003", BigDecimal("99.99"), "2026-03-23 10:10:00")
).toDF("order_id", "user_id", "amount", "created_at")
  .withColumn("created_at", col("created_at").cast("timestamp"))

srcDf.write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", "orders_demo")
  .option("distributedby", "order_id")
  .option("server.port", "43000")
  .mode("overwrite")
  .save()

Сразу выполните проверку чтения:

spark.read
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option(
    "dbtable",
    s"select * from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo order by order_id"
  )
  .load()
  .show(false)

4.2.2 Запуск скриптов или программ через spark-submit

Если вы предпочитаете не использовать интерактивную оболочку, можно записать логику в программу и отправить её напрямую через spark-submit.

Структура команды практически совпадает с «Программным вызовом». Ключевые моменты остаются прежними:

  • Бизнес-логика находится в вашем приложении.
  • Коннектор подключается как внешняя зависимость через --jars.

Ниже приведён минимальный шаблон команды:

spark-submit \
  --class SparkWriteToYMatrixApp \
  --master local[4] \
  --driver-memory 8g \
  --conf spark.driver.host=${SPARK_LOCAL_IP} \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.local.ip=${SPARK_LOCAL_IP} \
  --jars /root/spark-greenplum-connector/spark-greenplum-connector/target/spark-ymatrix-connector_2.12-3.1.jar \
  /path/to/your-spark-app.jar

4.3 Выбор метода

Рекомендуем выбирать метод по следующим критериям:

  • Первоначальная проверка конвейера: используйте spark-shell.
  • Существующий формальный код задания: используйте программный вызов + spark-submit.
  • Отладка сетевых или параметрических проблем: предпочтительно spark-shell.
  • Производственные пакетные задания: используйте spark-submit.

5. Примеры разработки

Этот раздел является основным в руководстве. Все приведённые ниже примеры предполагают, что вы уже подключили jar коннектора в Spark.

5.1 Пример 1: Запись из таблицы Spark в YMatrix

Это наиболее распространённый формальный подход, подходящий, когда исходные данные уже существуют как таблица Spark или временный view.

import org.apache.spark.sql.functions.col

val sourceTable = "dm.orders_daily"
val targetTable = "orders_daily"

val sourceDf = spark.table(sourceTable)
  .select(
    col("order_id"),
    col("user_id"),
    col("shop_id"),
    col("pay_amount").cast("decimal(18,2)").as("pay_amount"),
    col("status"),
    col("created_at").cast("timestamp").as("created_at")
  )
  .repartition(4, col("order_id"))

sourceDf.write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", targetTable)
  .option("distributedby", "order_id")
  .option("network.timeout", "120s")
  .option("server.timeout", "120s")
  .mode("append")
  .save()

Используйте этот подход, когда:

  • У вас есть стандартная таблица Spark SQL.
  • Вы хотите явно определить список полей.
  • Вам нужно контролировать количество партиций перед записью.

5.2 Пример 2: Фильтрация и отображение полей перед записью

Если поля целевой таблицы не совпадают точно с исходной таблицей, рекомендуется выполнить отображение в Spark до записи.

import org.apache.spark.sql.functions._

val mappedDf = spark.table("ods.orders_raw")
  .where(col("dt") === lit("2026-03-23"))
  .where(col("is_deleted") === lit(0))
  .select(
    col("id").as("biz_order_id"),
    col("buyer").as("buyer_id"),
    col("seller").as("seller_id"),
    col("amount").cast("decimal(18,2)").as("pay_amount"),
    to_timestamp(col("pay_time")).as("pay_time")
  )

mappedDf.write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", "orders_mapped")
  .option("distributedby", "biz_order_id")
  .mode("overwrite")
  .save()

Используйте этот подход, когда:

  • Нужно отфильтровать некорректные данные.
  • Требуется переименовать поля.
  • Необходимо явно привести критические типы, такие как суммы или метки времени.

5.3 Пример 3: Перезапись данных с сохранением структуры таблицы

В средах разработки часто требуется «очистить старые данные и повторно импортировать», не пересоздавая таблицу.

spark.table("dm.orders_demo")
  .write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", "orders_demo")
  .option("truncate", "true")
  .mode("overwrite")
  .save()

Используйте этот подход, когда:

  • Выполняете отладку в разработке.
  • Многократно перезапускаете тесты в тестовой среде.
  • Хотите сохранить структуру таблицы, но повторно загрузить данные.

5.4 Пример 4: Пакетный импорт больших таблиц по диапазону

При первоначальном импорте большой таблицы избегайте одновременной передачи всей таблицы. Более безопасный подход — поэтапный импорт по интервалам на основе монотонного поля.

import org.apache.spark.sql.functions._

val sourceTable = "dwd.iot_orders"
val lowerBound = 1L
val upperBound = 100000L

val batchDf = spark.table(sourceTable)
  .where(col("ingest_id").between(lowerBound, upperBound))
  .select(
    col("ingest_id"),
    col("event_id"),
    col("device_id"),
    col("tenant_id"),
    col("event_time").cast("timestamp"),
    col("payload")
  )
  .repartition(4, col("ingest_id"))

batchDf.write
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option("dbschema", sys.env.getOrElse("YMATRIX_SCHEMA", "public"))
  .option("dbtable", "iot_orders_batch_demo")
  .option("distributedby", "ingest_id")
  .option("server.port", "43001")
  .mode("append")
  .save()

Используйте этот подход, когда:

  • Выполняете первоначальную миграцию большой таблицы.
  • Осуществляете поэтапный (gray-scale) импорт.
  • Дозагружаете данные пакетами.
  • Вручную контролируете объём каждого пакета.

5.5 Пример 5: Запись через Spark SQL

Если ваш workflow ориентирован на SQL, а не на DataFrame API, можно использовать временный sink-view.

Сначала создайте целевую таблицу в YMatrix:

create table public.orders_sql_sink (
  order_id bigint,
  user_id text,
  amount decimal(18,2),
  created_at timestamp
)
distributed by (order_id);

Затем выполните следующее в Spark:

import spark.implicits._
import org.apache.spark.sql.functions._

Seq(
  (1L, "u001", BigDecimal("18.50"), "2026-03-23 10:00:00"),
  (2L, "u002", BigDecimal("20.00"), "2026-03-23 10:05:00")
).toDF("order_id", "user_id", "amount", "created_at")
  .withColumn("created_at", col("created_at").cast("timestamp"))
  .createOrReplaceTempView("spark_orders_src")

spark.sql("DROP VIEW IF EXISTS ymatrix_orders_sink")

spark.sql(
  s"""
     |CREATE TEMPORARY VIEW ymatrix_orders_sink
     |USING com.itsumma.gpconnector.GreenplumDataSource
     |OPTIONS (
     |  url '${sys.env("YMATRIX_URL")}',
     |  user '${sys.env("YMATRIX_USER")}',
     |  password '${sys.env("YMATRIX_PASSWORD")}',
     |  dbschema '${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}',
     |  dbtable 'orders_sql_sink',
     |  network.timeout '120s',
     |  server.timeout '120s',
     |  dbmessages 'WARN'
     |)
     |""".stripMargin
)

spark.sql(
  """
    |INSERT INTO TABLE ymatrix_orders_sink
    |SELECT /*+ REPARTITION(4, order_id) */
    |  order_id,
    |  user_id,
    |  amount,
    |  created_at
    |FROM spark_orders_src
    |""".stripMargin
)

Ключевые моменты этого подхода:

  • Лучше всего подходит для SQL-ориентированных заданий.
  • Целевая таблица должна существовать заранее.
  • Поля в операторе INSERT должны быть перечислены явно.

5.6 Пример 6: Проверка после записи

После записи рекомендуется выполнить хотя бы базовую проверку.

Проверка количества строк:

spark.read
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option(
    "dbtable",
    s"select count(*)::bigint as cnt from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
  )
  .load()
  .show(false)

Проверка диапазона первичного ключа или водяной метки:

spark.read
  .format("its-ymatrix")
  .option("url", sys.env("YMATRIX_URL"))
  .option("user", sys.env("YMATRIX_USER"))
  .option("password", sys.env("YMATRIX_PASSWORD"))
  .option(
    "dbtable",
    s"select min(order_id) as min_id, max(order_id) as max_id from ${sys.env.getOrElse("YMATRIX_SCHEMA", "public")}.orders_demo"
  )
  .load()
  .show(false)

Рекомендуется выполнять как минимум три типа проверок:

  1. Проверка количества строк.
  2. Проверка диапазона первичного ключа или водяной метки.
  3. Проверка выборочных данных.

6. Распространённые проблемы и меры предосторожности

6.1 Обязательное явное указание режима записи

Вы должны явно указать режим записи:

  • mode("append")
  • mode("overwrite")

Не пропускайте указание режима записи.

6.2 Автоматическое создание таблиц

Хотя коннектор может автоматически создавать таблицы в некоторых сценариях, если целевая таблица не существует, для производственных сред рекомендуется заранее создавать основные таблицы и управлять ими явно:

  • Типы данных столбцов
  • Стратегии распределения
  • Атрибуты хранения

6.3 Отладка сетевых проблем

Если возникают ошибки, связанные с подключениями gpfdist, в первую очередь проверьте:

  • Является ли SPARK_LOCAL_IP адресом, доступным для segments YMatrix.
  • Открыты ли порты на хостах, где работают Spark Driver или Executors.
  • Требуется ли фиксированный server.port для отладки.

6.4 Понимание количества партиций

repartition в основном влияет на параллелизм записи.

Общие рекомендации:

  • Используйте меньше партиций для отладки малых таблиц.
  • Включайте ключ распределения в repartition при импорте больших таблиц.
  • Убедитесь, что количество партиций не значительно превышает число primary segments YMatrix.

6.5 Критические типы для проверки

Перед формальным импортом рекомендуется тщательно проверить следующие типы полей:

  • decimal
  • timestamp
  • date
  • JSON, геометрия и другие расширенные типы
  • Очень длинные строковые поля

6.6 Рекомендуемая последовательность внедрения

Рекомендуем следовать в следующем порядке:

  1. Запустите минимальный пример с 3 строками через spark-shell.
  2. Выполните импорт одной таблицы с использованием реальной исходной таблицы.
  3. Проверьте отображение полей и совместимость типов.
  4. Выполните пакетный или массовый импорт больших таблиц.

Такая последовательность максимально упрощает выявление и устранение проблем, связанных с jar-файлами, параметрами, сетью, структурой и производительностью.