Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Триггеры прибытия файла можно использовать для активации выполнения задания при поступлении новых файлов во внешнем хранилище, например Amazon S3, хранилище Azure или Google Cloud Storage. Эта функция полезна, если эффективность запланированного задания скомпрометирована нерегулярными новыми поступлениями данных.
Как работают триггеры прибытия файлов?
Триггеры прибытия файлов делают все возможное, чтобы проверить наличие новых файлов каждую минуту, хотя это может повлиять на производительность базового облачного хранилища. Триггеры прибытия файлов не влекут за собой дополнительных затрат, кроме затрат поставщика облачных услуг, связанных с перечислением файлов в расположении хранилища.
Триггер прибытия файла можно настроить для отслеживания корневого каталога внешнего местоположения или тома в Unity Catalog или подпути внешнего местоположения или тома. Например, для тома /Volumes/mycatalog/myschema/myvolume/каталога Unity ниже приведены допустимые пути для триггера прибытия файла:
/Volumes/mycatalog/myschema/myvolume/
/Volumes/mycatalog/myschema/myvolume/mydirectory/
Триггер прибытия файла рекурсивно проверяет наличие новых файлов во всех подкаталогах настроенного расположения. Например, вы создаёте триггер прибытия файла для расположения /Volumes/mycatalog/myschema/myvolume/mydirectory/, и там имеются следующие подкаталоги:
/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirA
/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirB
/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirC/subdirD
Триггер проверяет наличие новых файлов в mydirectory, subdirA, subdirB, subdirCи subdirC/subdirD.
Триггеры прибытия файла с событиями файлов
Для оптимальной производительности внешнее расположение должно быть активировано для событий файлов. Если события, связанные с файлами, включены для внешнего расположения, Azure Databricks использует внутреннюю службу для отслеживания метаданных приема путем обработки уведомлений об изменениях от поставщиков облачных служб. Эта служба сохраняет метаданные для последних файлов, созданных или обновленных в течение периода хранения, определяемого службой, что повышает эффективность обработки файлов.
В течение нескольких минут после включения событий файлов во внешнем расположении существующие триггеры прибытия файлов, отслеживающие пути, охватываемые этим внешним расположением, начинают использовать активированные события файлов, а новые триггеры начинают работать в течение секунд.
Дополнительные сведения о преимуществах производительности и емкости событий файлов в внешних расположениях см. в разделе "Ограничения".
Перед тем как начать
Для использования триггеров прибытия файла необходимо следующее:
Рабочая область должна иметь включённый в каталоге Unity.
Необходимо использовать расположение хранилища, которое является томом или внешним местоположением, настроенным в каталоге Unity Catalog. Ознакомьтесь с разделом "Что такое тома каталога Unity" и "Общие сведения о внешних расположениях".
Databricks рекомендует включить внешнее расположение для событий управляемого файла. Тома в этих внешних расположениях по умолчанию получают поддержку событий файла. Чтобы включить файловые события, необходимо быть владельцем внешнего расположения или иметь
MANAGEпривилегию на внешнее расположение. Сведения о преимуществах событий файлов см. в разделе "Триггеры прибытия файла" с событиями файлов.Необходимо иметь
READразрешение на расположение хранилища и разрешения CAN MANAGE для задания. Дополнительные сведения о разрешениях задания см. в разделе ACL заданий.
Добавление триггера прибытия файла
Чтобы добавить триггер прибытия файла в задание:
- На боковой панели рабочей области Azure Databricks щелкните Задания и Конвейеры.
- При необходимости выберите фильтры "Задания " и " Принадлежащие мне ".
- Щелкните по ссылке с названием вашей работы.
- На панели сведений о задании справа нажмите кнопку "Добавить триггер".
- В Тип триггеравыберите прибытие файла.
- На размещении хранилища введите URL-адрес корня или подпути внешнего расположения каталога Unity или корня или подпути тома каталога Unity, чтобы выполнять мониторинг.
- (Необязательно) Настройка дополнительных параметров:
- Минимальное время между триггерами в секундах: минимальное время ожидания запуска после завершения предыдущего выполнения. Файлы, поступающие в этот период, активируют запуск только после истечения срока ожидания. Используйте этот параметр для управления частотой создания запуска.
- Подождите после последнего изменения в секундах: время ожидания запуска после прибытия файла. Другое прибытие файла в течение этого периода сбрасывает таймер. Этот параметр можно использовать при поступлении файлов в пакеты, а весь пакет должен обрабатываться после поступления всех файлов.
- Чтобы проверить конфигурацию, нажмите кнопку "Проверить подключение".
- Нажмите кнопку Сохранить.
Обнаружение и обработка файлов при поступлении
Для обработки файлов, которые активировали триггеры прибытия файла, можно использовать автозагрузчик. Автозагрузчик постепенно и эффективно обрабатывает новые файлы с гарантиями единственного выполнения. Например, используйте приведенный ниже фрагмент кода для загрузки файлов в таблицу Delta.
Чтобы использовать это решение, создайте задание с триггером прибытия файла и добавьте записную книжку, содержащую приведенный ниже код. Замените каждый [REPLACE] заполнитель соответствующим значением.
# Configuration
file_location = "[REPLACE]" # The same URL configured for the file arrival trigger.
checkpoint_location = "[REPLACE]" # a separate URL (outside `file_location`) used to store the Auto Loader checkpoint, which enables exactly-once processing.
sink_table = "[REPLACE]" # Delta table to write to
# Use Auto Loader to discover new files.
# Do not modify code below this line
streamingQuery = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", checkpoint_location) \
.option("cloudFiles.useManagedFileEvents","true") \
.load(file_location) \
.writeStream \
.option("checkpointLocation", checkpoint_location) \
.trigger(availableNow = True) \
.toTable(sink_table)
Если вам нужно обработать новые файлы с помощью пользовательской логики и найти URL-адрес для новых файлов, можно использовать foreachBatch вместо этого, как показано в фрагменте кода ниже. Обратите внимание, что foreachBatch предоставляет гарантии обработки только по крайней мере один раз. Дополнительные сведения об использовании foreachBatchсм. в разделе "Использование foreachBatch для записи в произвольные приемники данных"
# Configuration
file_location = "[REPLACE]" # The same URL configured for the file arrival trigger.
checkpoint_location = "[REPLACE]" # a separate URL (outside `file_location`) used to store the Auto Loader checkpoint, which enables exactly-once processing.
def process_batch(batch_df, batch_id):
file_url = batch_df.select("path").collect()[0].path
# [REPLACE] Your custom function for processing newly arrived files
# Use Auto Loader to discover new files.
# Do not modify code below this line
streamingQuery = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("cloudFiles.useManagedFileEvents","true") \
.load(file_location) \
.drop("content") \
.writeStream \
.foreachBatch(process_batch) \
.option("checkpointLocation", checkpoint_location) \
.trigger(availableNow = True) \
.start()
Оповещения о сбоях триггеров на прибытие файлов
Чтобы получать оповещения, если триггеру "При получении файла" не удается выполнить оценку, настройте уведомления по электронной почте или системные уведомления о сбое задания. См. , добавьте уведомления для задания.
Ограничения
- Запуски инициируются только новыми файлами. Перезапись существующего файла с тем же именем не запускает выполнение.
- Если расположение хранилища включено для событий файлов, добавление содержимого в существующий пустой файл активирует выполнение, так как это рассматривается как новое прибытие файла.
- События файлов прослушивают
FlushWithCloseсобытие для обработки файла. Некоторые операции Azure API могут не генерировать это событие, что может отложить обнаружение файлов. Сведения об этом сценарии см. в разделе "Классические события уведомлений о файлах".
Путь, используемый для триггера прибытия файла, не должен содержать внешние таблицы или управляемые расположения каталогов и схем.
Путь, используемый для триггера прибытия файла, не может содержать подстановочные знаки, например
*или?.Если расположение хранилища настроено как внешнее расположение в каталоге Unity, а внешнее расположение включено для событий файлов:
Нет ограничений на количество файлов в расположении хранилища.
Триггеры могут приводить к ошибке тайм-аута, если есть слишком много лишних обновлений файлов.
Если триггер прибытия файла установлен в подпате внешнего расположения или тома каталога Unity, изменения за пределами этого подпата, например в корневом каталоге внешнего расположения, могут увеличить метаданные, необходимые для обработки триггера. В средах с высоким уровнем изменений триггер может превысить ограничение времени обработки и ввести состояние ошибки.
Чтобы предотвратить это, создайте том каталога Unity, который сопоставляется специально с подкаталогом, который необходимо отслеживать и задать триггер прибытия файла в корневом каталоге этого тома. Этот подход изолирует целевой путь, делая его основным корневым каталогом триггера, что сокращает несвязанные изменения на уровне корня и предотвращает переход триггера в состояние ошибки.
Если существующий файл изменен и его метаданные выходят за пределы скользящего периода хранения, это изменение будет рассматриваться как новое поступление файла, запуская выполнение задания. Это можно предотвратить путем приема только неизменяемых файлов или использовать триггеры прибытия файлов с автозагрузчиком для отслеживания хода приема.
Если место хранения не включено для обработки файловых событий:
- Можно настроить не более 50 заданий с триггером прибытия файла в таких расположениях в рабочей области Azure Databricks.
- Расположение хранилища может содержать до 10 000 файлов. Если настроенное расположение хранилища является подпутем к внешнему расположению или томом в каталоге Unity, ограничение в 10 000 файлов применяется к данному подпутю, а не к корневому каталогу хранилища. Например, корневой каталог расположения хранилища может содержать более 10 000 файлов в его подкаталогах, но настроенный подкаталог не должен превышать 10 000 файлов.
См. также ограничения событий файлов.
Триггеры прибытия файлов в несуществующие пути в внешних хранилищах S3 и GCS
Если настроенный каталог не существует или удаляется из Amazon S3 или Google Cloud Storage, триггеры прибытия файлов продолжают оцениваться без ошибок. Это происходит, поскольку как S3, так и GCS не различаются между несуществующими, удаленными и пустыми каталогами.
В результате триггер прибытия файла, отслеживающий несуществующий или удаленный путь к каталогу, не завершается ошибкой или не создает уведомление об ошибке. Триггер продолжает оценивать, не находит файлы и не запускает никаких запусков заданий, пока файлы не будут добавлены в этот путь снова. Это ожидаемое поведение, а не условие ошибки.