Подключение к Apache Kafka

В этой статье описывается, как использовать Apache Kafka в качестве источника или приемника при выполнении рабочих нагрузок структурированной потоковой передачи в Azure Databricks.

Дополнительные сведения о Kafka см. в документации по Apache Kafka.

Чтение данных из Kafka

Azure Databricks предоставляет ключевое слово kafka в качестве формата данных для настройки подключений к Kafka. Ниже приведен пример потокового чтения:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks также поддерживает семантику пакетного чтения, как показано в следующем примере:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

Для добавочной пакетной загрузки Databricks рекомендует использовать Kafka с Trigger.AvailableNow. См.: AvailableNow инкрементную пакетную обработку.

В Databricks Runtime 13.3 LTS и более поздних версиях Azure Databricks также предоставляет функцию SQL для чтения данных Kafka. Потоковая передача с помощью SQL поддерживается только в декларативных конвейерах Spark Lakeflow или с таблицами потоковой передачи в Databricks SQL. См. табличную функцию read_kafka.

Настройка средства чтения структурированной потоковой передачи Kafka

Для источника Kafka необходимо задать следующий параметр для пакетных и потоковых запросов:

Опция Ценность Описание
kafka.bootstrap.servers Разделенный запятыми список host:port Серверы начальной загрузки кластера Kafka

Кроме того, для указания разделов, на которые нужно подписаться, необходимо указать один из следующих вариантов:

Опция Ценность Описание
subscribe Разделенный запятыми список разделов. Список разделов, на который нужно подписаться.
subscribePattern Java строке regex. Шаблон, используемый для подписки на разделы.
assign Строка в формате JSON {"topicA":[0,1],"topic":[2,4]}. Зависит от topicPartitions использования.

Полный список доступных параметров см. на странице "Параметры ".

Схема записей Kafka

Записи, возвращаемые средством чтения структурированной потоковой передачи Kafka, будут иметь следующую схему:

колонна Тип
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

key и value всегда десериализуются как массивы байтов с ByteArrayDeserializer. Используйте операции DataFrame (например cast("string") , или from_avro) для явной десериализации ключей и значений.

Запись данных в Kafka

Ниже приведен пример потоковой записи в Kafka:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks также поддерживает семантику пакетной записи в приемники данных Kafka, как показано в следующем примере:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Настройка модуля записи структурированной потоковой передачи Kafka

Это важно

Databricks Runtime 13.3 LTS и выше содержит более новую версию библиотеки kafka-clients, которая обеспечивает возможность идемпотентной записи по умолчанию. Если приемник Kafka использует версию 2.8.0 или ниже с настроенными ACL, но без включенного IDEMPOTENT_WRITE, запись завершается ошибкой org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Чтобы устранить эту ошибку, обновите до версии Kafka 2.8.0 или более поздней, или установите параметр .option(“kafka.enable.idempotence”, “false”) при настройке писателя структурированной потоковой передачи.

Ниже приведены распространенные параметры при записи в Kafka:

Опция Ценность Значение по умолчанию Описание
kafka.boostrap.servers Список <host:port>, разделенный запятыми нет [Обязательно] Конфигурация Kafka bootstrap.servers.
topic STRING не задано [Необязательно] Устанавливает тему для записи всех строк. Этот параметр переопределяет любой столбец темы, имеющийся в данных.
includeHeaders BOOLEAN false [Необязательно] Следует ли включать заголовки Kafka в строку.

Полный список доступных параметров см. на странице "Параметры ".

Схема модуля записи Kafka

При записи данных в Kafka указанный кадр данных может содержать следующие поля:

Имя столбца Обязательно или необязательно Тип
key optional STRING или BINARY
value required STRING или BINARY
headers optional ARRAY
topic необязательный (игнорируется, если topic задан в качестве опции записи) STRING
partition optional INT

Проверка подлинности

Azure Databricks поддерживает несколько методов проверки подлинности для Kafka, включая учетные данные службы каталога Unity, SASL/SSL и облачные варианты для AWS MSK, Azure Event Hubs и Google Cloud Managed Kafka. См. раздел Аутентификация.

Получение метрик Kafka

Вы можете отслеживать, насколько потоковый запрос отстает от Kafka, используя метрики avgOffsetsBehindLatest, maxOffsetsBehindLatest и minOffsetsBehindLatest. Эти отчеты представляют среднюю, максимальную и минимальную задержку относительно смещений по всем разделам подписанных тем относительно новейших смещений в Kafka. См. статью Чтение метрик в интерактивном режиме.

Замечание

В Databricks Runtime 17.1 и более поздних версиях последние смещения Kafka извлекаются после завершения каждого микропакета. В разделах, которые постоянно получают данные, метрики невыполненной работы могут отображать небольшие постоянные ненулевых значения. Это ожидаемое поведение и не указывает на то, что поток отстает.

В Databricks Runtime 17.0 и ниже последние смещения Kafka извлекаются в момент начала микропакета. Отложенные метрики могут возвращать 0, когда потоковые запросы последовательно потребляют все записи, доступные в начале выполнения микропакета.

Чтобы оценить объем данных, которые запрос еще не использовал, используйте метрику estimatedTotalBytesBehindLatest . Эта метрика оценивает общее количество байтов, оставшихся во всех подписанных секциях на основе пакетов, обработанных за последние 300 секунд. Вы можете изменить период времени, используемый для этой оценки, задав bytesEstimateWindowLength параметр. Например, чтобы задать для него значение 10 минут:

Python

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Если вы выполняете поток в записной книжке, эти метрики можно просмотреть на вкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}

Дополнительные сведения см. в Мониторинг структурированных запросов потоковой передачи Azure Databricks.

Пример кода: Kafka to Delta

В следующем примере показан полный рабочий процесс для непрерывной потоковой передачи данных из Kafka в таблицу Delta. Этот шаблон идеально подходит для рабочих нагрузок приема данных практически в режиме реального времени.

В этом примере используется фиксированная схема JSON. Для других форматов, таких как Avro или Protobuf, используйте from_avro или from_protobuf. Вы также можете интегрироваться с реестром схем. См . пример с реестром схем.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);