Поделиться через


read_kinesis функция потоковой передачи табличного значения

Область применения:флажок Databricks SQL флажок Databricks Runtime 13.3 LTS и выше

Возвращает таблицу с записями, считываемыми из Kinesis из одного или нескольких потоков.

Синтаксис

read_kinesis ( { parameter => value } [, ...] )

Аргументы

read_kinesis требует вызова именованных параметров.

Единственным обязательным аргументом является streamName. Все остальные аргументы являются необязательными.

Ниже приведены краткие описания аргументов. Дополнительные сведения см. в документации Amazon Kinesis .

Существует несколько способов подключения и проверки подлинности с помощью AWS. Рекомендуемый подход — создать учетные данные службы Databricks и указать их, используя опцию serviceCredential. Кроме того, можно выполнить проверку подлинности с помощью awsAccessKey и awsSecretKey. Эти параметры можно указать в аргументах функции с помощью secret функции, вручную задать в аргументах или настроить в качестве переменных среды, как указано ниже. roleArn, roleExternalID, roleSessionName также можно использовать для аутентификации в AWS с помощью профилей экземпляров. Если ни одно из них не указано, оно будет использовать цепочку поставщиков AWS по умолчанию.

Параметр Тип Описание
streamName STRING Обязательный список, разделенный запятыми, из одного или нескольких потоков Kinesis.
serviceCredential STRING Имя учетных данных сервиса Databricks.
awsAccessKey STRING Ключ доступа AWS, если таковой есть. Можно также указать с помощью различных параметров, поддерживаемых цепочкой поставщиков учетных данных по умолчанию AWS, включая переменные среды (AWS_ACCESS_KEY_ID) и файл профилей учетных данных.
awsSecretKey STRING Секретный ключ, соответствующий ключу доступа. Можно указать в аргументах или с помощью различных параметров, поддерживаемых цепочкой поставщиков учетных данных AWS по умолчанию, включая переменные среды (AWS_SECRET_KEY или AWS_SECRET_ACCESS_KEY) и файл профилей учетных данных.
roleArn STRING Имя ресурса Amazon для предполагаемой роли при доступе к Kinesis.
roleExternalId STRING Используется при делегировании доступа к учетной записи AWS.
roleSessionName STRING Имя сеанса роли AWS.
stsEndpoint STRING Конечная точка для запроса учетных данных временного доступа.
region STRING Регион для потоков, которые нужно указать. По умолчанию используется локально разрешенный регион.
endpoint STRING региональная конечная точка для потоков данных Kinesis. По умолчанию используется локально разрешенный регион.
initialPosition STRING Начальная позиция для чтения из потока. Один из следующих: "последний" (по умолчанию), "trim_horizon", "самый ранний", "at_timestamp".
consumerMode STRING Одно из следующих: "polling" (опрос) (по умолчанию) или "EFO" (режим расширенной рассылки).
consumerName STRING Имя потребителя. Все потребители имеют префикс "databricks_". Значением по умолчанию является пустая строка.
registerConsumerTimeoutInterval STRING Максимальное время ожидания для регистрации потребителя Kinesis EFO в потоке Kinesis перед выдачей ошибки. Значение по умолчанию — 300s.
requireConsumerDeregistration BOOLEAN true для отмены регистрации потребителя EFO при завершении запроса. По умолчанию — false.
deregisterConsumerTimeoutInterval STRING Максимальное время ожидания для удаления регистрации потребителя Kinesis EFO из потока Kinesis до генерации ошибки. Значение по умолчанию — 300s.
consumerRefreshInterval STRING Интервал, с периодичностью которого проверяется и обновляется клиент. Значение по умолчанию — 300s.

Следующие аргументы используются для управления пропускной способностью чтения и задержкой для Kinesis:

Параметр Тип Описание
maxRecordsPerFetch INTEGER (>0) Необязательный параметр, по умолчанию для чтения устанавливается 10 000 записей на каждый запрос API в Kinesis.
maxFetchRate STRING Скорость предварительной выборки данных на сегмент. Значение между "1.0" и "2.0", которое измеряется в МБ/с. Значение по умолчанию — 1.0.
minFetchPeriod STRING Максимальное время ожидания между последовательными попытками предварительной выборки. Значение по умолчанию — 400 мс.
maxFetchDuration STRING Максимальная длительность буферизации предварительно подготовленных новых данных. Значение по умолчанию — 10s.
fetchBufferSize STRING Объем данных для следующего триггера. Значение по умолчанию — 20 гб.
shardsPerTask INTEGER (>0) Количество сегментов Kinesis для предварительной загрузки параллельно каждой задаче Spark. Значение по умолчанию равно 5.
shardFetchinterval STRING Как часто опрашивать для изменения сегментирования. Значение по умолчанию — "1s".
coalesceThresholdBlockSize INTEGER (>0) Пороговое значение, при котором происходит автоматическое объединение. Значение по умолчанию — 10 000 000.
coalesce BOOLEAN true для объединения предварительно подготовленных запросов. Значение по умолчанию — true.
coalesceBinSize INTEGER (>0) Приблизительный размер блока после объединения. Значение по умолчанию — 128 000 000.
reuseKinesisClient BOOLEAN true для повторного использования клиента Kinesis, хранящегося в кэше. Значение по умолчанию — true, за исключением кластера PE.
clientRetries INTEGER (>0) Количество повторов в сценарии повторных попыток. Значение по умолчанию равно 5.

Возвраты

Таблица записей Kinesis со следующей схемой:

Имя. Тип данных Допускает значение NULL Стандарт Описание
partitionKey STRING Нет Ключ, используемый для распределения данных между сегментами потока. Все записи данных с тем же ключом секции будут считываться из одной шарды.
data BINARY Нет Полезная нагрузка Kinesis, закодированная в формате Base-64.
stream STRING Нет Имя потока, из которого считывались данные.
shardId STRING Нет Уникальный идентификатор сегмента, из которого были считываются данные.
sequenceNumber BIGINT Нет Уникальный идентификатор записи в его сегменте.
approximateArrivalTimestamp TIMESTAMP Нет Приблизительное время вставки записи в поток.

Столбцы (stream, shardId, sequenceNumber) представляют собой первичный ключ.

Примеры

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret('test-databricks', 'awsAccessKey'),
        awsSecretKey => secret('test-databricks', 'awsSecretKey'),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => '[email protected]');