Поделиться через


Использование потоковых таблиц в Databricks SQL

Databricks рекомендует использовать потоковые таблицы для приема данных с помощью Databricks SQL. Потоковая таблица — это таблица , зарегистрированная в каталоге Unity с дополнительной поддержкой потоковой или добавочной обработки данных. Конвейер DLT автоматически создается для каждой потоковой таблицы. Таблицы потоковой передачи можно использовать для добавочной загрузки данных из Kafka и облачного хранилища объектов.

Замечание

Чтобы узнать, как использовать таблицы Delta Lake в качестве источников и приемников потоков, см. раздел Чтение и запись потоков в таблицы Delta.

Требования

Чтобы использовать потоковые таблицы, необходимо соответствовать следующим требованиям.

Требования к рабочей области:

Потоковые таблицы, созданные в Databricks SQL, поддерживаются бессерверным конвейером DLT. Рабочая область должна поддерживать бессерверные конвейеры для использования этой функции.

Требования к вычислениям:

Необходимо использовать один из следующих вариантов:

  • Хранилище SQL, использующее Current канал.
  • Вычисление со стандартным режимом доступа (ранее общим режимом доступа) в Databricks Runtime 13.3 LTS или более поздней версии.
  • Выполнение вычислений в выделенном режиме доступа (ранее режиме доступа с одним пользователем) в Databricks Runtime 15.4 LTS или новее.

    В Databricks Runtime 15.3 и ниже нельзя использовать выделенные вычислительные ресурсы для запроса таблиц потоковой передачи, принадлежащих другим пользователям. Вы можете использовать выделенные вычисления в среде выполнения Databricks версии 15.3 и ниже, только если вы владеете потоковой таблицей. Создатель таблицы - владелец.

    Databricks Runtime 15.4 LTS и более поздних версий поддерживают запросы к таблицам, созданным DLT для выделенных вычислений, даже если вы не являетесь владельцем таблицы. Вы можете взимать плату за бессерверные вычислительные ресурсы при использовании выделенных вычислений для выполнения операций фильтрации данных. См. детализированное управление доступом для выделенных ресурсов (ранее однопользовательских вычислительных ресурсов).

Требования к разрешениям:

  • USE CATALOG и USE SCHEMA привилегии в каталоге и схеме, в которой создается потоковая таблица.
  • Привилегия CREATE TABLE на схеме, в которой вы создаете потоковую таблицу.
  • Привилегии для доступа к таблицам или расположениям, предоставляющим исходные данные для потоковой таблицы.

Создание потоковых таблиц

Потоковая таблица определяется SQL-запросом в Databricks SQL. При создании потоковой таблицы данные из исходных таблиц используются для её формирования. После этого вы обновляете таблицу, как правило, по расписанию, чтобы извлечь все добавленные данные в исходных таблицах, чтобы добавить в потоковую таблицу.

Когда вы создаёте потоковую таблицу, вы считаетесь её владельцем.

Чтобы создать потоковую таблицу из существующей таблицы, используйте инструкциюCREATE STREAMING TABLE, как показано в следующем примере:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT product, price FROM STREAM raw_data;

В этом случае потоковая таблица sales создается из определенных raw_data столбцов таблицы с расписанием обновления каждый час. Используемый запрос должен быть потоковым запросом. Используйте ключевое слово STREAM для применения семантики потоковой передачи при чтении из источника.

При создании потоковой таблицы с помощью CREATE OR REFRESH STREAMING TABLE инструкции обновление и начальное заполнение данных начинается немедленно. Эти операции не используют вычислительные ресурсы хранилища DBSQL. Вместо этого потоковая таблица использует бессерверную DLT для создания и обновления. Выделенный бессерверный конвейер DLT автоматически создается и управляется системой для каждой потоковой таблицы.

Загрузка файлов с помощью автозагрузчика

Чтобы создать потоковую таблицу из файлов в томе, используйте автозагрузчик. Используйте автозагрузчик с DLT для большинства задач приема данных из облачного хранилища объектов. Автозагрузчик и DLT предназначены для инкрементальной и идемпотентной загрузки постоянно увеличивающихся данных по мере поступления в облачное хранилище.

Чтобы использовать автозагрузчик в Databricks SQL, используйте функцию read_files . В следующих примерах показано использование автозагрузчика для чтения тома JSON-файлов в потоковую таблицу:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT * FROM STREAM read_files(
    "/Volumes/my_catalog/my_schema/my_volume/path/to/data",
    format => "json"
  );

Для чтения данных из облачного хранилища можно также использовать автозагрузчик:

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    'abfss://[email protected]/analysis/*/*/*.json',
    format => "json"
  );

Дополнительные сведения о автозагрузчике см. в разделе "Что такое автозагрузчик?". Дополнительные сведения об использовании автозагрузчика в SQL см. в статье "Загрузка данных из хранилища объектов".

Прием потоковой передачи из других источников

Пример приема из других источников, включая Kafka, см. в разделе Load data with DLT.

Загружайте только новые данные

По умолчанию функция read_files считывает все существующие данные в исходном каталоге во время создания таблицы, а затем обрабатывает вновь поступающие записи с каждым обновлением.

Чтобы избежать приема данных, которые уже существуют в исходном каталоге во время создания таблицы, задайте параметр includeExistingFiles для false. Это означает, что только данные, поступающие в каталог после создания таблицы, обрабатываются. Рассмотрим пример.

CREATE OR REFRESH STREAMING TABLE sales
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM read_files(
    '/path/to/files',
    includeExistingFiles => false
  );

Настройка канала среды выполнения

Потоковые таблицы, созданные с помощью хранилищ SQL, автоматически обновляются с помощью конвейера DLT. Конвейеры DLT используют среду выполнения в канале current по умолчанию. Ознакомьтесь с заметками о выпуске DLT и процессом обновления выпуска, чтобы узнать больше о процессе выпуска.

Databricks рекомендует использовать канал current для производственных нагрузок. Новые функции сначала выпускаются для канала preview. Конвейер можно задать для канала DLT предварительной версии, чтобы протестировать новые функции, указав preview в качестве свойства таблицы. Это свойство можно указать при создании таблицы или после создания таблицы с помощью инструкции ALTER.

В следующем примере кода показано, как настроить канал для предварительного просмотра в инструкции CREATE:

CREATE OR REFRESH STREAMING TABLE sales
  TBLPROPERTIES ('pipelines.channel' = 'preview')
  SCHEDULE EVERY 1 hour
  AS SELECT *
  FROM STREAM raw_data;

Скрытие конфиденциальных данных

Это важно

Эта функция доступна в общедоступной предварительной версии.

Таблицы стриминга можно использовать для скрытия конфиденциальных данных от пользователей, обращающихся к таблице. Одним из способов является определение запроса, чтобы он полностью исключил конфиденциальные столбцы или строки. Кроме того, можно применить маски столбцов или фильтры строк на основе разрешений пользователя запроса. Например, можно скрыть tax_id столбец для пользователей, которые не находятся в группе HumanResourcesDept. Для этого используйте синтаксис ROW FILTER и MASK во время создания потоковой таблицы. Дополнительные сведения см. в разделе "Фильтрация конфиденциальных данных таблицы" с помощью фильтров строк и маски столбцов.

Обновление потоковой таблицы

Обновления можно запланировать автоматически при создании таблицы потоковой передачи. Кроме того, можно вручную обновить таблицы потоковой передачи. Даже если у вас есть запланированное обновление, вы можете в любое время вызвать обновление вручную. Обновления обрабатываются тем же конвейером, который был автоматически создан вместе с потоковой таблицей.

Чтобы обновить таблицу потоковой передачи, выполните следующее:

REFRESH STREAMING TABLE sales;

Вы можете проверить состояние последнего обновления с помощью DESCRIBE TABLE EXTENDED.

Замечание

Только владелец таблицы может обновить потоковую таблицу, чтобы получить последние данные. Пользователь, создающий таблицу, является владельцем, и владелец не может быть изменен. Вам может потребоваться обновить таблицу потоковой передачи перед использованием запросов перемещения по времени.

Как работает обновление

Обновление потоковой таблицы оценивает только новые строки, поступающие с момента последнего обновления, и добавляет только новые данные.

Каждое обновление использует текущее определение таблицы потоковой передачи для обработки этих новых данных. Изменение описания таблицы потоковой передачи не приводит к автоматическому пересчету существующих данных. Если изменение несовместимо с существующими данными (например, изменение типа данных), следующее обновление завершится ошибкой.

В следующих примерах объясняется, как изменения в определении таблицы потоковой передачи влияют на поведение обновления:

  • Удаление фильтра не будет повторно обрабатывать ранее отфильтрованные строки.
  • Изменение проекций столбцов не влияет на обработку существующих данных.
  • Соединения со статическими моментальными снимками используют состояние моментального снимка во время начальной обработки. Данные, которые поступили позднее и соответствовали бы обновленному моментальному снимку, будут игнорироваться. Это может привести к упущению фактов, если измерения задерживаются.
  • Изменение CAST существующего столбца приведет к ошибке.

Если данные изменяются таким образом, что их нельзя поддерживать в существующей потоковой таблице, вы можете выполнить полное обновление таблицы.

Полное обновление стриминговой таблицы

Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в таких источниках, как Kafka, которые не хранят всю историю данных или имеют короткие периоды хранения, так как полное обновление обрезает существующие данные. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.

Рассмотрим пример.

REFRESH STREAMING TABLE sales FULL;

Изменение расписания для потоковой таблицы

Вы можете изменить (или задать) расписание автоматического обновления для таблицы потоковой передачи. В следующих примерах показано, как задать расписание с помощью ALTER STREAMING TABLE:

ALTER STREAMING TABLE sales
  ADD SCHEDULE every 1 hour;

Для примера запросов расписания обновления см. ALTER STREAMING TABLE.

Отслеживание состояния обновления

Вы можете увидеть состояние обновления потоковой таблицы, просмотрев поток, который управляет потоковой таблицей в интерфейсе DLT, или просмотрев информацию обновления, возвращаемую командой DESCRIBE EXTENDED для потоковой таблицы.

DESCRIBE TABLE EXTENDED <table-name>;

Кроме того, можно просмотреть таблицу потоковой передачи в обозревателе каталогов и просмотреть состояние обновления:

  1. Щелкните значок каталогаКаталог в боковой панели.
  2. В дереве обозревателя каталогов слева откройте каталог и выберите схему, в которой находится потоковая таблица.
  3. Откройте элемент "Таблицы " в выбранной схеме и щелкните таблицу потоковой передачи.

Здесь можно использовать вкладки под именем потоковой таблицы для просмотра и редактирования сведений о таблице потоковой передачи, в том числе:

  • Обновление состояния и истории
  • Схема таблицы
  • Примеры данных (требуется активный вычислительный ресурс)
  • Разрешения
  • Происхождение, включая таблицы и конвейеры, от которых зависит эта потоковая таблица
  • Аналитические сведения об использовании
  • Мониторы, которые вы создали для этой потоковой таблицы

Управление доступом к потоковым таблицам

Потоковые таблицы поддерживают расширенные элементы управления доступом для совместного использования данных, не подвергая частные данные риску. Владелец таблицы потоковой передачи или пользователь с MANAGE привилегией может предоставить SELECT права другим пользователям. Пользователи, имеющие SELECT доступ к потоковой таблице, не нуждаются в SELECT доступе к таблицам, на которые ссылается потоковая таблица. Этот контроль доступа обеспечивает общий доступ к данным при управлении доступом к базовым данным.

Предоставьте привилегии потоковой таблице

Чтобы предоставить доступ к таблице потоковой передачи, используйте инструкциюGRANT:

GRANT <privilege_type> ON <st_name> TO <principal>;

privilege_type может быть:

  • SELECT — пользователь может управлять SELECT потоковой таблицей.
  • REFRESH — пользователь может управлять REFRESH потоковой таблицей. Обновления выполняются с помощью разрешений владельца.

В следующем примере создается потоковая таблица и предоставляются права выбора и обновления пользователям:

CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;

-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;

-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;

Дополнительные сведения о предоставлении привилегий для защищаемых объектов каталога Unity см. в разделе "Права каталога Unity" и защищаемые объекты.

Отозвать привилегии из потоковой таблицы

Чтобы отозвать доступ к потоковой таблице, используйте инструкциюREVOKE:

REVOKE privilege_type ON <st_name> FROM principal;

Если SELECT права на исходную таблицу отзываются у владельца потоковой таблицы или любого другого пользователя, которому предоставлены привилегии MANAGE или SELECT на потоковую таблицу, или если исходная таблица удаляется, владелец потоковой таблицы или пользователь, имеющий доступ, всё равно может выполнять запросы к потоковой таблице. Однако происходит следующее поведение:

  • Владелец потоковой таблицы или другие пользователи, которые потеряли к ней доступ, больше не смогут использовать потоковую таблицу, и она станет устаревшей.
  • Если автоматизировано с расписанием, следующий запланированный REFRESH либо не выполняется, либо завершится сбоем.

В следующем примере привилегия SELECT отзывается у read_only_user.

REVOKE SELECT ON st_name FROM read_only_user;

окончательное удаление записей из потоковой таблицы

Это важно

Поддержка инструкции REORG с потоковыми таблицами доступна в общедоступной предварительной версии.

Замечание

  • Использование инструкции REORG с потоковой таблицей требует Databricks Runtime 15.4 и более поздних версий.
  • Хотя инструкцию REORG можно использовать с любой таблицей потоковой передачи, она требуется только при удалении записей из таблицы потоковой передачи с включенными векторами удаления . Команда не действует при использовании со стриминговой таблицей, если векторы удаления не включены.

Чтобы физически удалить записи из базового хранилища потоковой таблицы с включенными векторами удаления, например в целях соблюдения требований GDPR, необходимо принять дополнительные меры, чтобы обеспечить выполнение операции VACUUM на данных потоковой таблицы.

Чтобы физически удалить записи из базового хранилища, выполните приведенные ниже действия.

  1. Обновите записи или удалите записи из таблицы потоковой передачи.
  2. Выполните выражение REORG над потоковой таблицей, указав параметр APPLY (PURGE). Например, REORG TABLE <streaming-table-name> APPLY (PURGE);.
  3. Дождитесь окончания срока хранения данных в потоковой таблице. Срок хранения данных по умолчанию составляет семь дней, но его можно настроить с помощью свойства таблицы delta.deletedFileRetentionDuration. См. Настройка сохранения данных для запросов по временному перемещению.
  4. REFRESH потоковая таблица. См . статью "Обновить потоковую таблицу". В течение 24 часов после операции REFRESH задачи обслуживания DLT, включая операцию VACUUM, необходимую для окончательного удаления записей, выполняются автоматически.

Мониторинг запусков с помощью журнала запросов

Вы можете использовать страницу журнала запросов для доступа к сведениям о запросах и профилям запросов, которые помогут определить плохое выполнение запросов и узких мест в конвейере DLT, используемом для запуска обновлений потоковой таблицы. Общие сведения о типе информации, доступной в журналах запросов и профилях запросов, см. в разделе "Журнал запросов" и "Профиль запросов".

Это важно

Эта функция доступна в общедоступной предварительной версии. Администраторы рабочей области могут включить эту функцию на странице предварительной версии. См. статью "Управление предварительными версиями Azure Databricks".

Все записи, связанные со стриминговыми таблицами, отображаются в журнале запросов. Вы можете использовать раскрывающийся список фильтра Statement, чтобы выбрать любую команду и проверить связанные запросы. За всеми операторами CREATE следует оператор REFRESH, который выполняется асинхронно в конвейере DLT. Инструкции REFRESH обычно включают подробные планы запросов, которые предоставляют аналитические сведения о оптимизации производительности.

Чтобы получить доступ к REFRESH запросам в журнале запросов, выполните следующие действия.

  1. Щелкните значок истории в левой боковой панели, чтобы открыть пользовательский интерфейс истории запросов.
  2. Выберите флажок REFRESH, используя фильтр раскрывающегося списка заявления.
  3. Щелкните имя инструкции запроса, чтобы просмотреть сводные сведения, такие как длительность запроса и агрегированные метрики.
  4. Щелкните Просмотреть профиль запроса, чтобы открыть профиль запроса. Для получения информации о навигации в профиле запроса см. Профиль запроса.
  5. При необходимости можно использовать ссылки в разделе "Источник запросов", чтобы открыть связанный запрос или конвейер.

Вы также можете получить доступ к сведениям о запросах с помощью ссылок в редакторе SQL или из записной книжки, подключенной к хранилищу SQL.

Дополнительные ресурсы