Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
На этой странице описывается, как настроить потоки автозагрузчика для использования режима уведомлений файлов для добавочного обнаружения и приема облачных данных.
В режиме уведомлений о файлах автозагрузчик автоматически настраивает службу уведомлений и службу очередей, которые подписываются на события файлов из входного каталога. Вы можете использовать уведомления файлов для масштабирования автозагрузчика для приема миллионов файлов в час. По сравнению с режимом перечисления каталогов режим уведомлений файлов является более производительным и масштабируемым.
Вы можете переключаться между уведомлениями о файлах и списком каталогов в любое время и по-прежнему поддерживать гарантии однократной обработки данных.
Примечание.
Режим уведомлений файлов не поддерживается для учетных записей Azure уровня 'Premium' в разделе хранения, поскольку учетные записи уровня 'Premium' не поддерживают очереди хранения.
Предупреждение
Изменение исходного пути для автозагрузчика не поддерживается для режима уведомлений файлов. Если используется режим уведомлений о файлах и путь изменен, возможно, не удается обработать файлы, которые уже присутствуют в новом каталоге во время изменения каталога.
Режим уведомления о файлах с включенными событиями файлов и без них во внешних расположениях
Существует два способа настройки автозагрузчика для использования режима уведомлений файлов:
Устаревший режим уведомлений файлов: вы управляете очередями уведомлений о файлах для каждого потока автозагрузчика отдельно. Автозагрузчик автоматически настраивает службу уведомлений и службу очередей, подписывающуюся на события файлов из входного каталога.
Это устаревший подход.
(Рекомендуется) События файлов (общедоступная предварительная версия): используется единственная очередь уведомлений о файлах, управляемая Azure Databricks, для всех потоков, обрабатывающих файлы из любого внешнего расположения, определенного в каталоге Unity.
Этот подход требует включения событий файлов для внешних расположений. Он имеет следующие преимущества по сравнению с устаревшим подходом:
- Azure Databricks может настроить подписки и события файлов в вашей учетной записи облачного хранилища без необходимости предоставления дополнительных учетных данных для Auto Loader, используя учетные данные службы или другие параметры облачной аутентификации. См . (рекомендуется) Включить события файлов для внешнего расположения.
- В вашей учетной записи облачного хранилища потребуется меньше политик управляемых удостоверений Azure.
- Так как вам больше не нужно создавать очередь для каждого потока автозагрузчика, проще избежать попадания ограничений на уведомления поставщика облачных служб, перечисленных в облачных ресурсах, используемых в устаревшем режиме уведомлений файлов автозагрузчика.
- Azure Databricks автоматически управляет настройкой требований к ресурсам, поэтому вам не нужно настраивать такие параметры, как
cloudFiles.fetchParallelism
. - Функциональность очистки означает, что вам не нужно беспокоиться о жизненном цикле уведомлений, созданных в облаке, например при удалении или полном обновлении потока.
Databricks рекомендует, чтобы при использовании автозагрузчика в режиме перечисления каталогов сегодня вы переходите в режим уведомлений о файлах с событиями файлов, чтобы увидеть значительные улучшения производительности.
Используйте режим уведомлений о событиях файлов
В этом разделе описывается создание и обновление потоков автозагрузчика для использования событий файлов.
Внимание
Поддержка автозагрузчика для событий файлов доступна в общедоступной предварительной версии. Чтобы зарегистрироваться в предварительной версии, обратитесь к группе учетных записей Azure Databricks.
Перед тем как начать
Для настройки событий файлов требуется:
- Рабочая область Azure Databricks, активированная для Unity Catalog.
- Разрешение на создание учетных данных доступа для хранилища и объектов внешнего расположения в каталоге Unity.
Для потоков автозагрузчика с событиями файлов требуется:
- Выполните вычисления на Databricks Runtime 14.3 LTS или более поздней.
Перед созданием или обновлением потока автозагрузчика, использующего события файлов:
- Проверьте существование потоков автозагрузки, основанных на уведомлениях, которые используют данные из внешнего источника. Если это сделать, отключите их и удалите связанные ресурсы уведомлений.
Инструкции по настройке конфигурации
Следующие инструкции применяются к созданию новых потоков автозагрузчика или переносу существующих потоков для использования обновленного режима уведомлений о файлах с событиями файлов:
Создайте учетные данные хранилища и внешнее расположение в Unity Catalog, предоставляющее доступ к исходному местоположению в облачном хранилище для потоков Auto Loader. См. статью "Создание внешнего расположения для подключения облачного хранилища к Azure Databricks".
Включите события файлов для внешнего источника. См . (рекомендуется) Включить события файлов для внешнего расположения.
При создании нового потока автозагрузчика или изменении существующего для работы с внешней локацией:
- Если у вас есть потоки автозагрузчика на основе уведомлений, которые используют данные из внешнего источника, выключите их и удалите соответствующие ресурсы уведомлений.
- Убедитесь, что
pathRewrites
не установлено (это не является распространённым параметром). - Просмотрите список параметров , которые автозагрузчик игнорирует при управлении уведомлениями о файлах с помощью событий файлов. Избегайте их в новых потоках автозагрузчика и удалите их из существующих потоков, которые вы переносите в этот режим.
- Установите параметр
cloudFiles.useManagedFileEvents
наtrue
в коде автозагрузчика.
Рассмотрим пример.
autoLoaderStream = (spark.readStream
.format("cloudFiles")
...
.options("cloudFiles.useManagedFileEvents", True)
...)
Если вы используете декларативные конвейеры Lakeflow и уже имеете конвейер с потоковой таблицей, обновите его, чтобы включить useManagedFileEvents
этот параметр:
CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
FROM STREAM read_files('abfss://path/to/external/location/or/volume',
format => '<format>',
useManagedFileEvents => 'True'
...
);
Неподдерживаемые параметры автозагрузчика
Следующие параметры автозагрузчика не поддерживаются при использовании потоков событий файлов:
Настройки | Изменение |
---|---|
useIncremental |
Вам больше не нужно выбирать между эффективностью уведомлений о файлах и простотой перечисления каталогов. Автозагрузчик с событиями файлов работает в одном режиме. |
useNotifications |
В каждом внешнем расположении существует только одна подписка на события очереди и хранилища. |
cloudFiles.fetchParallelism |
Автозагрузчик с событиями, связанными с файлами, не обеспечивает оптимизацию параллелизма вручную. |
cloudFiles.backfillInterval |
Azure Databricks автоматически обрабатывает дозаполнение для внешних расположений, которые активированы для обработки событий файлов. |
cloudFiles.pathRewrites |
Этот параметр применяется только при подключении внешних расположений данных к DBFS, который не рекомендуется. |
resourceTags |
Необходимо задать теги ресурсов с помощью облачной консоли. |
Ограничения автозагрузчика с событиями файлов
Служба событий файлов оптимизирует обнаружение файлов путем кэширования последних созданных файлов. Если автозагрузчик выполняется редко, этот кэш может истекать, а автозагрузчик возвращается в список каталогов, чтобы обнаружить файлы и обновить кэш. Чтобы избежать этого сценария, вызов автозагрузчика по крайней мере раз в семь дней.
Общий список ограничений для событий файлов см. в разделе "Ограничения событий файлов".
Управление очередями уведомлений файлов для каждого потока автозагрузчика отдельно (устаревшая версия)
Внимание
Вам нужны повышенные разрешения для автоматической настройки облачной инфраструктуры для режима уведомлений о файлах. Обратитесь к администратору облака или администратору рабочей области. Видеть:
Облачные ресурсы, используемые в устаревшем режиме уведомления файлов автозагрузчика
Автозагрузчик может автоматически настраивать уведомления о файлах при настройке параметра cloudFiles.useNotifications
true
и предоставления необходимых разрешений для создания облачных ресурсов. Кроме того, может потребоваться предоставить дополнительные параметры для того, чтобы автозагрузчик получил разрешение на создание этих ресурсов.
В следующей таблице перечислены ресурсы, созданные автозагрузчиком для каждого поставщика облачных служб.
Облачное хранилище | Служба подписки | Использование хранилища очередей из Python | Префикс* | Предел** |
---|---|---|---|---|
Amazon S3 | AWS SNS | AWS SQS | databricks-auto-ingest | 100 на каждый контейнер S3 |
ADLS | Сетку событий Azure | Хранилище очередей Azure | датабрикс | 500 на каждую учетную запись хранения |
Глобальная система связи (if "GCS" stands for "Global Communication System") | Публикация и подписка Google | Публикация и подписка Google | databricks-auto-ingest | 100 на каждый контейнер GCS |
Хранилище BLOB-объектов Azure | Сетку событий Azure | Хранилище очередей Azure | датабрикс | 500 на каждую учетную запись хранения |
* Автозагрузчик именует ресурсы с этим префиксом.
**Количество параллельных конвейеров уведомлений о файлах, которые можно запустить.
Если необходимо запустить больше потоков автозагрузки, основанных на уведомлениях о файлах, чем разрешено этими ограничениями, можно использовать события файлов или службу, например AWS Lambda, Функции Azure или Google Cloud Functions, чтобы распространять уведомления из одной очереди, которая прослушивает весь контейнер или хранилище, в очереди, ориентированные на конкретные каталоги.
События уведомления о устаревших файлах
Amazon S3 предоставляет событие ObjectCreated
при загрузке файла в бакет S3, независимо от того, была ли использована однократная или многократная загрузка.
Azure Data Lake Storage предоставляет различные уведомления о событиях для файлов, которые отображаются в контейнере хранилища.
- Автозагрузчик прослушивает на наличие события
FlushWithClose
для обработки файла. - Потоки автозагрузчика поддерживают
RenameFile
действие обнаружения файлов.RenameFile
действия требуют запроса API к системе хранения, чтобы получить размер переименованного файла. - Потоки Автозагрузчика, созданные с помощью Databricks Runtime 9.0 и более поздних версий, поддерживают действие
RenameDirectory
для обнаружения файлов.RenameDirectory
действия требуют запросов API к системе хранения для перечисления содержимого переименованного каталога.
Google Cloud Storage предоставляет событие OBJECT_FINALIZE
при передаче файла, включая перезапись и копирование файлов. Неудачные загрузки не вызывают это событие.
Примечание.
Поставщики облачных служб не гарантируют доставку 100% всех событий файлов в очень редких случаях и не предоставляют строгие SLA по задержке в обработке событий файла. В Databricks рекомендуется активировать регулярные обратные заполнения с помощью Автозагрузчика, используя параметр cloudFiles.backfillInterval
, чтобы гарантировать, что все файлы будут обнаружены в рамках данного соглашения об уровне обслуживания, если полнота данных является обязательным условием. Активация регулярных выполнения задним числом не приведет к появлению дубликатов.
Необходимые разрешения для настройки уведомлений о файлах для Azure Data Lake Storage и хранилища BLOB-объектов Azure
Необходимо иметь разрешения на чтение для входного каталога. См. раздел Хранилище BLOB-объектов Azure.
Чтобы использовать режим уведомлений файлов, необходимо предоставить учетные данные проверки подлинности для настройки и доступа к службам уведомлений о событиях.
Вы можете пройти проверку подлинности с помощью одного из следующих методов:
- В Databricks Runtime 16.1 и более поздних версиях: учетные данные службы Databricks (рекомендуется): создайте учетные данные службы с помощью управляемого удостоверения и соединителя доступа Databricks.
- Учетная запись службы: создайте приложение и учетную запись службы идентификатора Microsoft Entra (прежнее название — Azure Active Directory) с идентификатором клиента и секретным ключом.
После получения учетных данных аутентификации назначьте необходимые разрешения коннектору доступа Databricks (для учетных данных службы) или приложению Microsoft Entra ID (для учетной записи службы).
Использование встроенных ролей Azure
Назначьте соединителю доступа следующие роли в учетной записи хранения, где находится входной путь:
- Участник: эта роль предназначена для настройки ресурсов в учетной записи хранения, таких как очереди и подписки на события.
- Участник данных очереди хранилища: эта роль предназначена для выполнения операций с очередями, таких как извлечение и удаление сообщений из очередей. Эта роль требуется только при предоставлении субъекта-службы без строка подключения.
Назначьте этому соединителю доступа следующую роль в связанной группе ресурсов:
- Участник подписки на Azure Event Grid. Эта роль предназначена для выполнения операций подписки Azure Event Grid, таких как создание или перечисление подписок на события.
Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.
Использование настраиваемой роли
Если вы обеспокоены избыточными разрешениями, необходимыми для предыдущих ролей, можно создать пользовательскую роль по крайней мере со следующими разрешениями, перечисленными ниже в формате JSON роли Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Затем вы можете назначить эту настраиваемую роль соединителю доступа.
Дополнительные сведения см. в разделе Назначение ролей Azure с помощью портала Azure.
Необходимые разрешения для настройки уведомлений о файлах для Amazon S3
Необходимо иметь разрешения на чтение для входного каталога. Дополнительные сведения см. в сведениях о подключении S3.
Чтобы использовать режим уведомления о файлах, подключите следующий JSON-документ политики к пользователю или роли IAM. Эта роль IAM необходима для создания учетных данных службы, которые "Auto Loader" будет использовать для аутентификации.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
где:
-
<bucket-name>
: имя контейнера S3, из которого поток будет считывать файлы, напримерauto-logs
. Можно использовать*
в качестве подстановочного знака, напримерdatabricks-*-logs
. Чтобы узнать базовый контейнер S3 для пути к DBFS, можно перечислить все точки подключения DBFS в записной книжке, выполнив%fs mounts
. -
<region>
: регион AWS, в котором находится контейнер S3, например,us-west-2
. Если вы не хотите указать регион, используйте*
. -
<account-number>
: номер учетной записи AWS, которой принадлежит контейнер S3, например123456789012
. Если не хотите указывать номер учетной записи, используйте*
.
Строка databricks-auto-ingest-*
в спецификации SQS и SNS ARN — это префикс имени, который источник cloudFiles
использует при создании служб SQS и SNS. Так как Azure Databricks настраивает службы уведомлений при первоначальном запуске потока, после первоначального запуска (например, поток был остановлен, а затем перезапущен) вы можете использовать политику с ограниченными разрешениями.
Примечание.
Описанная выше политика связана только с разрешениями, необходимыми для настройки служб уведомлений о файлах, а именно служб уведомлений контейнера S3, SNS и SQS, и предполагается, что у вас уже есть доступ на чтение к контейнеру S3. Если вам нужно добавить разрешения только для чтения S3, добавьте следующее в список Action
в инструкции DatabricksAutoLoaderSetup
в документе JSON:
s3:ListBucket
s3:GetObject
Ограниченные разрешения после начальной настройки
Описанные выше разрешения на настройку ресурсов необходимы только во время начального запуска потока. После первого запуска можно переключиться на следующую политику IAM с ограниченными разрешениями.
Внимание
С ограниченными разрешениями вы не сможете запускать новые запросы потоковой передачи или повторно создавать ресурсы в случае сбоев (например, очередь SQS была случайно удалена); Вы также не можете использовать API управления облачными ресурсами для перечисления или отмены ресурсов.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": ["s3:GetBucketLocation", "s3:ListBucket"],
"Resource": ["arn:aws:s3:::<bucket-name>"]
},
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
"Resource": ["arn:aws:s3:::<bucket-name>/*"]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Необходимые разрешения для настройки уведомлений о файлах для GCS
Необходимо иметь разрешения list
и get
для контейнера GCS и всех объектов. Дополнительные сведения см. в документации Google по разрешениям IAM.
Чтобы использовать режим уведомлений о файлах, необходимо добавить разрешения для учетной записи службы GCS и учетной записи службы, используемой для доступа к ресурсам Google Cloud Pub/Sub.
Добавьте роль Pub/Sub Publisher
в учетную запись службы GCS. Это позволит учетной записи публиковать уведомительные сообщения о событиях из контейнеров GCS в службе публикации и подписки Google Cloud.
Что касается учетной записи службы, используемой для ресурсов Google Cloud Pub/Sub, необходимо добавить следующие разрешения. Эта учетная запись службы автоматически создается при создании учетных данных службы для Databricks. Поддержка учетных данных службы доступна в Databricks Runtime 16.1 и выше.
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
Для этого можно создать настраиваемую роль IAM с этими разрешениями или назначить уже существующие роли GCP, чтобы обеспечить эти разрешения.
Поиск учетной записи службы GCS
В консоли Google Cloud для соответствующего проекта перейдите к Cloud Storage > Settings
.
Раздел "Учетная запись службы облачного хранения" содержит адрес электронной почты учетной записи службы GCS.
Создание настраиваемой роли IAM Google Cloud для режима уведомлений о файлах
В консоли Google Cloud для соответствующего проекта перейдите к IAM & Admin > Roles
. Затем создайте роль в верхней части или обновите существующую роль. На экране для создания или редактирования ролей щелкните Add Permissions
. Появится меню, с помощью которого вы можете добавить требуемые разрешения для роли.
Настройка ресурсов уведомлений файлов или управление ими вручную
Привилегированные пользователи могут вручную настраивать ресурсы уведомлений о файлах или управлять ими.
- Настройте службы уведомлений файлов вручную через поставщика облачных служб и вручную укажите идентификатор очереди. Дополнительные сведения см. в разделе Параметры уведомлений о файлах.
- Используйте API Scala для создания уведомлений и служб очереди или управления ими, как показано в следующем примере:
Примечание.
Необходимо иметь соответствующие разрешения для настройки или изменения облачной инфраструктуры. См. документацию по разрешениям для Azure, S3 или GCS.
Питон
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.create()
# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("cloudFiles.awsAccessKey", <aws-access-key>) \
.option("cloudFiles.awsSecretKey", <aws-secret-key>) \
.option("cloudFiles.roleArn", <role-arn>) \
.option("cloudFiles.roleExternalId", <role-external-id>) \
.option("cloudFiles.roleSessionName", <role-session-name>) \
.option("cloudFiles.stsEndpoint", <sts-endpoint>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("cloudFiles.projectId", <project-id>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("cloudFiles.projectId", <project-id>) \
.option("cloudFiles.client", <client-id>) \
.option("cloudFiles.clientEmail", <client-email>) \
.option("cloudFiles.privateKey", <private-key>) \
.option("cloudFiles.privateKeyId", <private-key-id>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
язык программирования Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
/**
* Using a Databricks service credential
*/
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("databricks.serviceCredential", <service-credential-name>)
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
/**
* Using AWS access key and secret key
*/
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>)
.option("cloudFiles.awsAccessKey", <aws-access-key>)
.option("cloudFiles.awsSecretKey", <aws-secret-key>)
.option("cloudFiles.roleArn", <role-arn>)
.option("cloudFiles.roleExternalId", <role-external-id>)
.option("cloudFiles.roleSessionName", <role-session-name>)
.option("cloudFiles.stsEndpoint", <sts-endpoint>)
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
/**
* Using a Databricks service credential
*/
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("databricks.serviceCredential", <service-credential-name>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
/**
* Using an Azure service principal
*/
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
/**
* Using a Databricks service credential
*/
val manager = CloudFilesGCPResourceManager
.newManager
.option("cloudFiles.projectId", <project-id>)
.option("databricks.serviceCredential", <service-credential-name>)
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
/**
* Using a Google service account
*/
val manager = CloudFilesGCPResourceManager
.newManager
.option("cloudFiles.projectId", <project-id>)
.option("cloudFiles.client", <client-id>)
.option("cloudFiles.clientEmail", <client-email>)
.option("cloudFiles.privateKey", <private-key>)
.option("cloudFiles.privateKeyId", <private-key-id>)
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Используйте setUpNotificationServices(<resource-suffix>)
для создания очереди и подписки с именем <prefix>-<resource-suffix>
(префикс зависит от системы хранения, описанной в облачных ресурсах, используемых в устаревшем режиме уведомлений файла автозагрузчика). Если ресурс с таким же именем уже существует, Azure Databricks будет повторно использовать уже существующий ресурс вместо того, чтобы создавать новый. Эта функция возвращает идентификатор очереди, который можно передать источнику cloudFiles
, используя идентификатор в параметрах уведомления файла. Это позволяет исходному пользователю cloudFiles
иметь меньше разрешений, чем пользователю, который создает ресурсы.
Укажите для параметра "path"
значение newManager
только при вызове setUpNotificationServices
; это не требуется для listNotificationServices
или tearDownNotificationServices
. Это тот же путь path
, который вы используете при выполнении запроса потоковой передачи.
В следующей матрице указывается, какие методы API поддерживаются в среде выполнения Databricks для каждого типа хранилища:
Облачное хранилище | API установки | API списка | Уничтожение API |
---|---|---|---|
Amazon S3 | Все версии | Все версии | Все версии |
ADLS | Все версии | Все версии | Все версии |
Глобальная система связи (if "GCS" stands for "Global Communication System") | Databricks Runtime 9.1 и выше | Databricks Runtime 9.1 и выше | Databricks Runtime 9.1 и выше |
Хранилище BLOB-объектов Azure | Все версии | Все версии | Все версии |
Очистка ресурсов уведомлений о событиях, созданных автозагрузчиком
Автоматический загрузчик не удаляет ресурсы уведомлений о файлах. Чтобы отключить ресурсы уведомлений о файлах, необходимо использовать диспетчер облачных ресурсов, как показано в предыдущем разделе. Эти ресурсы также можно удалить вручную с помощью пользовательского интерфейса или API поставщика облачных служб.
Устранение распространенных ошибок
В этом разделе описываются распространенные ошибки при использовании автозагрузчика с режимом уведомлений о файлах и их устранении.
Не удалось создать подписку сетки событий
Если при первом запуске автозагрузчика отображается следующее сообщение об ошибке, сетка событий не зарегистрирована в качестве поставщика ресурсов в подписке Azure.
java.lang.RuntimeException: Failed to create event grid subscription.
Чтобы зарегистрировать Сетку событий в качестве поставщика ресурсов, сделайте следующее:
- На портале Azure перейдите к подписке.
- Кликните "Поставщики ресурсов" в разделе "Параметры".
- Зарегистрируйте поставщик
Microsoft.EventGrid
.
Авторизация, необходимая для выполнения операций подписки сетки событий
Если при первом запуске автозагрузчика отображается следующее сообщение об ошибке, убедитесь, что роль участника назначена субъекту-службе для сетки событий и учетной записи хранения.
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Клиент Event Grid обходит прокси-сервер
В Databricks Runtime 15.2 и более поздних версиях подключения Event Grid в Auto Loader по умолчанию используют параметры прокси-сервера из системных свойств. В Databricks Runtime 13.3 LTS, 14.3 LTS и с 15.0 по 15.2 можно вручную настроить подключения Event Grid для использования прокси-сервера, задав свойство Spark Configspark.databricks.cloudFiles.eventGridClient.useSystemProperties true
. См. статью Настройка свойств конфигурации Spark в Azure Databricks.