Поделиться через


read_kafka табличное значение функции

Область применения:флажок Databricks SQL флажок Databricks Runtime 13.3 LTS и выше

Считывает данные из кластера Apache Kafka и возвращает данные в табличной форме.

Может считывать данные из одной или нескольких тем Kafka. Он поддерживает как пакетные запросы, так и прием потоковой передачи.

Синтаксис

read_kafka([option_key => option_value ] [, ...])

Аргументы

Для этой функции требуется вызов именованного параметра.

  • option_key: имя параметра для настройки. Необходимо использовать обратные кавычки () for options that contain dots (.`).
  • option_value: константное выражение для задания параметра. Принимает литералы и скалярные функции.

Возвраты

Записи считываются из кластера Apache Kafka со следующей схемой:

  • key BINARY: ключ для записи Kafka.
  • value BINARY NOT NULL: значение записи Kafka.
  • topic STRING NOT NULL: имя топика Kafka, из которого считывается запись.
  • partition INT NOT NULL: идентификатор раздела Kafka, из которого считывается запись.
  • offset BIGINT NOT NULL: номер смещения записи в TopicPartitionKafka.
  • timestamp TIMESTAMP NOT NULL: значение метки времени для записи. Столбец timestampType определяет, что соответствует этой метке времени.
  • timestampType INTEGER NOT NULL: тип метки времени, указанной в столбце timestamp.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: значения заголовков, предоставленные как часть записи (если включено).

Примеры

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Параметры

Подробный список параметров можно найти в документации по Apache Spark .

Обязательные параметры

Укажите приведенный ниже вариант для подключения к кластеру Kafka.

Вариант
bootstrapServers
Тип: String
Список пар узлов и портов, разделенных запятыми, указывающих на кластер Kafka.
Значение по умолчанию: нет

Укажите только один из приведенных ниже вариантов, чтобы настроить, какие разделы Kafka будут извлекать данные из.

Вариант
assign
Тип: String
Строка JSON, содержащая определенные секции раздела для использования. Например, для '{"topicA":[0,1],"topicB":[2,4]}' будут использоваться разделы 0-й и 1-й секции topicA.
Значение по умолчанию: нет
subscribe
Тип: String
Разделенный запятыми список разделов Kafka для чтения.
Значение по умолчанию: нет
subscribePattern
Тип: String
Регулярное выражение, соответствующее темам для подписки.
Значение по умолчанию: нет

Прочие параметры

read_kafka можно использовать в пакетных запросах и в потоковых запросах. Приведенные ниже параметры указывают тип запроса, к которому они применяются.

Вариант
endingOffsets
Тип: String Тип запроса: только пакет
Смещения для чтения до определённой точки в пакетном запросе: либо "latest" для указания последних записей, либо строка JSON, задающая конечное смещение для каждого TopicPartition. В формате JSON можно использовать -1 в качестве смещения для ссылки на последнее. -2 (самый ранний) в качестве смещения не допускается.
Значение по умолчанию: "latest"
endingOffsetsByTimestamp
Тип: String Тип запроса: только пакет
Строка JSON, указывающая конечную временную метку для чтения для каждого TopicPartition. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC, например,
1686444353000. Дополнительные сведения о поведении с метками времени смотрите в заметке ниже.
endingOffsetsByTimestamp имеет больший приоритет, чем endingOffsets.
Значение по умолчанию: нет
endingTimestamp
Тип: String Тип запроса: только пакет
Строковое значение метки времени в миллисекундах с тех пор
1970-01-01 00:00:00 UTCнапример "1686444353000". Если Kafka не возвращает соответствующее смещение, то для смещения будет задано значение latest. Дополнительные сведения о поведении с метками времени смотрите в заметке ниже. Примечание. endingTimestamp Имеет приоритет над endingOffsetsByTimestamp и
endingOffsets.
Значение по умолчанию: нет
includeHeaders
Тип запроса: Boolean потоковый и пакетный
Следует ли включать заголовки Kafka в строку.
Значение по умолчанию: false
kafka.<consumer_option>
Тип запроса: String потоковый и пакетный
Любые параметры для конкретного потребителя Kafka можно передать с kafka. префиксом. Эти параметры должны быть окружены обратными знаками при указании, в противном случае вы получите ошибку синтаксического анализа. Параметры можно найти в документации Kafka.
Примечание. Для этой функции не следует задавать следующие параметры:
key.deserializer value.deserializer bootstrap.servers group.id
Значение по умолчанию: нет
maxOffsetsPerTrigger
Тип: Long Запрос типа: только потоковый режим
Ограничение скорости на максимальное количество смещений или строк, обработанных за каждый интервал триггера. Указанное общее количество смещений будет пропорционально разделено по TopicPartitions.
Значение по умолчанию: нет
startingOffsets
Тип запроса: String потоковый и пакетный
Начальная точка при запуске запроса может быть либо "earliest", что начинается с самых ранних смещений, либо "latest", что используется только для последних смещений, либо строка JSON, указывающая начальное смещение для каждого TopicPartition. В формате JSON смещение -2 можно использовать для указания на самый ранний момент, а -1 — на самый последний.
Примечание. Для пакетных запросов последнее значение (неявно или с помощью -1 в JSON) не допускается. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные потоковые запросы будут начинаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные разделы во время запроса будут начинаться с самого раннего.
Значение по умолчанию: "latest" для потоковой передачи, "earliest" для пакетной обработки
startingOffsetsByTimestamp
Тип запроса: String потоковый и пакетный
Строка JSON, указывающая начальный таймстамп для каждого TopicPartition. Метки времени должны быть предоставлены в виде целого числа, представляющего метку времени в миллисекундах с момента 1970-01-01 00:00:00 UTC, например, 1686444353000. Дополнительные сведения о поведении с метками времени смотрите в заметке ниже. Если Kafka не возвращает соответствующее смещение, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp имеет больший приоритет, чем startingOffsets.
Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные потоковые запросы будут начинаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные разделы во время запроса будут начинаться с самого раннего.
Значение по умолчанию: нет
startingOffsetsByTimestampStrategy
Тип запроса: String потоковый и пакетный
Эта стратегия используется, если начальное смещение, определённое по временной метке (глобальной или привязанной к разделу), не соответствует смещению, которое возвращает Kafka. Доступные стратегии:
  • "error": сбой запроса
  • "latest": назначает последнее смещение для этих секций, чтобы Spark могли считывать новые записи из этих разделов в последующих микропакетах.

Значение по умолчанию: "error"
startingTimestamp
Тип запроса: String потоковый и пакетный
Строковое значение метки времени в миллисекундах с тех пор
1970-01-01 00:00:00 UTCнапример "1686444353000". Дополнительные сведения о поведении с метками времени смотрите в заметке ниже. Если Kafka не возвращает соответствующее смещение, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy.
startingTimestamp имеет приоритет над startingOffsetsByTimestamp и startingOffsets.
Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные потоковые запросы будут начинаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные разделы во время запроса начнутся первыми.
Значение по умолчанию: нет

Примечание.

Возвращаемое смещение для каждой секции является самым ранним смещением, метка времени которого больше или равна заданной метке времени в соответствующей секции. Поведение может варьироваться в зависимости от опций, если Kafka не возвращает совпадающее смещение — проверьте описание каждой опции.

Spark просто передает информацию о временной метке KafkaConsumer.offsetsForTimes и не интерпретирует или осмысливает значение. Дополнительные сведения KafkaConsumer.offsetsForTimesсм. в документации. Кроме того, значение метки времени здесь может отличаться в зависимости от конфигурации Kafka (log.message.timestamp.type). Дополнительные сведения см. в документации по Apache Kafka.