Поделиться через


Руководство по созданию конвейера ETL с помощью декларативных конвейеров Lakeflow

В этом руководстве объясняется, как создать и развернуть конвейер ETL (извлечение, преобразование и загрузку) для оркестрации данных с помощью декларативных конвейеров Lakeflow и автозагрузчика. Конвейер ETL реализует шаги для чтения данных из исходных систем, преобразования этих данных на основе требований, таких как проверки качества данных и отмена дублирования данных, а также запись данных в целевую систему, например хранилище данных или озеро данных.

В этом руководстве вы будете использовать декларативные конвейеры Lakeflow и функцию автоматической загрузки:

  • Загрузка необработанных исходных данных в целевую таблицу.
  • Преобразование необработанных исходных данных и запись преобразованных данных в два целевых материализованных представления.
  • Запрос преобразованных данных.
  • Автоматизируйте конвейер ETL с помощью задачи в Databricks.

Дополнительные сведения о декларативных конвейерах Lakeflow и автозагрузчике см. в разделе "Декларативные конвейеры Lakeflow " и "Что такое автозагрузчик?

Требования

Чтобы завершить работу с этим руководством, необходимо выполнить следующие требования:

Сведения о наборе данных

Набор данных, используемый в этом примере, представляет собой подмножество набора данных "Миллион песен", коллекцию функций и метаданных для современных музыкальных треков. Этот набор данных доступен в примерах наборов данных, включенных в рабочую область Azure Databricks.

шаг 1. Создание конвейера

Сначала создайте ETL-конвейер в Lakeflow Declarative Pipelines. Декларативные конвейеры Lakeflow создают конвейеры путем разрешения зависимостей, определенных в файлах (называемом исходным кодом) с помощью синтаксиса декларативного конвейера Lakeflow. Каждый файл исходного кода может содержать только один язык, но в конвейер можно добавить несколько файлов, относящихся к языку. Дополнительные сведения см. в разделе "Декларативные конвейеры Lakeflow"

В этом руководстве используются бессерверные вычислительные ресурсы и каталог Unity. Для всех параметров конфигурации, которые не указаны, используйте параметры по умолчанию. Если бессерверные вычисления не включены или не поддерживаются в вашей рабочей области, вы можете выполнить учебник, как описано, с использованием настроек вычислений по умолчанию.

Чтобы создать новый конвейер ETL в Декларативных конвейерах Lakeflow, выполните следующие действия.

  1. В рабочей области щелкните значок Новый на боковой панели и выберите конвейер ETL.
  2. Присвойте конвейеру уникальное имя.
  3. Под именем выберите каталог по умолчанию и схему для создаваемых данных. Вы можете указать другие назначения в преобразованиях, но в этом руководстве используются эти значения по умолчанию. У вас должны быть разрешения на создание каталога и схемы. См. раздел Требования.
  4. В этом руководстве выберите "Начать с пустого файла".
  5. В пути к папке укажите расположение исходных файлов или примите значение по умолчанию (папка пользователя).
  6. Выберите Python или SQL в качестве языка для первого исходного файла (конвейер может смешивать и сопоставлять языки, но каждый файл должен находиться на одном языке).
  7. Щелкните Выбрать.

Редактор конвейера отображается для нового конвейера. Создается пустой исходный файл для вашего языка, готовый к первому преобразованию.

Шаг 2. Разработка логики конвейера

На этом шаге вы будете использовать редактор Конвейеров Lakeflow для интерактивного разработки и проверки исходного кода для декларативных конвейеров Lakeflow.

В коде используется автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище. Дополнительные сведения см. в разделе "Что такое автозагрузчик"

Пустой файл исходного кода создается автоматически и настраивается для конвейера. Файл создается в папке преобразований конвейера. По умолчанию все файлы *.py и *.sql в папке трансформаций являются частью источника для вашего конвейера данных.

  1. Скопируйте и вставьте следующий код в исходный файл. Обязательно используйте язык, выбранный для файла на шаге 1.

    Питон

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    Этот источник содержит код для трех запросов. Вы также можете поместить эти запросы в отдельные файлы, чтобы упорядочить файлы и закодировать нужный способ.

  2. Щелкните значок плей.Запустите файл или запустите конвейер чтобы начать обновление подключенного конвейера. С одним исходным файлом в конвейере они функционально эквивалентны.

По завершении обновления в редактор передается новая информация о вашем конвейере данных.

  • График конвейера (DAG) в боковой панели справа от кода отображает три таблицы, songs_rawsongs_preparedа также top_artists_by_year.
  • Сводка по обновлению отображается в верхней части обозревателя активов конвейера.
  • Сведения о созданных таблицах отображаются на нижней панели и можно просматривать данные из таблиц, выбрав один.

Это включает необработанные и чистые данные, а также некоторый простой анализ для поиска лучших художников по годам. На следующем шаге вы создаете неструктурированные запросы для дальнейшего анализа в отдельном файле в потоке данных.

Шаг 3. Изучение наборов данных, созданных конвейером

На этом шаге вы выполняете нерегламентированные запросы к данным, обработанным в конвейере ETL, для анализа данных песни в редакторе SQL Databricks. Эти запросы используют подготовленные записи, созданные на предыдущем шаге.

Сначала выполните запрос, который находит артистов, выпустивших наибольшее количество песен каждый год с 1990 года.

  1. На боковой панели браузера ресурсов конвейера щелкните значок Добавьте затем исследование.

  2. Введите имя и выберите SQL для файла исследования. Записная книжка SQL создается в новой explorations папке. Файлы в папке explorations по умолчанию не выполняются как часть обновления конвейера. Блокнот SQL содержит ячейки, которые можно выполнять вместе или отдельно.

  3. Чтобы создать таблицу художников, которые выпускают большинство песен в год после 1990 года, введите следующий код в новый SQL-файл (если в файле есть пример кода, замените его). Так как эта записная книжка не является частью конвейера, она не использует каталог по умолчанию и схему. Замените <catalog>.<schema> на каталог и схему, которые вы использовали в качестве значений по умолчанию для конвейера.

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. Щелкните значок воспроизведения или нажмите, Shift + Enter чтобы запустить этот запрос.

Теперь выполните еще один запрос, который находит песни с 4/4 ударом и танцуемым темпом.

  1. Добавьте следующий код в следующую ячейку в том же файле. Опять же, замените <catalog>.<schema> на каталог и схему, которые вы использовали по умолчанию для конвейера.

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. Щелкните значок воспроизведения или нажмите, Shift + Enter чтобы запустить этот запрос.

Шаг 4. Создание задания для запуска конвейера

Затем создайте рабочий процесс для автоматизации приема, обработки и анализа данных с помощью задания Databricks, выполняемого по расписанию.

  1. В верхней части редактора нажмите кнопку "Расписание ".
  2. Если появится диалоговое окно "Расписания" , нажмите кнопку "Добавить расписание".
  3. Откроется диалоговое окно создания расписания , в котором можно создать задание для запуска конвейера по расписанию.
  4. При необходимости присвойте заданию имя.
  5. По умолчанию расписание выполняется один раз в день. Вы можете принять настройку по умолчанию или задать собственное расписание. При выборе дополнительно можно задать определенное время выполнения задания. Выбор дополнительных параметров позволяет создавать уведомления при выполнении задания.
  6. Нажмите кнопку "Создать", чтобы применить изменения и создать задание.

Теперь задание будет выполняться ежедневно, чтобы поддерживать актуальность вашего потока обработки данных. Вы можете снова выбрать расписание , чтобы просмотреть список расписаний. Вы можете управлять расписаниями для конвейера из этого диалогового окна, включая добавление, редактирование или удаление расписаний.

Щелкнув имя расписания (или задания), вы перейдете на страницу задания в списке заданий и конвейеров . Здесь вы можете просмотреть сведения о выполнении заданий, включая журнал запусков, или запустить задание сразу с помощью кнопки "Запустить сейчас ".

Дополнительные сведения о выполнении заданий см. в статье "Мониторинг и наблюдаемость заданий Lakeflow ".

Подробнее