Options

На этой странице описаны параметры конфигурации для чтения и записи в Apache Kafka с помощью структурированной потоковой передачи в Azure Databricks.

Соединитель Azure Databricks Kafka построен на основе соединителя Apache Spark Kafka и поддерживает все стандартные параметры конфигурации Kafka. Любой параметр, префиксный с kafka. помощью, передается непосредственно в базовый клиент Kafka. Например, .option("kafka.max.poll.records", "500") задает свойство потребителя max.poll.records Kafka. Полный список доступных свойств Kafka см. в документации по конфигурации Kafka .

Дополнительные параметры источника и приемника структурированной потоковой передачи, не перечисленные на этой странице, см. в руководстве по интеграции структурированной потоковой передачи и Kafka.

Обязательные параметры

Для чтения и записи требуется следующий параметр:

Опция Ценность Описание
kafka.bootstrap.servers Разделенный запятыми список host:port Конфигурация Kafka bootstrap.servers . Если данные из Kafka отсутствуют, сначала проверьте список адресов брокера. Если список адресов брокера неверный, ошибки могут не возникнуть. Это обусловлено тем, что клиент Kafka предполагает, что брокеры в конечном счете станут доступны, и в случае ошибок сети будет повторять попытки бесконечно.

При чтении из Kafka необходимо также указать один из следующих вариантов, чтобы определить, какие разделы следует использовать:

Опция Ценность Описание
subscribe Разделенный запятыми список разделов Список разделов, на который нужно подписаться.
subscribePattern Строка регулярных выражений Java Шаблон, используемый для подписки на разделы.
assign Строка JSON {"topicA":[0,1],"topicB":[2,4]} Секции конкретных разделов для использования.

При записи в Kafka можно при необходимости задать topic параметр, чтобы указать раздел назначения для всех строк. Если параметр не задан, кадр данных должен содержать topic столбец.

Распространенные параметры чтения

При чтении из Kafka часто используются следующие параметры:

Опция Ценность По умолчанию Описание
minPartitions INT нет Минимальное количество секций для чтения из Kafka. Как правило, Spark создает одну секцию на раздел раздела Kafka. Установка этого более высокого уровня разделяет большие секции Kafka на небольшие секции Spark для повышения параллелизма. Полезно для обработки отклонений данных или пиковых нагрузок. Примечание. Включение этого повторно инициализации потребителей Kafka на каждом триггере, что может повлиять на производительность при использовании SSL.
maxRecordsPerPartition LONG нет Максимальное количество записей на секцию Spark. При установке Spark разбивает секции Kafka, поэтому каждая секция Spark имеет не больше всего этих записей. Можно использовать с minPartitions; если оба заданы, Spark использует все результаты в дополнительных разделах.
failOnDataLoss BOOLEAN true Происходит ли сбой запроса, если возможно, что данные были потеряны. Запросы могут постоянно завершаться сбоем считывания данных из Kafka во многих сценариях, таких как удаление разделов, усечение разделов перед обработкой и т. д. Мы пытаемся с применением консервативного подхода определить, могут ли данные быть потеряны. Иногда это может привести к ложным сигналам. Установите этот параметр на false, если он не работает должным образом или если вы хотите, чтобы запрос продолжался, несмотря на потерю данных.
maxOffsetsPerTrigger LONG нет [Только потоковая передача] Ограничение скорости на максимальное количество смещения, обрабатываемых на интервал триггера. Общее количество смещения пропорционально разделено по секциям раздела.
Для более расширенного управления потоком можно также использовать minOffsetsPerTrigger (минимальные смещения перед активацией) и maxTriggerDelay (максимальное время ожидания, по умолчанию 15m). Дополнительные сведения см. в руководстве по интеграции Spark Kafka .
startingOffsets earliest, latestили строка JSON latest Определяет, где начать чтение при запуске запроса. Используется earliest для чтения из самых ранних доступных смещения, latest для чтения только новых данных после запуска потока или строки JSON, чтобы указать начальное смещение для каждой секции раздела (например, {"topicA":{"0":23,"1":-2},"topicB":{"0":-2}}). В формате JSON -2 ссылается на самые ранние и -1 последние.
Для потоковых запросов это применяется только при запуске нового запроса; возобновление всегда выбирается, откуда запрос остался. Недавно обнаруженные секции начинаются с earliest.
Примечание. Для пакетных latest запросов (неявно или с помощью -1 в JSON) запрещено. Чтобы начать с определенной метки времени, используйте startingTimestamp или startingOffsetsByTimestamp.
endingOffsets latest или строка JSON latest [Только пакетная служба] Конечная точка при завершении пакетного запроса. Используется latest для чтения до последних смещения или строки JSON, чтобы указать конечное смещение для каждой секции раздела (например, {"topicA":{"0":50,"1":-1},"topicB":{"0":-1}}). В формате JSON -1 ссылается на последнюю версию; -2 (earlisest) не допускается. Для завершения определенной метки времени используйте endingTimestamp или endingOffsetsByTimestamp.
groupIdPrefix STRING spark-kafka-source (потоковая передача) или spark-kafka-relation (пакет) Префикс для автоматически созданного идентификатора группы потребителей. Соединитель автоматически создает уникальный group.id для каждого запроса. Этот параметр настраивает префикс созданного идентификатора. Игнорируется, если kafka.group.id задано значение.
kafka.group.id STRING нет Идентификатор группы для использования при чтении из Kafka. Следует использовать с осторожностью. По умолчанию каждый запрос формирует уникальный идентификатор группы для чтения данных. Это гарантирует, что каждый запрос будет иметь собственную группу потребителей, которая не сталкивается с помехами со стороны других потребителей и, следовательно, может считывать все разделы подписанных тем. В некоторых сценариях (например, при авторизации на основе группы Kafka) возможно, потребуется использовать определенные идентификаторы авторизованных групп для чтения данных. При необходимости можно задать идентификатор группы. Но это нужно сделать с особой осторожностью, так как это может привести к непредвиденному поведению.
  • Одновременно выполняющиеся запросы (как пакет, так и потоковая передача) с одинаковым идентификатором группы, скорее всего, влияют друг на друга, что приводит к тому, что каждый запрос считывает только часть данных.
  • Это также может произойти при запуске или перезапуске запросов в быстром последовательном порядке. Чтобы свести к минимуму такие проблемы, настройте конфигурацию потребителя Kafka (например, session.timeout.ms) на очень маленькое значение.
includeHeaders BOOLEAN false Следует ли включать заголовки сообщений Kafka в выходные данные.
bytesEstimateWindowLength STRING 300s [Только потоковая передача] Период времени, используемый для оценки оставшихся байтов с помощью estimatedTotalBytesBehindLatest метрики. Принимает строки длительности, такие как 10m (10 минут) или 600s (600 секунд). См. сведения о метриках Kafka.

Общие параметры записи

При написании в Kafka часто используются следующие параметры:

Опция Ценность По умолчанию Описание
topic STRING нет Задает раздел для всех строк. Это переопределяет любой topic столбец в данных.
includeHeaders BOOLEAN false Следует ли включать заголовки Kafka в строку.

Это важно

Databricks Runtime 13.3 LTS и выше содержит более новую версию библиотеки kafka-clients, которая обеспечивает возможность идемпотентной записи по умолчанию. Если приемник Kafka использует версию 2.8.0 или ниже с настроенными списками управления доступом, но без IDEMPOTENT_WRITE включения записи завершится ошибкой. Для этого выполните обновление до Kafka 2.8.0 или более поздней версии или задав параметр .option("kafka.enable.idempotence", "false").

Варианты проверки подлинности

Azure Databricks поддерживает несколько методов проверки подлинности для Kafka, включая учетные данные службы каталога Unity, SASL/SSL и облачные варианты для AWS MSK, Центров событий Azure и Google Cloud Managed Kafka.

Azure Databricks рекомендует использовать учетные данные службы каталога Unity для проверки подлинности в облачных службах Kafka:

Опция Ценность Описание
databricks.serviceCredential STRING Имя учетных данных службы каталога Unity для проверки подлинности в облачных службах Kafka (AWS MSK, Центрах событий Azure или Google Cloud Managed Kafka). Доступно в Databricks Runtime 16.1 и более поздних версиях.
databricks.serviceCredential.scope STRING Область OAuth для учетных данных службы. Задайте это только в том случае, если Azure Databricks не может автоматически выводить область для службы Kafka.

При использовании учетных данных службы каталога Unity не требуется указывать такие параметры SASL/SSL, как kafka.sasl.mechanism, kafka.sasl.jaas.configили kafka.security.protocol.

Распространенные параметры SASL/SSL включают:

Опция Ценность Описание
kafka.security.protocol STRING Протокол, используемый для взаимодействия с брокерами (например, SASL_SSL, SSL, PLAINTEXT).
kafka.sasl.mechanism STRING Механизм SASL (например, , PLAINSCRAM-SHA-256, SCRAM-SHA-512, , OAUTHBEARER). AWS_MSK_IAM
kafka.sasl.jaas.config STRING Строка конфигурации входа JAAS.
kafka.sasl.login.callback.handler.class STRING Полное имя класса обработчика обратного вызова для проверки подлинности SASL.
kafka.sasl.client.callback.handler.class STRING Полное имя класса обработчика обратного вызова клиента для проверки подлинности SASL.
kafka.ssl.truststore.location STRING Расположение файла хранилища доверия SSL.
kafka.ssl.truststore.password STRING Пароль для файла хранилища доверия SSL.
kafka.ssl.keystore.location STRING Расположение файла хранилища ключей SSL.
kafka.ssl.keystore.password STRING Пароль для файла хранилища ключей SSL.

Полные инструкции по настройке проверки подлинности см. в разделе "Проверка подлинности".

Дополнительные ресурсы