Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Декларативные конвейеры Lakeflow содержат несколько новых ключевых слов и функций SQL для определения материализованных представлений и потоковых таблиц в конвейерах. Поддержка SQL для разработки конвейеров основана на основах Spark SQL и добавляет поддержку функций структурированной потоковой передачи.
Пользователи, знакомые с PySpark DataFrames, могут предпочесть разработку кода конвейера с помощью Python. Python поддерживает более обширное тестирование и операции, которые сложно реализовать с помощью SQL, такие как операции метапрограммирования. См. раздел Разработка кода конвейера с помощью Python.
Полный справочник по синтаксису SQL декларативного конвейера Lakeflow см. в справочнике по языку SQL для декларативного конвейера Lakeflow.
Основы SQL для разработки конвейеров
Код SQL, создающий наборы данных Lakeflow Declarative Pipelines, использует CREATE OR REFRESH
синтаксис для определения материализованных представлений и стриминговых таблиц на основе результатов запроса.
Ключевое слово STREAM
указывает, должен ли источник данных, на который ссылается предложение SELECT
, считываться с семантикой потоковой передачи.
Считывает и записывает данные по умолчанию в каталог и схему, указанную во время настройки конвейера. См. Установка целевого каталога и схемы.
Исходный код декларативного конвейера Lakeflow критически отличается от скриптов SQL: Декларативные конвейеры Lakeflow оценивают все определения набора данных во всех файлах исходного кода, настроенных в конвейере, и создает граф потока данных перед выполнением любых запросов. Порядок запросов, отображаемых в записной книжке или скрипте, определяет порядок вычисления кода, но не порядок выполнения запроса.
Создание материализованного представления с помощью SQL
В следующем примере кода показан базовый синтаксис для создания материализованного представления с помощью SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Создание таблицы потоковой передачи с помощью SQL
В следующем примере кода показан базовый синтаксис для создания таблицы потоковой передачи с помощью SQL. При чтении источника для потоковой таблицы ключевое слово STREAM
указывает на использование потоковой семантики для этого источника. Не используйте ключевое STREAM
слово при создании материализованного представления:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Заметка
Используйте ключевое слово STREAM для использования семантики потоковой передачи для чтения из источника. Если чтение сталкивается с изменением или удалением существующей записи, возникает ошибка. Самое безопасное — читать из статических или источников только для добавления. Для приема данных с коммитами изменений можно использовать Python и SkipChangeCommits
опцию для обработки ошибок.
Загрузка данных из хранилища объектов
Декларативные конвейеры Lakeflow поддерживают загрузку данных из всех форматов, поддерживаемых Azure Databricks. См. параметры формата данных .
Заметка
В этих примерах используются данные, доступные на /databricks-datasets
, автоматически подключённые к вашей рабочей области. Databricks рекомендует использовать пути тома или облачные URI для ссылки на данные, хранящиеся в облачном хранилище объектов. См. статью Что такое тома каталога Unity?.
Databricks рекомендует использовать автозагрузчик и потоковые таблицы при настройке добавочных рабочих нагрузок приема данных, хранящихся в облачном хранилище объектов. См. Что такое автозагрузчик?.
SQL использует функцию read_files
для вызова функций автозагрузчика. Необходимо также использовать ключевое слово STREAM
для настройки потокового чтения с помощью read_files
.
Ниже описан синтаксис для read_files
в SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Параметры Автозагрузчика — это пары "ключ-значение". Дополнительные сведения о поддерживаемых форматах и параметрах см. в разделе Параметры.
В следующем примере создается потоковая таблица из JSON-файлов с помощью автозагрузчика:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Функция read_files
также поддерживает пакетную семантику для создания материализованных представлений. В следующем примере используется пакетная семантика для чтения каталога JSON и создания материализованного представления:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Проверка данных с ожиданиями
Вы можете использовать ожидания для установки и применения ограничений качества данных. См. Управление качеством данных с ожиданиями конвейера.
Следующий код определяет ожидание с именем valid_data
, которое удаляет записи, которые являются null во время приема данных:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Выполните запрос к материализованным представлениям и потоковым таблицам, которые определены в вашем конвейере
В следующем примере определяются четыре набора данных:
- Потоковая таблица с именем
orders
, которая загружает данные JSON. - Материализованное представление с именем
customers
, которое загружает данные CSV. - Материализованное представление с именем
customer_orders
, которое объединяет записи из наборов данныхorders
иcustomers
, преобразует метку времени заказа в дату и выбирает поляcustomer_id
,order_number
,state
иorder_date
. - Материализованное представление под именем
daily_orders_by_state
, которое агрегирует ежедневное количество заказов для каждого штата.
Заметка
При запросе представлений или таблиц в конвейере можно указать каталог и схему напрямую или использовать значения по умолчанию, настроенные в конвейере. В этом примере таблицы orders
, customers
и customer_orders
записываются и считываются из каталога по умолчанию и схемы, настроенной для конвейера.
Устаревший режим публикации использует схему LIVE
для выполнения запросов к другим материализованным представлениям и потоковым таблицам, определенным в конвейере обработки данных. В новых конвейерах синтаксис схемы LIVE
автоматически игнорируется. См. схему LIVE (устаревшую версию).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Определение частной таблицы
Вы можете использовать PRIVATE
при создании материализованного представления или потоковой таблицы. При создании частной таблицы вы создаете таблицу, но не создаете метаданные для таблицы. Условие PRIVATE
предписывает Декларативным конвейерам Lakeflow создать таблицу, которая будет доступна для конвейера, но не должна быть доступна за его пределами. Чтобы сократить время обработки, частная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.
Частные таблицы могут иметь то же имя, что и таблицы в каталоге. Если вы укажете неквалифицированное имя таблицы внутри конвейера, и если есть как частная таблица, так и таблица каталога с таким именем, будет использоваться частная таблица.
Частные таблицы ранее назывались временными таблицами.
Окончательное удаление записей из материализованного представления или потоковой таблицы
Чтобы окончательно удалить записи из таблицы потоковой передачи с включенными векторами удаления, например для соответствия GDPR, необходимо выполнить дополнительные операции в базовых таблицах Delta объекта. Чтобы гарантировать удаление записей из потоковой таблицы, см. постоянное удаление записей из потоковой таблицы.
Материализованные представления всегда отражают данные в базовых таблицах при их обновлении. Чтобы удалить данные в материализованном представлении, необходимо удалить данные из источника и обновить материализованное представление.
Параметризация значений, используемых при объявлении таблиц или представлений с помощью SQL
Используйте SET
для указания значения конфигурации в запросе, объявляющего таблицу или представление, включая конфигурации Spark. Любая таблица или представление, определенное в записной книжке после инструкции SET
, имеет доступ к заданному значению. Все конфигурации Spark, указанные с помощью инструкции SET
, используются при выполнении запроса Spark для любой таблицы или представления после инструкции SET. Чтобы считывать значение конфигурации в запросе, используйте синтаксис интерполяции строк ${}
. Следующий пример задает значение конфигурации Spark с именем startDate
и использует это значение в запросе:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Чтобы указать несколько значений конфигурации, используйте отдельную инструкцию SET
для каждого значения.
Ограничения
Условие PIVOT
не поддерживается. Операция pivot
в Spark требует активной загрузки входных данных для вычисления выходной схемы. Эта возможность не поддерживается в декларативных конвейерах Lakeflow.
Заметка
Синтаксис CREATE OR REFRESH LIVE TABLE
для создания материализованного представления не рекомендуется. Вместо этого используйте CREATE OR REFRESH MATERIALIZED VIEW
.