Использование foreachBatch для записи в произвольные приемники данных

На этой странице показано, как использовать foreachBatch структурированную потоковую передачу для записи выходных данных потокового запроса в источники данных, которые не имеют существующего приемника потоковой передачи.

Шаблон streamingDF.writeStream.foreachBatch(...) кода позволяет применять пакетные функции к выходным данным каждого микропакета потокового запроса. Функции, используемые с foreachBatch, принимают два параметра:

  • Кадр данных с выходными данными микропакета.
  • Уникальный идентификатор микропакета.

Необходимо использовать foreachBatch для операций слияния Delta Lake в структурированной потоковой передаче. См. Upsert из потоковых запросов с использованием foreachBatch.

Применение дополнительных операций с DataFrame

Многие операции с кадрами данных и наборами данных не поддерживаются при потоковой передаче кадров данных, так как в этих случаях Spark не поддерживает создание добавочных планов. С помощью foreachBatch() можно применить некоторые из этих операций к каждому выходному микропакету. Например, можно использовать foreachBatch() и SQL-операцию MERGE INTO, чтобы записывать выходные данные агрегатов потоковой обработки в таблицу Delta в режиме обновления. Дополнительные сведения см. в MERGE INTO.

Внимание

  • foreachBatch() предоставляет гарантии записи как минимум один раз. Однако вы можете использовать batchId, предоставленные функции в качестве способа дедупликации выходных данных и получения точной гарантии. В любом случае вам нужно будет самостоятельно принять решение о комплексной семантике.
  • foreachBatch() не работает с режимом непрерывной обработки, так как существенно зависит от выполнения микропакетов запроса потоковой передачи. Если данные записываются в непрерывном режиме, используйте foreach().
  • При использовании foreachBatch с оператором с сохранением состояния важно полностью обработать каждый пакет перед завершением обработки. См. полное потребление каждого пакетного DataFrame

Обработка пустых кадров данных

foreachBatch() может получить пустой кадр данных, и код должен обрабатывать этот сценарий. В противном случае запрос может завершиться ошибкой.

Например, если Delta Lake является источником потоковой передачи, в этих случаях в foreachBatch() может быть передан пустой DataFrame.

  • OPTIMIZE без файлов для обработки: когда OPTIMIZE операция выполняется в исходной таблице Delta Lake, но нет файлов для обработки, структурированная потоковая передача записывает запись журнала смещения, чтобы увеличить версию таблицы. Это создает пустой микропакет на приемнике данных, даже если файлы не читаются.
  • Очистка файлов на уровне физического плана: если при использовании отправки предиката или очистке файлов удаляются все записи на уровне физического плана, результатом является пустая фиксация изменений в приемник.

Пользовательский код должен обрабатывать пустые DataFrame, чтобы обеспечить их корректную работу. См. приведенные ниже примеры.

Python

def process_batch(output_df, batch_id):
  # Process valid DataFrames only
  if not output_df.isEmpty():
    # business logic
    pass

streamingDF.writeStream.foreachBatch(process_batch).start()

Scala

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid DataFrames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Изменения foreachBatch поведения в Databricks Runtime 14.0

В Databricks Runtime 14.0 и выше для вычислений, настроенных в стандартном режиме доступа, применяются следующие изменения поведения:

  • print() команды записывают выходные данные в журналы драйверов.
  • Невозможно получить доступ к подмодулу dbutils.widgets внутри функции.
  • Все файлы, модули или объекты, на которые ссылается функция, должны быть сериализуемыми и доступными в Spark.

Повторное использование существующих источников данных пакетной обработки

С помощью foreachBatch()можно использовать существующие средства записи пакетных данных для приемников данных, которые, возможно, не поддерживают структурированную потоковую передачу. Вот несколько таких случаев.

Многие другие источники пакетных данных можно использовать из foreachBatch(). См. статью "Подключение к источникам данных" и внешним службам.

Запись в несколько расположений

Если необходимо записать выходные данные потокового запроса в несколько расположений, Databricks рекомендует использовать несколько записей структурированной потоковой передачи для оптимальной параллелизации и пропускной способности.

Использование foreachBatch для записи в несколько приемников сериализует выполнение потоковой записи, что может увеличить задержку для каждого микропакета.

Если вы используете foreachBatch для записи в несколько таблиц Delta, см. статью Use foreachBatch for idempotent table writes.

Полностью использовать каждый пакетный кадр данных

При использовании операторов с отслеживанием состояния (например, с помощью dropDuplicatesWithinWatermark), каждая пакетная итерация должна обработать весь DataFrame или перезапустить запрос. Если вы не используете весь DataFrame, потоковый запрос завершится неудачей на следующей партии данных.

Это может произойти в нескольких случаях. В следующих примерах показано, как исправить запросы, которые неправильно используют кадр данных.

Намеренное использование подмножества пакета

Если вас интересует только подмножество пакета, вы можете использовать код, например, следующий.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

В этом случае batch_df.show(2) обрабатывает только первые два элемента в пакете, что ожидается, но если в пакете больше элементов, они должны быть обработаны. Следующий код использует полный кадр данных.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

do_nothing Здесь функция тихо игнорирует остальную часть DataFrame.

Обработка ошибки в пакете

Для обработки ошибок в foreachBatch Databricks рекомендуется разрешить потоковому запросу быстро завершиться и вместо этого полагаться на оркестрационный уровень, например, Задания Lakeflow или Apache Airflow, для управления логикой повторных запусков. Это гораздо безопаснее, чем создание сложных циклов повторных попыток в коде, где может произойти потеря данных.

Ниже приведены рекомендации, основанные на вашей цели записи:

Цель Примеры Руководство
Операции с DataFrame Таблицы Delta Lake Необходимо использовать опции записи txnAppId и txnVersion, привязывая txnVersion к batchId, чтобы гарантировать идемпотентность и защиту правильности данных при повторных попытках. Не ловите и повторно обрабатывайте исключения локально. Вместо этого Databricks рекомендует разрешить ошибки распространяться, чтобы метрики Spark оставались точными, данные не дублировались, а оркестратор мог корректно повторно обработать весь пакет.
Пользовательский код и внешние назначения .collect(), базы данных OLTP, очереди сообщений, API Реализуйте собственную идемпотентность. Необходимо предположить, что любая операция может быть повторена в разных пакетах. batchId Если значение остается неизменным, результат операции должен оставаться неизменным. Вы можете повторить исключительно временные ошибки, такие как краткие тайм-ауты подключения, но следует соблюдать осторожность, чтобы избежать частичных или повторяющихся операций записи, если попытка в конечном итоге окажется неудачной. Самый безопасный подход заключается в том, чтобы разрешить распространение ошибок и разрешить оркестратору повторить весь пакет.

Ниже приведены некоторые примеры типов исключений и рекомендаций по их обработке в foreachBatch:

Тип исключения Примеры Рекомендуемое действие
Временные ошибки приемника SQLTransientConnectionException, HTTP 429, тайм-ауты Перехват: повтор или отправка в очередь недоставленных писем
Нарушения повторяющихся или ключевых ограничений, когда приемник является идемпотентным SQLIntegrityConstraintViolationException Catch: логировать и подавить
Пользовательские переповторяемые ошибки Исключения сокета, ошибки базы данных, подлежащие повторной попытке Catch: увеличение значений метрик и разрешение управляемого продолжения
Ошибки логики или схемы NullPointerException, AttributeError, несоответствие схемы Распространение: допустите, чтобы Spark завершился ошибкой выполнения запроса
Ошибки конечного узла, не допускающие повторной попытки, или необнаруженные логические ошибки ValueError, PermissionError Распространение: допустите, чтобы Spark завершился ошибкой выполнения запроса
Критические сбои OutOfMemoryError, поврежденное состояние, нарушения целостности данных Распространение: допустите, чтобы Spark завершился ошибкой выполнения запроса

Примеры кода: обработка исключений

В следующих примерах преднамеренно вызывается ошибка в foreach, чтобы показать различные подходы к обработке ошибки.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Приведенный выше код обрабатывает и тихо подавляет ошибку и может не обрабатывать оставшуюся часть пакета. Существует два варианта обработки этой ситуации.

Во-первых, вы можете снова выбросить эту ошибку, передав её в ваш уровень оркестрации для повторной обработки пакета. Это может устранить ошибку, если это временная проблема, или доложить о ней вашей операционной команде, чтобы она попробовала вручную исправить. Для этого измените partial_func код следующим образом:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

Во-вторых, если вы хотите перехватывать исключение и игнорировать остальную часть пакета, можно изменить код, чтобы использовать do_nothing функцию для автоматического пропуска остальной части пакета.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()