Поделиться через


Управление качеством данных с помощью ожиданий в рамках конвейера

Используйте ожидания для применения ограничений качества, которые проверяют данные по мере потоков через конвейеры ETL. Ожидания обеспечивают более подробные сведения о метриках качества данных и позволяют завершать обновления или удалять записи при обнаружении недопустимых записей.

Эта статья содержит обзор ожиданий, включая примеры синтаксиса и варианты поведения. Для более сложных сценариев использования и рекомендуемых лучших практик, см. Рекомендации по ожиданиям и продвинутые шаблоны.

График ожиданий в декларативных конвейерах Lakeflow

Что такое ожидания?

Ожидания — это необязательные условия в инструкциях по созданию конвейерного материализованного представления, потоковой таблицы или представления, которые применяют проверки качества данных к каждой записи, проходящей через запрос. Ожидания используют стандартные логические инструкции SQL для указания ограничений. Вы можете объединить несколько ожиданий для одного набора данных и установить ожидания для всех объявлений наборов данных в конвейере.

В следующих разделах представлены три компонента ожидания и приведены примеры синтаксиса.

Название ожидания

Каждое ожидание должно иметь имя, которое используется в качестве идентификатора для мониторинга и отслеживания ожиданий. Выберите имя, которое передает проверяемые метрики. В следующем примере определяется ожидание, valid_customer_age для подтверждения того, что возраст составляет от 0 до 120 лет:

Важно

Имя ожидания должно быть уникальным для заданного набора данных. Ожидания можно использовать повторно в нескольких наборах данных в потоке данных. См. переносимые и повторно используемые условия.

Питон

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

Ограничение для оценки

Предложение ограничения — это условный оператор SQL, который должен иметь значение true или false для каждой записи. Ограничение содержит фактическую логику для проверяемого объекта. Если запись не соответствует этому условию, то срабатывает ожидание.

Ограничения должны использовать допустимый синтаксис SQL и не могут содержать следующее:

  • Пользовательские функции Python
  • Вызовы внешних служб
  • Вложенные запросы, ссылающиеся на другие таблицы

Ниже приведены примеры ограничений, которые можно добавить в инструкции создания набора данных:

Питон

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

Действие по недопустимой записи

Необходимо указать действие, чтобы определить, что происходит при сбое проверки записи. В следующей таблице описываются доступные действия.

Действие Синтаксис SQL Синтаксис Python Результат
предупреждение (по умолчанию) EXPECT dlt.expect Неверные записи записываются в цель.
падение EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop Недопустимые записи удаляются перед записью данных в целевой объект. Количество удаленных записей регистрируется вместе с другими метриками набора данных.
провал EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail Неверные записи мешают успешному выполнению обновления. Перед повторной обработкой требуется вмешательство вручную. Это ожидание приводит к сбою одного потока и не приводит к сбою других потоков в конвейере.

Вы также можете реализовать расширенную логику для карантина недопустимых записей без сбоя или удаления данных. См. Карантин недопустимых записей.

Метрики для отслеживания ожиданий

Метрики отслеживания для действий warn или drop можно просмотреть в пользовательском интерфейсе конвейера. Так как fail приводит к сбою обновления при обнаружении недопустимой записи, метрики не записываются.

Чтобы просмотреть метрики ожидания, выполните следующие действия.

  1. Щелкните "Конвейеры" на боковой панели.
  2. Щелкните название вашего конвейера.
  3. Нажмите на набор данных, для которого определено ожидание.
  4. Выберите вкладку качества данных на правой боковой панели.

Вы можете просмотреть метрики качества данных, отправив запрос к журналу событий Lakeflow Declarative Pipelines. См. Проверьте качество данных запроса из журнала событий.

Сохранить недопустимые записи

Сохранение недействительных записей является стандартным поведением для ожидаемых условий. Используйте оператор expect, если вы хотите сохранить записи, которые нарушают ожидание, но собирать метрики о том, сколько записей соответствуют или не соответствуют ограничению. Записи, которые нарушают ожидание, добавляются в целевой набор данных вместе с допустимыми записями:

Питон

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Удалить недопустимые записи

Используйте оператор expect_or_drop, чтобы предотвратить дальнейшую обработку недопустимых записей. Записи, которые нарушают ожидание, удаляются из целевого набора данных:

Питон

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Сбой при наличии недопустимых записей

Если недопустимые записи неприемлемы, используйте оператор expect_or_fail, чтобы немедленно остановить выполнение при сбое проверки записи. Если операция является обновлением таблицы, система атомарно откатывает транзакцию:

Питон

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Важно

При наличии нескольких параллельных потоков, определенных в конвейере, сбой одного потока не приводит к сбою других потоков.

График объяснения отказов потока в декларативных конвейерах Lakeflow

Устранение неполадок с обновлениями, которые не оправдали ожиданий.

Если конвейер завершается сбоем из-за нарушения ожидания, необходимо исправить код конвейера, чтобы правильно обрабатывать недопустимые данные перед повторной запуском конвейера.

Ожидания, настроенные для сбоя конвейеров, изменяют план запроса Spark ваших преобразований, чтобы отслеживать информацию, необходимую для обнаружения и сообщения о нарушениях. Эти сведения можно использовать для определения входной записи, которая привела к нарушению для многих запросов. Декларативные конвейеры Lakeflow предоставляют выделенное сообщение об ошибке для сообщения о таких нарушениях. Вот пример сообщения об ошибке нарушения ожиданий.

[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false

управление несколькими ожиданиями

Заметка

Хотя и SQL, и Python поддерживают несколько ожиданий в одном наборе данных, только Python позволяет группировать несколько ожиданий и задавать коллективные действия.

Декларативные конвейеры Lakeflow с несколькими ожиданиями в графе fLow

Можно объединить несколько ожиданий и указать коллективные действия с помощью функций expect_all, expect_all_or_dropи expect_all_or_fail.

Эти декораторы принимают словарь Python в качестве аргумента, где ключ — это имя ожидания, а значение — ограничение ожидания. Вы можете повторно использовать один набор ожиданий в нескольких наборах данных в конвейере. Ниже приведены примеры каждого из операторов Python expect_all:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset

Ограничения

  • Поскольку только потоковые таблицы и материализованные представления поддерживают ожидания, метрики качества данных поддерживаются только для этих типов объектов.
  • Метрики качества данных недоступны, когда:
    • Для запроса не определены ожидания.
    • Поток использует оператор, который не поддерживает ожидания.
    • Тип потока, например приемники Декларативных конвейеров Lakeflow, не поддерживает ожидания.
    • Для данного запуска потока нет обновлений в связанных потоковой таблице или материализованном представлении.
    • Конфигурация конвейера не включает необходимые настройки для сбора метрик, таких как pipelines.metrics.flowTimeReporter.enabled.
  • В некоторых случаях поток COMPLETED может не содержать метрики. Вместо этого метрики сообщаются в каждом микро-пакете в событии с состоянием flow_progressRUNNING.