Поделиться через


Таблицы потоков

Потоковая таблица — это таблица Delta с дополнительной поддержкой потоковой или добавочной обработки данных. Потоковая таблица может быть нацелена на один или несколько потоков в конвейере ETL.

Потоковые таблицы являются хорошим выбором для загрузки данных по следующим причинам:

  • Каждая входная строка обрабатывается только один раз, что моделирует подавляющее большинство рабочих нагрузок по приему данных, а именно добавление или обновление строк в таблице.
  • Они могут обрабатывать большие объемы данных, доступных только для добавления.

Потоковые таблицы также являются хорошим выбором для потоковых преобразований с низкой задержкой по следующим основаниям:

  • Анализ по строкам и временным окнам
  • Обработка больших объемов данных
  • Низкая задержка

На следующей схеме показано, как работают потоковые таблицы.

Схема, показывающая, как работают потоковые таблицы

В каждом обновлении потоки, связанные с потоковой таблицей, считывают измененные сведения в источнике потоковой передачи и добавляют новые сведения в эту таблицу.

Потоковые таблицы определяются и обновляются одним конвейером. Вы явно определяете таблицы потоковой передачи в исходном коде конвейера. Таблицы, определенные конвейером, не могут быть изменены или обновлены любым другим конвейером. Можно определить несколько потоков для добавления к одной таблице потоковой передачи.

При создании таблицы потоковой передачи за пределами конвейера в Databricks SQL, Databricks создаёт скрытый конвейер, используемый для обновления этой таблицы.

Для получения дополнительной информации о потоках см. раздел "Потоковая загрузка и обработка данных с использованием декларативных конвейеров Lakeflow".

Таблицы потоковой обработки для загрузки данных

Потоковые таблицы предназначены для источников данных, к которым можно только добавлять, и обрабатывают входные данные единожды.

В следующем примере показано, как использовать потоковую таблицу для приема новых файлов из облачного хранилища.

Питон

import dlt

# create a streaming table
@dlt.table
def customers_bronze():
  return (
    spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .option("cloudFiles.inferColumnTypes", "true")
     .load("/Volumes/path/to/files")
  )

При использовании spark.readStream функции в определении набора данных она приводит к тому, что Декларативные конвейеры Lakeflow обрабатывают набор данных как поток, а созданная таблица представляет собой потоковую таблицу.

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
  "/volumes/path/to/files",
  format => "json"
);

Дополнительные сведения о загрузке данных в потоковую таблицу см. в разделе "Загрузка данных с помощью декларативных конвейеров Lakeflow".

Следующая диаграмма иллюстрирует, как работают потоковые таблицы в режиме только добавления.

Схема, показывающая, как работают потоковые таблицы только для добавления

Строка, которая уже добавлена в потоковую таблицу, не будет повторно запрашиваться с последующими обновлениями конвейера. Если вы модифицируете запрос (например, из SELECT LOWER (name) в SELECT UPPER (name)), существующие строки не изменятся в верхний регистр, но новые строки будут в верхнем регистре. Вы можете запустить полное обновление для повторного запроса всех ранее полученных данных из исходной таблицы, чтобы обновить все строки в таблице потоковой передачи.

Потоковые таблицы и стриминг с низкой задержкой

Таблицы потоковой обработки предназначены для потоковой обработки с низкой задержкой при ограниченном состоянии. Потоковые таблицы используют управление контрольными точками, что делает их хорошо подходящими для обработки данных с низкой задержкой в потоковом режиме. Тем не менее, они ожидают потоки, которые естественным образом ограничены или ограничены водяным знаком.

Естественным образом ограниченный поток создается источником потоковых данных, который имеет четко определенное начало и конец. Пример естественно привязанного потока — чтение данных из каталога файлов, в которых новые файлы не добавляются после размещения первоначального пакета файлов. Поток считается привязанным, так как число файлов является конечным, а затем поток заканчивается после обработки всех файлов.

Вы также можете использовать водяной знак для ограничения потока. Водяной знак в Структурированной потоковой передаче Spark — это механизм, который помогает обрабатывать поздние данные, указывая, сколько времени система должна ожидать отложенных событий, прежде чем считать окно времени завершенным. Неограниченный поток, который не имеет метки времени, может привести к сбою конвейера из-за давления на память.

Для получения дополнительной информации об обработке потоков с отслеживанием состояния см. Оптимизация обработки состояния в декларативных конвейерах Lakeflow с водяными знаками.

Соединения потоков и снимков

Соединения типа "stream-snapshot" — это соединения между потоком и измерением, зафиксированным в момент запуска потоков. Эти соединения не перекомпьютерируются, если измерение изменится после запуска потока, так как таблица измерений обрабатывается как моментальный снимок во времени, и изменения в таблице измерений после запуска потока не отражаются, если вы не перезагрузите или обновите таблицу измерений. Это разумное поведение, если вы можете принять небольшие несоответствия в соединении. Например, приблизительное объединение допустимо, если количество транзакций в несколько порядков величины превышает число клиентов.

В следующем примере кода мы объединяем таблицу измерений под названием «клиенты», содержащую две строки, с постоянно увеличивающимся набором данных «транзакции». Мы материализуем соединение между этими двумя наборами данных в таблице под названием sales_report. Обратите внимание, что если внешний процесс обновляет таблицу клиентов добавлением новой строки (customer_id=3, name=Zoya), эта новая строка не будет присутствовать в объединении, так как статическая таблица измерений была зафиксирована при запуске потоков.

import dlt

@dlt.view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
  return spark.readStream.table("transactions")

# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dlt.view
def v_customers():
  return spark.read.table("customers")

@dlt.table
def sales_report():
  facts = spark.readStream.table("v_transactions")
  dims = spark.read.table("v_customers")

  return (
    facts.join(dims, on="customer_id", how="inner"
  )

Ограничения потоковой таблицы

Таблицы потоковой передачи имеют следующие ограничения:

  • Ограниченная эволюция. Запрос можно изменить без повторной компиляции всего набора данных. Так как стриминговая таблица видит только одну строку один раз, вы можете использовать разные запросы для работы с разными строками. Это означает, что необходимо знать обо всех предыдущих версиях запроса, выполняющихся в наборе данных. Полное обновление требуется для того, чтобы данные обновления потоковой таблицы, которые уже были обработаны.
  • Управление состоянием: Потоковые таблицы имеют низкую задержку, поэтому необходимо убедиться, что потоки, с которыми они работают, естественно ограничены или с ограничением по водяному знаку. Дополнительные сведения см. в разделе "Оптимизация обработки с отслеживанием состояния" в декларативных конвейерах Lakeflow с подложками.
  • Соединения не перекомпьютерируются: Соединения в таблицах потоковой передачи не перекомпьютерируются при изменении измерений. Эта характеристика может быть хорошей для сценариев "быстро, но неправильно". Если вы хотите, чтобы представление всегда было правильным, может потребоваться использовать материализованное представление. Материализованные представления всегда правильны, так как они автоматически перекомпьютерируют соединения при изменении измерений. Для получения дополнительной информации см. материализованные представления.