Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Область применения: Databricks SQL
Databricks Runtime 14.1 и выше
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Возвращает таблицу с записями, считываемыми из Pulsar.
Эта табличная функция поддерживает только потоковые запросы и не поддерживает пакетные запросы.
Синтаксис
read_pulsar ( { option_key => option_value } [, ...] )
Аргументы
Для этой функции требуется использование именованных параметров при вызове ключей опций.
Параметры serviceUrl
и topic
обязательны.
Ниже приведены краткие описания аргументов. См. документацию по структуированному потоковому вещанию Pulsar для получения более подробных описаний.
Вариант | Тип | По умолчанию. | Описание |
---|---|---|---|
serviceUrl | СТРОКА | Обязательно | URI службы Pulsar. |
тема | STRING | Обязательно | Тема, из которой нужно прочитать. |
предопределённая подписка | STRING | нет | Предопределенное имя подписки, используемое соединителем для отслеживания хода выполнения приложения Spark. |
subscriptionPrefix | STRING | нет | Префикс, используемый соединителем для создания случайной подписки для отслеживания хода выполнения приложения Spark. |
pollTimeoutMs | ДЛИННЫЙ | 120000 | Время ожидания для чтения сообщений из Pulsar в миллисекундах. |
ошибкаПриПотереДанных | Булевый | Истина | Определяет, следует ли завершать запрос при потере данных (например, разделы удаляются или сообщения удаляются из-за политики хранения). |
startingOffsets | STRING | новейший | Начальная точка при запуске запроса ( самая ранняя, последняя или строка JSON), указывающая определенное смещение. Если используется последняя версия, ридер считывает новейшие записи после его запуска. Если это самое раннее, читатель считывает с самого раннего оффсета. Пользователь также может указать строку JSON, указывающую определенное смещение. |
время начала | STRING | нет | При указании, источник Pulsar будет считывать сообщения, начиная с указанной позиции начального времени. |
Следующие аргументы используются для проверки подлинности клиента pulsar:
Вариант | Тип | По умолчанию. | Описание |
---|---|---|---|
pulsarClientAuthPluginClassName | строка | нет | Имя подключаемого модуля аутентификации. |
pulsarClientAuthParams | STRING | нет | Параметры плагина аутентификации. |
pulsarClientUseKeyStoreTls | СТРОКА | нет | Следует ли использовать KeyStore для проверки подлинности tls. |
pulsarClientTlsTrustStoreType | СТРОКА | нет | Тип файла TrustStore для аутентификации TLS. |
pulsarClientTlsTrustStorePath (путь к хранилищу доверия для Tls клиента Pulsar) | СТРОКА | нет | Путь к файлу TrustStore для аутентификации TLS. |
pulsarClientTlsTrustStorePassword | STRING | нет | Пароль TrustStore для проверки подлинности tls. |
Эти аргументы используются для настройки и проверки подлинности системы управления доступом пульсара. Конфигурация администратора pulsar требуется только в том случае, если включено управление доступом (когда задан параметр maxBytesPerTrigger).
Вариант | Тип | По умолчанию. | Описание |
---|---|---|---|
максБайтНаТриггер | BIGINT | нет | Мягкое ограничение на максимальное количество байт, которые мы хотим обработать на микробатч. Если это указано, необходимо также указать admin.url. |
adminUrl | STRING | нет | Конфигурация serviceHttpUrl для Pulsar. Требуется только при указании maxBytesPerTrigger. |
pulsarAdminAuthPlugin | STRING | нет | Имя плагина аутентификации. |
pulsarAdminAuthParams | STRING | нет | Параметры плагина аутентификации. |
pulsarClientUseKeyStoreTls | STRING | нет | Следует ли использовать KeyStore для проверки подлинности tls. |
параметр типа хранилища доверия для pulsarAdminTls | STRING | нет | Тип файла TrustStore для аутентификации TLS. |
pulsarAdminTlsTrustStorePath | STRING | нет | Путь к файлу TrustStore для проверки подлинности TLS. |
pulsarAdminTlsTrustStorePassword | STRING | нет | Пароль TrustStore для проверки подлинности tls. |
Возвраты
Таблица записей пульсара со следующей схемой.
__key STRING NOT NULL
: ключ сообщения Pulsar.value BINARY NOT NULL
: значение сообщения Pulsar.Примечание. Для разделов с схемой Avro или JSON вместо загрузки содержимого в поле двоичного значения содержимое будет развернуто, чтобы сохранить имена полей и типы полей раздела Pulsar.
__topic STRING NOT NULL
: имя раздела Pulsar.__messageId BINARY NOT NULL
: идентификатор сообщения Pulsar.__publishTime TIMESTAMP NOT NULL
: время публикации сообщения Pulsar.__eventTime TIMESTAMP NOT NULL
: время события сообщения Pulsar.__messageProperties MAP<STRING, STRING>
: параметры сообщения Pulsar.
Примеры
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.