Поделиться через


read_pubsub функция потоковой передачи табличного значения

Область применения:флажок Databricks SQL флажок Databricks Runtime 13.3 LTS и выше

Возвращает таблицу с записями, считываемыми из pub/Sub из раздела. Поддерживает только потоковые запросы.

Синтаксис

read_pubsub( { parameter => value } [, ...])

Аргументы

read_pubsub требует именованного вызова параметров.

Единственными обязательными аргументами являются subscriptionId, projectIdи topicId. Все остальные аргументы являются необязательными.

Для получения полных описаний аргументов см. раздел "Настройка параметров Pub/Sub потоковой передачи".

Databricks рекомендует использовать секреты при предоставлении параметров авторизации. См. secret функцию.

Дополнительные сведения о настройке доступа к Pub/Sub см. в разделе "Настройка доступа к Pub/Sub".

Параметр Тип Описание
subscriptionId STRING Обязательный уникальный идентификатор, назначенный подписке Pub/Sub.
projectId STRING Обязательный идентификатор проекта Google Cloud, связанный с разделом Pub/Sub.
topicId STRING Обязательный идентификатор или имя раздела Pub/Sub для подписки.
clientEmail STRING Адрес электронной почты, связанный с учетной записью службы для проверки подлинности.
clientId STRING Идентификатор клиента, связанный с учетной записью службы для проверки подлинности.
privateKeyId STRING Идентификатор закрытого ключа, связанного с учетной записью службы.
privateKey STRING Закрытый ключ, связанный с учетной записью службы для проверки подлинности.

Эти аргументы используются для дальнейшей точной настройки при чтении из Pub/Sub:

Параметр Тип Описание
numFetchPartitions STRING Необязательный параметр с числом исполнителей по умолчанию. Количество параллельных задач Spark, которые извлекают записи из подписки.
deleteSubscriptionOnStreamStop BOOLEAN Необязательный параметр по умолчанию false. Если установлено значение true, подписка, переданная в поток, удаляется при завершении стриминговой задачи.
maxBytesPerTrigger STRING Плавающее ограничение размера пакета для обработки во время каждой запускаемой микро-выборки. Значение по умолчанию — none.
maxRecordsPerFetch STRING Количество записей, которые нужно получить для каждой задачи до их обработки. Значение по умолчанию — 1000.
maxFetchPeriod STRING Длительность для получения данных для каждой задачи перед обработкой записей. Значение по умолчанию — 10s.

Возвраты

Таблица записей Pub/Sub со следующей схемой. Столбец атрибутов может иметь значение NULL, но все остальные столбцы не имеют значения NULL.

Имя. Тип данных Допускает значение NULL Стандарт Описание
messageId STRING нет Уникальный идентификатор сообщения Pub/Sub.
payload BINARY нет Содержимое сообщения Pub/Sub.
attributes STRING Да Пары "ключ-значение", представляющие атрибуты сообщения Pub/Sub. Это строка, закодированная в формате JSON.
publishTimestampInMillis BIGINT нет Метка времени публикации сообщения в миллисекундах.
sequenceNumber BIGINT нет Уникальный идентификатор записи в своем сегменте.

Примеры

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                clientEmail => secret('app-events', 'clientEmail'),
                clientId => secret('app-events', 'clientId'),
        privateKeyId => secret('app-events', 'privateKeyId'),
                privateKey => secret('app-events', 'privateKey')
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic'
);

Теперь необходимо запросить данные из testing.streaming_table для дальнейшего анализа.

Ошибочные запросы:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project'
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                maxRecordsPerFetchLimit => '1000001'
);