Настройка Автозагрузчика для производственных рабочих нагрузок

Databricks рекомендует использовать Auto Loader в Декларативных конвейерах Spark Lakeflow для постепенной загрузки данных. Декларативные конвейеры Spark Lakeflow расширяют функциональные возможности структурированной потоковой обработки данных Apache Spark и позволяют создавать только несколько строк декларативной Python или SQL для развертывания производственного конвейера данных с помощью:

Databricks также рекомендует следовать лучшим практикам потоковой обработки данных для запуска Auto Loader в рабочей среде. См. Рекомендации по эксплуатации для структурированной потоковой передачи.

Мониторинг автозагрузчика

В следующих разделах описывается мониторинг автозагрузчика в рабочей среде, включая метрики, журналы, оповещения и распространенные рабочие процессы устранения неполадок.

Запрашивание файлов, обнаруженных Автозагрузчиком

Автозагрузчик предоставляет API SQL для проверки состояния потока. С помощью функции cloud_files_state можно найти метаданные о файлах, обнаруженных потоком Автозагрузчика. Запрос cloud_files_state, предоставляющий расположение контрольной точки, связанное с потоком автозагрузчика.

Note

Функция cloud_files_state доступна в Databricks Runtime 11.3 LTS и выше.

SELECT * FROM cloud_files_state('path/to/checkpoint');

Прослушивание обновлений потоков

Для дальнейшего мониторинга потоков автозагрузчика Databricks рекомендует использовать интерфейс прослушивателя потокового запроса Apache Spark. См. Мониторинг запросов структурированной потоковой передачи на Azure Databricks.

Автозагрузчик передает метрики в Streaming Query Listener для каждого пакета. Вы можете просмотреть количество файлов в невыполненной работе и объем невыполненной работы на numFilesOutstandingnumBytesOutstandingвкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:

{
  "sources": [
    {
      "description": "CloudFilesSource[/path/to/source]",
      "metrics": {
        "numFilesOutstanding": "238",
        "numBytesOutstanding": "163939124006"
      }
    }
  ]
}

При использовании режима уведомлений файлов в Databricks Runtime 10.4 LTS и более поздних версиях метрики также включают приблизительное количество событий файлов в облачной очереди как approximateQueueSize для AWS и Azure.

Рекомендации по затратам

При запуске автозагрузчика основные источники затрат — вычислительные ресурсы и обнаружение файлов.

Чтобы сократить затраты на вычислительные ресурсы, Databricks рекомендует использовать задания Lakeflow для планирования Auto Loader в качестве пакетных заданий с использованием Trigger.AvailableNow вместо его непрерывного выполнения, если у вас нет требований к низкой задержке. См. раздел "Настройка интервалов триггера структурированной потоковой передачи". Эти пакетные задания можно активировать с помощью триггеров прибытия файла , чтобы снизить задержку между поступлением и обработкой файла.

Затраты на обнаружение файлов могут возникать в виде операций LIST с учетными записями хранения в режиме перечисления каталогов, а также в виде запросов API на службе подписки и службы очередей в режиме уведомлений о файлах. Если для рабочей нагрузки требуются непрерывные триггеры, Databricks рекомендует выбрать режим обнаружения файлов в зависимости от ваших требований к задержке:

  • Низкая задержка и простота. Используйте автозагрузчик с событиями файлов. Для событий файлов требуется только одна очередь на контейнер и при последующих запусках используется добавочное обнаружение. Дополнительные сведения см. в разделе " Автозагрузчик" с общими сведениями о событиях файлов.
  • Очень чувствительные к задержке приложения: используйте классический режим уведомлений о файлах. Классический режим считывает данные напрямую из облачной очереди без дополнительного этапа кэширования, который добавляют события файлов. В этом режиме можно пометить ресурсы, созданные автозагрузчиком для отслеживания затрат с помощью тегов ресурсов. Дополнительные сведения см. в разделе "Уведомление о файле".

Хранение исходных данных

Note

Доступно в Databricks Runtime 16.4 LTS и более поздних версиях.

Поскольку файлы накапливаются в исходном каталоге, затраты на хранение увеличиваются, а обнаружение файлов замедляется, особенно в режиме перечисления каталогов. Автозагрузчик предоставляет cloudFiles.cleanSource возможность автоматического управления хранением файлов путем архивации или удаления файлов после их обработки.

Архивация файлов в исходном каталоге для снижения затрат

Warning

  • Настройка cloudFiles.cleanSource удаляет или перемещает файлы в исходном каталоге.
  • Если вы используете foreachBatch для обработки данных, файлы становятся кандидатами на перемещение или удаление, как только операция foreachBatch завершается успешно, даже если она обрабатывает только подмножество файлов в пакете.

Databricks рекомендует использовать автозагрузчик с событиями файлов для снижения затрат на обнаружение. Это также снижает затраты на вычисления, так как обнаружение является добавочным.

Если вы не можете использовать события файлов и должны использовать список каталогов для обнаружения файлов, можно использовать cloudFiles.cleanSource параметр для автоматического архивации или удаления файлов после обработки автозагрузчика их для снижения затрат на обнаружение. Так как автозагрузчик очищает файлы из исходного каталога после обработки, во время обнаружения нужно указать меньше файлов.

При использовании cloudFiles.cleanSource с параметром MOVE учитывайте следующие требования:

  • Исходный каталог и целевой каталог назначения перемещения должны находиться в одном бакете или контейнере. Перемещения между корзинами и между контейнерами не поддерживаются и приводят к ошибке.
  • Назначение перемещения может быть маршрутом тома (например, /Volumes/my_catalog/my_schema/my_volume/archive/).
  • Если исходный и целевой каталог находятся в одном и том же внешнем расположении, они не должны иметь родственных каталогов, содержащих управляемое хранилище (например, управляемый том или архив). В таких случаях автозагрузчик не может получить необходимые разрешения для записи в целевой каталог.

Databricks рекомендует использовать этот параметр, если:

  • Исходный каталог накапливает большое количество файлов с течением времени.
  • Необходимо сохранить обработанные файлы для соответствия или аудита (установите cloudFiles.cleanSource на MOVE).
  • Вы хотите сократить затраты на хранение, удаляя файлы после загрузки (установите cloudFiles.cleanSource в DELETE). При использовании режима DELETE Databricks рекомендует включить управление версиями в хранилище, чтобы Auto Loader выполнял мягкое удаление, что позволяет восстановить данные в случае неправильной настройки. Кроме того, Databricks рекомендует настроить политики жизненного цикла облачных данных для очистки старых мягко удалённых версий после указанного льготного периода (например, 60 или 90 дней) в зависимости от ваших требований к восстановлению.

Полный справочник по параметрам и их значениям по умолчанию см. в cleanSourcecloudFiles.cleanSource.

Перемещение обработанных файлов в путь к холодному хранилищу

Следующий пример настраивает автозагрузчик для перемещения обработанных файлов в архивный каталог в пределах одного контейнера через 14 дней. Политику жизненного цикла облака можно применить к архивной схеме для перехода файлов на более дешевые уровни хранилища (например, AWS S3 Glacier, Azure Cool/Archive или GCS Coldline/Archive).

Python

# Step 1: Configure Auto Loader to move processed files to an archive path.
checkpoint = "/Volumes/my_catalog/my_schema/my_volume/checkpoints/ingest_stream"
archive_path = "s3://my-bucket/archive/landing/"

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.cleanSource", "MOVE")
  .option("cloudFiles.cleanSource.moveDestination", archive_path)
  .option("cloudFiles.cleanSource.retentionDuration", "14 days")
  .option("cloudFiles.schemaLocation", checkpoint)
  .load("s3://my-bucket/landing/")
)

# Step 2: Write to a Delta table.
(df.writeStream
  .option("checkpointLocation", checkpoint)
  .trigger(availableNow=True)
  .toTable("my_catalog.my_schema.raw_events")
)

# Step 3 (outside Databricks): Set up a cloud lifecycle policy on the
# archive path to transition files to cold storage after a grace period.
# For example, in AWS you can configure an S3 Lifecycle rule to move
# objects under s3://my-bucket/archive/landing/ to S3 Glacier after
# 30 days.

SQL

-- Step 1: Configure Auto Loader to move processed files to an archive path
-- using a Lakeflow Declarative Pipeline.
CREATE OR REFRESH STREAMING TABLE raw_events
AS SELECT * FROM STREAM read_files(
  's3://my-bucket/landing/',
  format => 'json',
  cleanSource => 'MOVE',
  cleanSourceMoveDestination => 's3://my-bucket/archive/landing/',
  cleanSourceRetentionDuration => '14 days'
);

-- Step 2 (outside Databricks): Set up a cloud lifecycle policy on the
-- archive path to transition files to cold storage.
-- For example, in AWS configure an S3 Lifecycle rule to move objects
-- under s3://my-bucket/archive/landing/ to S3 Glacier after 30 days.

Использование Trigger.AvailableNow и ограничения частоты

Note

Доступно в Databricks Runtime 10.4 LTS и более поздних версиях.

Автозагрузчик можно запланировать запуск в заданиях Lakeflow в качестве пакетного задания с помощью Trigger.AvailableNow. Триггер AvailableNow указывает автозагрузчику обрабатывать все файлы, поступающие до начала запроса. Новые файлы, поступающие после запуска потока, игнорируются до следующего триггера.

При этом Trigger.AvailableNow обнаружение файлов выполняется асинхронно с обработкой данных, и обработка данных может происходить в нескольких микропакетах с ограничением скорости обработки. По умолчанию автозагрузчик обрабатывает не более 1000 файлов в каждом микропакете. Можно настроить cloudFiles.maxFilesPerTrigger и cloudFiles.maxBytesPerTrigger, чтобы указать, сколько файлов или байтов должно обрабатываться в микропакете. Ограничение по файлам является жестким ограничением, а ограничение по байтам — мягким, что означает, что можно обработать больше байтов, чем указано maxBytesPerTrigger. Если оба параметра предоставляются вместе, автозагрузчик обрабатывает столько файлов, которые необходимы для выполнения одного из ограничений.

Расположение контрольной точки

Местоположение контрольной точки используется для сохранения информации о состоянии и прогрессе потока. Databricks рекомендует задать расположение контрольной точки в расположении без политики жизненного цикла облачных объектов. Если файлы, находящиеся в расположении контрольной точки, очищаются в соответствии с политикой, состояние потока может быть повреждено. В этом случае необходимо перезапустить поток с нуля.

Отслеживание событий файла

Автозагрузчик отслеживает обнаруженные файлы в контрольной точке с помощью RocksDB, чтобы гарантировать однократное потребление данных. Для потоков приема большого объема или с длительным временем существования Databricks рекомендует выполнить обновление до Databricks Runtime 15.4 LTS или более поздней. В этих версиях автозагрузчик не ожидает загрузки всего состояния RocksDB до запуска потока, что может ускорить время запуска потока. Если вы хотите предотвратить рост состояний файлов без ограничений, можно также рассмотреть возможность cloudFiles.maxFileAge истечения срока действия событий файла, которые старше определенного возраста. Минимальное значение, которое можно задать для cloudFiles.maxFileAge"14 days". Удаление в RocksDB отображается в виде записей tombstone. Таким образом, вы можете временно увеличить использование хранилища по мере истечения срока действия событий, прежде чем он начнет выравниваться.

Warning

cloudFiles.maxFileAge предоставляется в качестве механизма управления затратами для больших наборов данных. Настройка cloudFiles.maxFileAge слишком агрессивно может вызвать проблемы с качеством данных, такие как дублирование загрузки или отсутствие файлов. Таким образом, Databricks рекомендует установить консервативное значение для cloudFiles.maxFileAge, например, 90 дней, что аналогично рекомендациям сопоставимых решений для приема данных.

Попытка настроить параметр cloudFiles.maxFileAge может привести к тому, что необработанные файлы будут игнорироваться Автозагрузчиком, или истечет срок действия уже обработанных файлов, а затем они будут повторно обработаны, что приведет к дублированию данных. Ниже приведены некоторые моменты, которые следует учитывать при выборе cloudFiles.maxFileAge.

  • Если поток перезапускается с длительным интервалом, извлеченные из очереди события уведомлений о файлах, которые старше cloudFiles.maxFileAge, будут игнорироваться. Аналогичным образом, если вы используете список каталогов, файлы, которые могли появиться во время простоя, которые старше, чем cloudFiles.maxFileAge игнорируются.
  • Если вы используете режим перечисления каталогов и используете cloudFiles.maxFileAge, например, установите значение "1 month", вы остановите поток и перезапустите поток с cloudFiles.maxFileAge заданным значением "2 months", файлы старше 1 месяца, но более последних 2 месяцев повторно обрабатываются.

Если этот параметр установлен при первом запуске потока, вы не будете принимать данные старше, чем cloudFiles.maxFileAge, поэтому если вы хотите принимать старые данные, этот параметр не следует задавать при первом запуске потока. Однако этот параметр следует задать при последующих запусках.

инициирование регулярных дозаполнений с помощью cloudFiles.backfillInterval

В редких случаях файлы могут быть пропущены или доставлены с опозданием при зависимости исключительно от систем уведомлений, например, когда достигаются ограничения на хранение сообщений уведомлений. Если у вас есть строгие требования к полноте данных и SLA, рассмотрите настройку cloudFiles.backfillInterval для запуска асинхронных наполнений с заданным интервалом. Например, установите его на один день для ежедневного заполнения или одну неделю для еженедельного заполнения. Запуск регулярных обратных заполнений не приведет к появлению дубликатов.

При использовании событий файлов запустите поток по крайней мере один раз в 7 дней

При использовании событий файлов запустите потоки автозагрузчика по крайней мере раз в 7 дней, чтобы избежать полного списка каталогов. При запуске потоков автозагрузчика с такой частотой будет обеспечено, что обнаружение файлов осуществляется добавочно.

Для получения рекомендаций по передовым практикам управления файлами, см. Рекомендации по использованию автозагрузчика с событиями файлов.