Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Обработка данных осуществляется в декларативных конвейерах Lakeflow с использованием потоков. Каждый поток состоит из запроса и, как правило, целевого объекта. Поток обрабатывает запрос как пакет, так и добавочно в виде потока данных в целевой объект. Поток живет в конвейере ETL в Azure Databricks.
Как правило, потоки определяются автоматически при создании запроса в декларативных конвейерах Lakeflow, обновляющих целевой объект, но также можно явно определить дополнительные потоки для более сложной обработки, например добавление к одному целевому объекту из нескольких источников.
Обновления
Поток выполняется каждый раз, когда обновляется его определяющий конвейер. Поток создаст или обновит таблицы с последними доступными данными. В зависимости от типа потока и состояния изменений данных обновление может выполнять добавочное обновление, которое обрабатывает только новые записи или выполняет полное обновление, которое повторно обрабатывает все записи из источника данных.
- Дополнительные сведения об обновлениях конвейера см. в разделе "Запуск обновления в декларативных конвейерах Lakeflow".
- Дополнительные сведения о планировании и активации обновлений см. в разделе "Активация и непрерывный режим конвейера".
Создание потока по умолчанию
При создании объекта Lakeflow Declarative Pipelines в конвейере, как правило, определяется таблица или представление вместе с запросом, поддерживающим его. Например, в этом SQL-запросе создается потоковая таблица, вызываемая customers_silver
с помощью чтения из этой customers_bronze
таблицы.
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Вы также можете создать ту же таблицу потоковой передачи в Python. В Python обычно используются декларативные конвейеры Lakeflow, создавая функцию запроса, возвращающую кадр данных, с декораторами для доступа к функциям Декларативных конвейеров Lakeflow:
import dlt
@dlt.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
В этом примере вы создали таблицу потоковой передачи. Вы также можете создавать материализованные представления с аналогичным синтаксисом как в SQL, так и в Python. Дополнительные сведения см. в потоковых таблицах и материализованных представлениях.
В этом примере создается поток по умолчанию вместе с таблицей потоков. Поток потоковой передачи по умолчанию — это поток добавления , который добавляет новые строки с каждым триггером. Это наиболее распространенный способ использования декларативных конвейеров Lakeflow для создания потока и цели за один шаг. Этот стиль можно использовать для приема данных или преобразования данных.
Потоки добавления также поддерживают обработку, требующую чтения данных из нескольких источников потоковой передачи для обновления одного целевого объекта. Например, вы можете использовать функциональность добавления к потоку, если у вас есть существующий поток и потоковая таблица и требуется добавить новый источник потоковой передачи, который записывается в эту существующую потоковую таблицу.
Использование нескольких потоков для записи в один целевой объект
В предыдущем примере вы создали поток и потоковую таблицу в один шаг. Вы также можете создавать потоки для ранее созданной таблицы. В этом примере можно увидеть создание таблицы и потока, связанного с ним, в отдельных шагах. Этот код имеет идентичные результаты, как создание потока по умолчанию, включая использование того же имени для потоковой таблицы и потока.
Питон
import dlt
# create streaming table
dlt.create_streaming_table("customers_silver")
# add a flow
@dlt.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
Создание потока независимо от целевого объекта означает, что можно также создать несколько потоков, которые добавляют данные к одному целевому объекту.
Используйте декоратор @append_flow
в интерфейсе Python или предложение CREATE FLOW...INSERT INTO
в интерфейсе SQL, чтобы создать новый поток, например, чтобы направить в потоковую таблицу из нескольких потоковых источников. Используйте поток добавления для обработки задач, таких как следующие:
- Добавьте источники потоковой передачи, добавляющие данные в существующую потоковую таблицу без полного обновления. Например, у вас может быть таблица, объединяющая региональные данные из каждого региона, в который вы работаете. При развертывании новых регионов можно добавить новые данные региона в таблицу без полного обновления. Пример добавления источников потоковой передачи в существующую таблицу потоковой передачи см. в примере: запись в таблицу потоковой передачи из нескольких разделов Kafka.
- Обновите потоковую таблицу данных, добавив отсутствующие исторические данные (дозаполнение). Например, у вас есть существующая стриминговая таблица, в которую записывается тема Apache Kafka. У вас также есть исторические данные, хранящиеся в таблице, которые нужно вставить ровно один раз в потоковую таблицу, и вы не можете выполнять потоковую обработку данных, так как ваша обработка включает выполнение сложной агрегации перед вставкой данных. Пример обратного заполнения данных см. в разделе: выполнение однократного обратного заполнения данных.
- Объедините данные из нескольких источников и запишите в одну потоковую таблицу вместо использования выражения
UNION
в запросе. Использование потоковой обработки добавлений вместоUNION
позволяет обновлять целевую таблицу поэтапно без выполнения полного обновления. Пример объединения, выполненный таким образом, см. в примере : используйте обработку потока добавления вместоUNION
.
Целевым объектом для записи выходных данных при обработке потока добавления может быть существующая таблица или новая таблица. Для запросов Python используйте функцию create_streaming_table() для создания целевой таблицы.
В следующем примере добавляются два потока для одного целевого объекта, создав объединение двух исходных таблиц:
Питон
import dlt
# create a streaming table
dlt.create_streaming_table("customers_us")
# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
Важный
- Если вам нужно задать ограничения качества данных с помощью ожиданий , определите ожидания в целевой таблице как часть функции
create_streaming_table()
или в определении существующей таблицы. Невозможно определить ожидания в определении@append_flow
. - Потоки определяются именем потока, и это имя используется для идентификации контрольных точек потоковой передачи. Использование имени потока для идентификации контрольной точки означает следующее:
- Если существующий поток в конвейере переименован, контрольная точка не переносится, и переименованный поток фактически является совершенно новым потоком.
- Невозможно повторно использовать имя потока в конвейере, так как существующая контрольная точка не будет соответствовать новому определению потока.
Типы потоков
Потоки по умолчанию для потоковых таблиц и материализованных представлений — это потоки добавления. Вы также можете создавать потоки для чтения из источников данных отслеживания измененных данных . В следующей таблице описаны различные типы потоков.
Тип потока | Описание |
---|---|
Добавить |
Потоки добавления — это наиболее распространенный тип потока, где новые записи в источнике записываются в целевой объект с каждым обновлением. Они соответствуют режиму добавления в структурированной потоковой передаче. Вы можете добавить ONCE флаг, указывающий пакетный запрос, данные которого должны вставляться в целевой объект только один раз, если целевой объект не будет полностью обновлен. Любое количество потоков добавления может записываться в определенный целевой объект.Потоки по умолчанию (созданные с целевой потоковой таблицей или материализованным представлением) будут иметь то же имя, что и целевой объект. Другие целевые объекты не имеют потоков по умолчанию. |
Auto CDC (ранее применить изменения) | Поток Auto CDC отправляет запрос, содержащий данные записи измененных данных (CDC). Автоматические потоки CDC могут использовать только потоковые таблицы как целевые объекты, а источник должен быть потоковым (даже в случае потоков ONCE ). Несколько автоматических потоков CDC могут быть нацелены на одну потоковую таблицу. Потоковая таблица, которая выступает в качестве целевого объекта для потока авто CDC, может быть нацелена только на другие потоки авто CDC.Для получения дополнительной информации о данных CDC см. API AUTO CDC: упрощение захвата изменений с помощью декларативных конвейеров Lakeflow. |
Дополнительные сведения
Дополнительные сведения о потоках и их использовании см. в следующих разделах:
- Обзор декларативных конвейеров Lakeflow
- Примеры потоков в декларативных конвейерах Lakeflow
- Написание декларативных конвейеров Lakeflow в Python или SQL
- Потоковые таблицы
- материализованные представления
- Используйте синки для потоковой передачи записей во внешние службы с декларативными потоками Lakeflow