Поделиться через


Руководство: Создание ETL-конвейера с фиксацией изменений данных с помощью декларативных конвейеров Lakeflow

Узнайте, как создавать и развертывать конвейер ETL (извлечение, преобразование и загрузка) с использованием записи измененных данных (CDC), декларативных конвейеров Lakeflow для оркестрации данных и Auto Loader. Конвейер ETL реализует шаги для чтения данных из исходных систем, преобразования этих данных на основе требований, таких как проверки качества данных и отмена дублирования данных, а также запись данных в целевую систему, например хранилище данных или озеро данных.

В этом руководстве вы будете использовать данные из customers таблицы в базе данных MySQL для:

  • Извлеките изменения из транзакционной базы данных с помощью Debezium или любого другого средства и сохраните их в облачном хранилище объектов (папка S3, ADLS, GCS). Вы пропустите этап настройки внешней системы CDC, чтобы упростить прохождение учебного материала.
  • Используйте автозагрузчик для добавочной загрузки сообщений из облачного хранилища объектов и хранения необработанных сообщений в customers_cdc таблице. Автозагрузчик выводит схему и обрабатывает эволюцию схемы.
  • Добавьте представление customers_cdc_clean для проверки качества данных с помощью ожиданий. Например, id никогда не должен быть null, так как вы будете использовать его для выполнения операций upsert.
  • AUTO CDC ... INTO Выполните операцию вставки или обновления на очищенных данных CDC, чтобы применить изменения к окончательной customers таблице.
  • Покажите, как декларативные конвейеры Lakeflow могут создать медленно меняющееся измерение типа 2 (SCD2), чтобы отслеживать все изменения.

Цель состоит в том, чтобы принять необработанные данные практически в реальном времени и создать таблицу для вашей группы аналитиков, обеспечивая качество данных.

В этом руководстве используется архитектура medallion Lakehouse, в которой происходят получение необработанных данных через бронзовый слой, очистка и проверка данных с серебряным слоем, а также применение измерительного моделирования и агрегирование с помощью золотого слоя. Дополнительные сведения см. в статье об архитектуре medallion lakehouse.

Поток, который будет реализован, выглядит следующим образом:

LDP с CDC

Дополнительные сведения о декларативных конвейерах Lakeflow, автозагрузчике и CDC см. в разделе "Декларативные конвейеры Lakeflow", "Что такое автозагрузчик?", а также что такое запись измененных данных (CDC)?

Requirements

Чтобы завершить работу с этим руководством, необходимо выполнить следующие требования:

Изменение записи данных в конвейере ETL

Запись измененных данных (CDC) — это процесс, который записывает изменения записей, сделанных в транзакционной базе данных (например, MySQL или PostgreSQL) или хранилище данных. CDC записывает такие операции, как удаление данных, добавление и обновление, как правило, в виде потока для повторной материализации таблицы во внешних системах. CDC обеспечивает добавочную загрузку, устраняя необходимость массового обновления нагрузки.

Note

Чтобы упростить руководство, пропустите настройку внешней системы CDC. Вы можете считать его готовым к работе и сохранять данные CDC в JSON-файлах в облачном хранилище (S3, ADLS, GCS).

Capturing CDC

Доступны различные средства CDC. Одним из лидеров среди решений с открытым исходным кодом является Debezium, но существуют и другие реализации, упрощающие использование источника данных, такие как Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate и AWS DMS.

В этом руководстве вы используете данные CDC из внешней системы, например Debezium или DMS. Debezium фиксирует каждую измененную строку. Обычно он отправляет журнал изменений данных в журналы Kafka или сохраняет их в виде файла.

Необходимо получить данные CDC из customers таблицы (формат JSON), убедиться, что они правильные, а затем материализовать таблицу клиента в Lakehouse.

Входные данные CDC из Debezium

Для каждого изменения вы получите сообщение JSON, содержащее все поля обновляемой строки (id, , firstname, lastnameemail). address Кроме того, у вас будут дополнительные сведения о метаданных, в том числе:

  • operation: код операции, обычно (DELETE, APPEND, UPDATE).
  • operation_date: метка даты и времени записи для каждого операционного действия.

Такие инструменты, как Debezium, могут создавать более сложные выходные данные, например, показывать значение строки до изменения, но в этом руководстве они опущены для упрощения.

Шаг 0. Настройка данных руководства

Сначала необходимо создать записную книжку и установить демонстрационные файлы, используемые в этом руководстве, в рабочую область.

  1. Щелкните "Создать" в левом верхнем углу.

  2. Click Notebook.

  3. Измените заголовок записной книжки с Неозаглавленной записной книжки <дата и время> на инструкции по настройке конвейеров.

  4. Рядом с заголовком записной книжки в верхней части задайте язык записной книжки по умолчанию для Python.

  5. Откройте панель среды записной книжки, щелкнув значок среды. (Среда) в правой боковой панели редактора записной книжки.

  6. Эта записная книжка использует faker для создания фиктивных данных в качестве примера. Добавьте faker в качестве зависимости записной книжки и нажмите кнопку "Применить " в нижней части панели среды.

  7. Чтобы создать набор данных, используемый в руководстве, введите следующий код в первой ячейке и введите SHIFT + ВВОД , чтобы запустить код:

    # You can change the catalog, schema, dbName, and db. If you do so, you must also
    # change the names in the rest of the tutorial.
    catalog = "main"
    schema = dbName = db = "dbdemos_dlt_cdc"
    volume_name = "raw_data"
    
    spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
    volume_folder =  f"/Volumes/{catalog}/{db}/{volume_name}"
    
    try:
      dbutils.fs.ls(volume_folder+"/customers")
    except:
      print(f"folder doesn't exists, generating the data under {volume_folder}...")
      from pyspark.sql import functions as F
      from faker import Faker
      from collections import OrderedDict
      import uuid
      fake = Faker()
      import random
    
      fake_firstname = F.udf(fake.first_name)
      fake_lastname = F.udf(fake.last_name)
      fake_email = F.udf(fake.ascii_company_email)
      fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
      fake_address = F.udf(fake.address)
      operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
      fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
      fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
    
      df = spark.range(0, 100000).repartition(100)
      df = df.withColumn("id", fake_id())
      df = df.withColumn("firstname", fake_firstname())
      df = df.withColumn("lastname", fake_lastname())
      df = df.withColumn("email", fake_email())
      df = df.withColumn("address", fake_address())
      df = df.withColumn("operation", fake_operation())
      df_customers = df.withColumn("operation_date", fake_date())
      df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
    
  8. Чтобы просмотреть данные, используемые в этом руководстве, введите этот код в следующей ячейке и введите SHIFT + ВВОД , чтобы запустить код. Помните, что если вы изменили путь в предыдущей ячейке, необходимо внести аналогичные изменения здесь.

    display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
    

шаг 1. Создание конвейера

Сначала вы создадите конвейер ETL в системе Lakeflow Declarative Pipelines. Декларативные конвейеры Lakeflow создают конвейеры путем разрешения зависимостей, определенных в записных книжках или файлах (называемом исходным кодом) с помощью синтаксиса декларативного конвейера Lakeflow. Каждый файл исходного кода может содержать только один язык, но в конвейер можно добавить несколько записных книжек или файлов, относящихся к определенному языку. Дополнительные сведения см. в разделе "Декларативные конвейеры Lakeflow"

Important

Оставьте поле исходного кода пустым, чтобы автоматически создать и настроить записную книжку для разработки исходного кода.

В этом руководстве используются бессерверные вычислительные ресурсы и каталог Unity. Для всех параметров конфигурации, которые не указаны, используйте параметры по умолчанию. Если бессерверные вычисления не включены или не поддерживаются в вашей рабочей области, вы можете выполнить учебник, как описано, с использованием настроек вычислений по умолчанию. Если вы используете параметры вычислений по умолчанию, необходимо вручную выбрать каталог Unity в разделе Параметры хранилища в секции Назначение пользовательского интерфейса Создать конвейер.

Чтобы создать новый конвейер ETL в Декларативных конвейерах Lakeflow, выполните следующие действия.

  1. В рабочей области щелкните на значок рабочих процессовЗадания и конвейеры на боковой панели.
  2. В меню Создать нажмите Конвейер ETL.
  3. В имени конвейера введите уникальное имя конвейера.
  4. Установите флажок "Бессерверный ".
  5. Выберите Тригерный в Конвейерном режиме. Это приведет к выполнению потоковых процессов с помощью триггера "AvailableNow", который обрабатывает все существующие данные, а затем останавливает поток.
  6. В Пункте назначения, чтобы настроить расположение каталога Unity, в котором публикуются таблицы, выберите существующий каталог и введите новое имя в схему, чтобы создать новую схему в вашем каталоге.
  7. Click Create.

Пользовательский интерфейс конвейера отображается для нового конвейера.

Пустая записная книжка исходного кода создается автоматически и настроена для конвейера. Записная книжка создается в новом каталоге в пользовательском каталоге. Имя нового каталога и файла совпадает с именем конвейера. Например, /Users/[email protected]/my_pipeline/my_pipeline.

  1. Ссылка для доступа к этой записной книжке находится в поле исходного кода на панели Сведения о конвейере. Щелкните ссылку, чтобы открыть записную книжку, прежде чем перейти к следующему шагу.
  2. Нажмите кнопку "Подключиться" в правом верхнем углу, чтобы открыть меню конфигурации вычислений.
  3. Наведите указатель мыши на имя конвейера, созданного на шаге 1.
  4. Click Connect.
  5. Рядом с заголовком записной книжки в верхней части выберите язык по умолчанию записной книжки (Python или SQL).

Important

Записные книжки могут содержать только один язык программирования. Не смешивайте код Python и SQL в записных книжках исходного кода потока данных.

При разработке декларативных конвейеров Lakeflow можно выбрать Python или SQL. В этом руководстве приведены примеры для обоих языков. В зависимости от выбранного языка убедитесь, что выбран язык записной книжки по умолчанию.

Чтобы узнать больше о поддержке ноутбуков для разработки кода в Декларативных конвейерах Lakeflow, см. статью Разработка и отладка конвейеров ETL с помощью ноутбука в Декларативных конвейерах Lakeflow.

Шаг 2. Инкрементная загрузка данных с помощью Auto Loader

Первым шагом является загрузка необработанных данных из облачного хранилища в бронзовый слой.

Это может быть сложно по нескольким причинам, так как вы должны:

  • Работайте в большом масштабе, потенциально обрабатывая миллионы небольших файлов.
  • Определить схему и тип JSON.
  • Обработка плохих записей с неправильной схемой JSON.
  • Обратите внимание на эволюцию схемы (например, новый столбец в таблице клиента).

Автозагрузчик упрощает этот процесс приема, включая определение и изменение схемы, а также масштабирование до миллионов входящих файлов. Автозагрузчик доступен в Python с помощью cloudFiles и в SQL с помощью SELECT * FROM STREAM read_files(...), и его можно использовать с различными форматами (JSON, CSV, Apache Avro и т. д.).

Определение таблицы в виде потоковой таблицы гарантирует, что вы используете только новые входящие данные. Если вы не определите её как потоковую таблицу, вы будете сканировать и загружать все доступные данные. Дополнительные сведения см. в таблицах потоковой передачи .

  1. Чтобы принять входящие данные с помощью автозагрузчика, скопируйте и вставьте следующий код в первую ячейку записной книжки. Вы можете использовать Python или SQL в зависимости от языка записной книжки по умолчанию, выбранного на предыдущем шаге.

    Python

    from dlt import *
    from pyspark.sql.functions import *
    
    # Create the target bronze table
    dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
    
    # Create an Append Flow to ingest the raw data into the bronze table
    @append_flow(
      target = "customers_cdc_bronze",
      name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
      return (
          spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "json")
              .option("cloudFiles.inferColumnTypes", "true")
              .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
      SELECT *
      FROM STREAM read_files(
        "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
        format => "json",
        inferColumnTypes => "true"
      )
    
  2. Нажмите кнопку "Пуск" , чтобы начать обновление подключенного конвейера.

Шаг 3. Очистка и требования для отслеживания качества данных

После определения бронзового слоя вы создадите серебряные слои, добавив ожидания для контроля качества данных, проверив следующие условия:

  • Идентификатор никогда не должен быть null.
  • Тип операции CDC должен быть допустимым.
  • Автозагрузчик должен был правильно прочитать json.

Строка будет удалена, если одно из этих условий не учитывается.

Дополнительные сведения см. в статье "Управление качеством данных с ожиданиями конвейера ".

  1. Нажмите кнопку "Изменить и вставить ячейку ниже ", чтобы вставить новую пустую ячейку.

  2. Чтобы создать серебряный слой с очищенной таблицей и наложить ограничения, скопируйте и вставьте следующий код в новую ячейку в записной книжке.

    Python

    dlt.create_streaming_table(
      name = "customers_cdc_clean",
      expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
      )
    
    @append_flow(
      target = "customers_cdc_clean",
      name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
      return (
          dlt.read_stream("customers_cdc_bronze")
              .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
      CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;
    
  3. Нажмите кнопку "Пуск" , чтобы начать обновление подключенного конвейера.

Шаг 4. Материализация таблицы клиентов с помощью потока AUTO CDC

Таблица customers будет содержать наиболее up-to-date view и будет репликой исходной таблицы.

Это нетривиальное для реализации вручную. Необходимо учитывать такие вещи, как дедупликация данных, чтобы сохранить последнюю строку.

Однако декларативные конвейеры Lakeflow решают эти проблемы с операцией AUTO CDC .

  1. Нажмите кнопку "Изменить и вставить ячейку ниже ", чтобы вставить новую пустую ячейку.

  2. Чтобы обработать данные CDC с помощью AUTO CDC декларативных конвейеров обработки данных Lakeflow, скопируйте и вставьте следующий код в новую ячейку ноутбука.

    Python

    dlt.create_streaming_table(name="customers", comment="Clean, materialized customers")
    
    dlt.create_auto_cdc_flow(
      target="customers",  # The customer table being materialized
      source="customers_cdc_clean",  # the incoming CDC
      keys=["id"],  # what we'll be using to match the rows to upsert
      sequence_by=col("operation_date"),  # de-duplicate by operation date, getting the most recent value
      ignore_null_updates=False,
      apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
      except_column_list=["operation", "operation_date", "_rescued_data"],
    )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers;
    
    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;
    
  3. Нажмите кнопку "Пуск" , чтобы начать обновление подключенного конвейера.

Шаг 5. Медленно меняющееся измерение типа 2 (SCD2)

Часто требуется создать таблицу для отслеживания всех изменений, в результате APPEND, UPDATE и DELETE.

  • Вы хотите сохранить историю всех изменений в таблице.
  • Возможность трассировки: вы хотите увидеть, какая операция произошла.

SCD2 с декларативными конвейерами Lakeflow

Delta поддерживает поток данных изменений (CDF) и table_change может запрашивать изменение таблицы в SQL и Python. Однако основной вариант использования CDF заключается в том, чтобы записывать изменения в конвейере и не создавать полное представление изменений таблицы с самого начала.

Реализация становится особенно сложной, если у вас есть неупорядоченные события. Если необходимо выполнить последовательность изменений по метке времени и получить изменения, которые произошли в прошлом, необходимо добавить новую запись в таблицу SCD и обновить предыдущие записи.

Декларативные конвейеры Lakeflow удаляют эту сложность и позволяют создать отдельную таблицу, содержащую все изменения с начала времени. Затем эту таблицу можно использовать в масштабе с определенными разделами или столбцами zorder, если это нужно. Поля вне порядка будут обрабатываться автоматически на основе _sequence_by.

Чтобы создать таблицу SCD2, необходимо использовать этот параметр: STORED AS SCD TYPE 2 в SQL или stored_as_scd_type="2" Python.

Note

Кроме того, можно ограничить столбцы, которые функция отслеживает, с помощью параметра: TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. Нажмите кнопку "Изменить и вставить ячейку ниже ", чтобы вставить новую пустую ячейку.

  2. Скопируйте и вставьте следующий код в новую ячейку записной книжки.

    Python

    # create the table
    dlt.create_streaming_table(
        name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )
    
    # store all changes as SCD2
    dlt.create_auto_cdc_flow(
        target="customers_history",
        source="customers_cdc_clean",
        keys=["id"],
        sequence_by=col("operation_date"),
        ignore_null_updates=False,
        apply_as_deletes=expr("operation = 'DELETE'"),
        except_column_list=["operation", "operation_date", "_rescued_data"],
        stored_as_scd_type="2",
    )  # Enable SCD2 and store individual updates
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_history;
    
    CREATE FLOW cusotmers_history_cdc
    AS AUTO CDC INTO
      customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;
    
  3. Нажмите кнопку "Пуск" , чтобы начать обновление подключенного конвейера.

Шаг 6: Создание материализованного представления, которое отслеживает, кто изменял свою информацию больше всего.

customers_history Таблица содержит все исторические изменения, внесенные пользователем в информацию. Теперь вы создадите простое материализованное представление в слое золота, которое отслеживает, кто изменил свою информацию чаще всего. Это можно использовать для анализа обнаружения мошенничества или рекомендаций пользователей в реальном мире. Кроме того, применение изменений с SCD2 уже удалило дубликаты, поэтому мы можем напрямую подсчитать количество строк по идентификатору пользователя.

  1. Нажмите кнопку "Изменить и вставить ячейку ниже ", чтобы вставить новую пустую ячейку.

  2. Скопируйте и вставьте следующий код в новую ячейку записной книжки.

    Python

    @dlt.table(
      name = "customers_history_agg",
      comment = "Aggregated customer history"
    )
    def customers_history_agg():
      return (
        dlt.read("customers_history")
          .groupBy("id")
          .agg(
              count("address").alias("address_count"),
              count("email").alias("email_count"),
              count("firstname").alias("firstname_count"),
              count("lastname").alias("lastname_count")
          )
      )
    

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
      id,
      count("address") as address_count,
      count("email") AS email_count,
      count("firstname") AS firstname_count,
      count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id
    
  3. Нажмите кнопку "Пуск" , чтобы начать обновление подключенного конвейера.

Шаг 7. Создание задания для запуска конвейера ETL

Затем создайте рабочий процесс для автоматизации приема, обработки и анализа данных с помощью задания Databricks.

  1. В рабочей области щелкните на значок рабочих процессовЗадания и конвейеры на боковой панели.
  2. В разделе "Новый" нажмите «Задание».
  3. В поле заголовка задачи замените дату и время< нового задания > именем задания. Например, CDC customers workflow.
  4. В поле "Имя задачи" введите имя первой задачи, например ETL_customers_data.
  5. В поле "Тип" выберите "Конвейер".
  6. В конвейере выберите конвейер, созданный на шаге 1.
  7. Click Create.
  8. Чтобы запустить рабочий процесс, нажмите кнопку "Запустить сейчас". Чтобы просмотреть детали запуска, щелкните вкладку Запуски. Щелкните задачу, чтобы просмотреть детали выполнения задачи.
  9. Чтобы просмотреть результаты после завершения рабочего процесса, нажмите кнопку "Перейти к последнему успешному запуску " или " Время начала " для выполнения задания. Откроется страница вывода и отображается результаты запроса.

Дополнительные сведения о выполнении заданий см. в статье "Мониторинг и наблюдаемость заданий Lakeflow ".

Шаг 8. Планирование задания

Чтобы запустить конвейер ETL по расписанию, выполните следующие действия.

  1. Щелкните значок рабочих процессов.Задания и конвейеры на боковой панели.
  2. При необходимости выберите фильтры "Задания " и " Принадлежащие мне ".
  3. В столбце "Имя" щелкните имя задания. Боковая панель отображается в виде сведений о задании.
  4. Нажмите кнопку "Добавить триггер" на панели "Расписания и триггеры " и выберите "Запланированный " в типе триггера.
  5. Укажите период, время начала и часовой пояс.
  6. Click Save.

Additional resources