Поделиться через


Руководство. Создание конвейера ETL с помощью DLT

Узнайте, как создать и развернуть конвейер ETL (извлечение, преобразование и загрузка) для оркестрации данных с помощью DLT и автозагрузчика. Конвейер ETL реализует шаги для чтения данных из исходных систем, преобразования этих данных на основе требований, таких как проверки качества данных и отмена дублирования данных, а также запись данных в целевую систему, например хранилище данных или озеро данных.

В этом руководстве вы будете использовать DLT и автозагрузчик для:

  • Загрузка необработанных данных из источника в целевую таблицу.
  • Преобразование необработанных исходных данных и запись преобразованных данных в два целевых материализованных представления.
  • Запрос преобразованных данных.
  • Автоматизируйте конвейер ETL с помощью задачи в Databricks.

Дополнительные сведения о DLT и автозагрузчике см. в разделе DLT и Что такое автозагрузчик?

Требования

Чтобы завершить работу с этим руководством, необходимо выполнить следующие требования:

Сведения о наборе данных

Набор данных, используемый в этом примере, представляет собой подмножество набора данных "Миллион песен", коллекцию функций и метаданных для современных музыкальных треков. Этот набор данных доступен в примерах наборов данных, включенных в рабочую область Azure Databricks.

шаг 1. Создание конвейера

Сначала вы создадите конвейер ETL в DLT. DLT создает конвейеры путем разрешения зависимостей, определенных в записных книжках или файлах (называемых исходным кодом) с помощью синтаксиса DLT. Каждый файл исходного кода может содержать только один язык, но в конвейер можно добавить несколько записных книжек или файлов, относящихся к определенному языку. Дополнительные сведения см. в разделе DLT

Это важно

Оставьте поле исходного кода пустым, чтобы автоматически создать и настроить записную книжку для разработки исходного кода.

В этом руководстве используются бессерверные вычислительные ресурсы и каталог Unity. Для всех параметров конфигурации, не указанных, используйте параметры по умолчанию. Если бессерверные вычисления не включены или не поддерживаются в вашей рабочей области, вы можете выполнить учебник, как описано, с использованием настроек вычислений по умолчанию. Если вы используете параметры вычислений по умолчанию, необходимо вручную выбрать каталог Unity из параметров хранения в разделе Назначение пользовательского интерфейса Создать конвейер.

Чтобы создать новый конвейер ETL в DLT, выполните следующие действия.

  1. На боковой панели щелкните "Конвейеры".
  2. Нажмите «Создать трубопровод» и «ETL-трубопровод».
  3. В имени конвейера введите уникальное имя конвейера.
  4. Установите флажок "Бессерверный ".
  5. В Пункте назначения, чтобы настроить расположение каталога Unity, в котором публикуются таблицы, выберите существующий каталог и введите новое имя в схему, чтобы создать новую схему в вашем каталоге.
  6. Нажмите кнопку "Создать".
  7. Щелкните ссылку записной книжки в поле Исходный код на панели Сведения о конвейере.

Пользовательский интерфейс конвейера отображается для нового конвейера.

Шаг 2. Разработка конвейера DLT

Это важно

Записные книжки могут содержать только один язык программирования. Не смешивайте код Python и SQL в записных книжках исходного кода потока данных.

На этом шаге вы будете использовать записные книжки Databricks для интерактивной разработки и проверки исходного кода для конвейеров DLT.

В коде используется автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище. Дополнительные сведения см. в разделе "Что такое автозагрузчик"

Пустая записная книжка исходного кода создается автоматически и настроена для конвейера. Записная книжка создается в новом каталоге в пользовательском каталоге. Имя нового каталога и файла совпадает с именем конвейера. Например, /Users/[email protected]/my_pipeline/my_pipeline.

При разработке конвейера DLT можно выбрать Python или SQL. Примеры включены для обоих языков. В зависимости от выбранного языка убедитесь, что выбран язык записной книжки по умолчанию. Дополнительные сведения о поддержке записных книжек для разработки кода конвейера DLT см. в статье Разработка и отладка конвейеров ETL с помощью записной книжки в DLT.

  1. Ссылка на доступ к этой записной книжке находится в поле Исходный код на панели сведений о конвейере. Щелкните ссылку, чтобы открыть записную книжку, прежде чем перейти к следующему шагу.

  2. Нажмите кнопку "Подключиться" в правом верхнем углу, чтобы открыть меню конфигурации вычислений.

  3. Наведите указатель мыши на имя конвейера, созданного на шаге 1.

  4. Нажмите кнопку "Подключить".

  5. Рядом с заголовком записной книжки в верхней части выберите язык по умолчанию записной книжки (Python или SQL).

  6. Скопируйте и вставьте следующий код в ячейку записной книжки.

    Питон

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists each year who released most songs."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year who released most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    

Шаг 3. Запрос преобразованных данных

На этом шаге вы будете запрашивать данные, обработанные в конвейере ETL, чтобы проанализировать данные песни. Эти запросы используют подготовленные записи, созданные на предыдущем шаге.

Во-первых, выполните запрос, который находит артистов, выпустивших больше всего песен каждый год, начиная с 1990 года.

  1. На боковой панели нажмите значок редактора SQLредактор SQL.

  2. Щелкните значок " и выберите " Создать запрос " в меню.

  3. Введите следующее:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    Замените <catalog> и <schema> на имя каталога и схемы, в которых находится таблица. Например, data_pipelines.songs_data.top_artists_by_year.

  4. Нажмите Выполнить выбранное.

Теперь выполните еще один запрос, который находит песни с 4/4 ударом и танцуемым темпом.

  1. Щелкните значок " и нажмите кнопку "Создать запрос " в меню.

  2. Введите следующий код:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    Замените <catalog> и <schema> на название каталога и схемы, в которых находится таблица. Например, data_pipelines.songs_data.songs_prepared.

  3. Нажмите Выполнить выбранное.

Шаг 4. Создание задания для запуска конвейера DLT

Затем создайте рабочий процесс для автоматизации выполнения действий приема, обработки и анализа данных с помощью задания Databricks.

  1. В рабочей области щелкните по значку Рабочие процессыРабочие процессы на боковой панели и нажмите Создать задание.
  2. В поле заголовка задачи замените дату и время> нового задания < именем задания. Например, Songs workflow.
  3. В поле "Имя задачи" введите имя первой задачи, например ETL_songs_data.
  4. В поле "Тип" выберите "Конвейер".
  5. В конвейере выберите конвейер DLT, созданный на шаге 1.
  6. Нажмите кнопку "Создать".
  7. Чтобы запустить рабочий процесс, нажмите кнопку "Запустить сейчас". Чтобы просмотреть сведения о запуске, щелкните вкладку "Запуски". Щелкните задачу, чтобы просмотреть подробности запуска задачи.
  8. Чтобы просмотреть результаты после завершения рабочего процесса, нажмите кнопку "Перейти к последнему успешному запуску " или " Время начала " для выполнения задания. Откроется страница вывода и отображается результаты запроса.

Дополнительные сведения о выполнении заданий см. в разделе "Мониторинг и наблюдаемость для заданий Databricks".

Шаг 5. Запланируйте работу конвейера DLT

Чтобы запустить конвейер ETL по расписанию, выполните следующие действия.

  1. Щелкните значок рабочих процессовWorkflows на боковой панели.
  2. В столбце "Имя" щелкните имя задания. На боковой панели отображаются сведения о задании.
  3. Нажмите кнопку "Добавить триггер" на панели "Расписания и триггеры " и выберите "Запланированный " в типе триггера.
  4. Укажите период, время начала и часовой пояс.
  5. Нажмите кнопку "Сохранить".

Подробнее