Упражнение: интегрировать пулы SQL и Spark в Azure Synapse Analytics

Завершено

В следующем упражнении мы рассмотрим интеграцию пулов SQL и Apache Spark в Azure Synapse Analytics.

Интеграция пулов SQL и Apache Spark в Azure Synapse Analytics

Вы хотите выполнить запись в выделенный пул SQL после выполнения задач по инжинирингу данных в Spark, а затем ссылаться на данные пула SQL в качестве источника для объединения с кадрами данных Apache Spark, которые содержат данные из других файлов.

Вы решили использовать соединитель Spark для Synapse SQL в службе Azure Synapse для эффективной передачи данных между пулом Spark и пулами SQL в Azure Synapse.

Передача данных между пулами Apache Spark и SQL может выполняться с помощью JavaDataBaseConnectivity (JDBC). Но при наличии двух распределенных систем, таких как пулы Apache Spark и SQL, как правило, JDBC является узким местом при последовательной передаче данных.

Соединитель Spark для Synapse SQL в службе Azure Synapse реализует источник данных для Apache Spark. Он использует Azure Data Lake Storage 2-го поколения и PolyBase в пулах SQL для эффективной передачи данных между кластером Spark и экземпляром Synapse SQL.

  1. Чтобы использовать соединитель пула Apache Spark с Synapse SQL (sqlanalytics), можно создать временное представление данных в кадре данных. Выполните указанный ниже код в новой ячейке, чтобы создать представление с именем top_purchases:

    # Create a temporary view for top purchases 
    topPurchases.createOrReplaceTempView("top_purchases")
    

    Мы создали новое временное представление на основе таблицы данных topPurchases, созданной ранее, которая содержит плоские JSON-массивы данных о покупках пользователя.

  2. Необходимо выполнить код, который использует соединитель пула Apache Spark с Synapse SQL в Scala. Для этого мы добавляем в ячейку магическую команду %%spark. Выполните указанный ниже код в новой ячейке, чтобы выполнить чтение из представления top_purchases:

    %%spark
    // Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool.
    val df = spark.sqlContext.sql("select * from top_purchases")
    df.write.sqlanalytics("SQLPool01.wwi.TopPurchases", Constants.INTERNAL)
    

    Примечание.

    Для выполнения ячейки может потребоваться более минуты. Если вы выполнили эту команду раньше, отобразится сообщение об ошибке: "Объект с именем... уже существует", так как таблица уже существует.

    После того как выполнение ячейки будет завершено, посмотрим на список таблиц пула SQL, чтобы убедиться, что таблица успешно создана.

  3. Оставьте записную книжку открытой, а затем перейдите к центру данных (если оно еще не выбрано).

    Выделен концентратор данных

  4. Выберите вкладку "Рабочая область" (1), разверните пул SQL, выберите многоточие (...) в таблицах (2) и нажмите кнопку "Обновить( 3)". Разверните таблицу wwi.TopPurchases и столбцы (4).

    Отображается таблица.

    Как видите, таблица wwi.TopPurchases автоматически создана на основе производной схемы кадра данных Apache Spark. Соединитель пула Apache Spark с Synapse SQL отвечает за создание таблицы и эффективную загрузку данных в нее.

  5. Вернитесь в записную книжку и выполните приведенный ниже код в новой ячейке, чтобы считывать данные о продажах из всех файлов Parquet, расположенных в папке sale-small/Year=2019/Quarter=Q4/Month=12/ :

    dfsales = spark.read.load('abfss://wwi-02@' + datalake + '.dfs.core.windows.net/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet', format='parquet')
    display(dfsales.limit(10))
    

    Примечание.

    Выполнение этой ячейки может занять более 3 минут.

    Переменная datalake, созданная в первой ячейке, используется в качестве части пути к файлу.

    Отображаются выходные данные ячейки.

    Сравните путь к файлу, указанный в ячейке выше, с путем к файлу, указанным в первой ячейке. Здесь мы используем относительный путь для загрузки всех данных о продажах за декабрь 2019 года из файлов Parquet, расположенных в sale-small, в отличие от загрузки только данных о продажах за 31 декабря 2010 года.

    Теперь давайте загрузим данные TopSales из созданной ранее таблицы пула SQL в новую таблицу данных Apache Spark, а затем объединим ее с новым кадром данных dfsales. Для этого нам нужно снова использовать магическую команду %%spark в новой ячейке, так как мы будем использовать соединитель пула Apache Spark с Synapse SQL для получения данных из пула SQL. Затем необходимо добавить содержимое кадра данных в новое временное представление, чтобы получить доступ к данным из Python.

  6. Выполните указанный ниже код в новой ячейке, чтобы прочитать данные из таблицы пула SQL TopSales и сохранить ее во временном представлении:

    %%spark
    // Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool.
    val df2 = spark.read.sqlanalytics("SQLPool01.wwi.TopPurchases")
    df2.createTempView("top_purchases_sql")
    
    df2.head(10)
    

    Ячейка и ее выходные данные отображаются, как описано.

    Язык ячейки устанавливается Scala с помощью %%spark магической команды (1) вверху ячейки. Мы объявили новую переменную с именем df2 как новый DataFrame, созданный методом spark.read.sqlanalytics, который считывает из таблицы TopPurchases(2) в пуле SQL. Затем мы заполняли новое временное представление с именем top_purchases_sql(3). Наконец, мы показали первые 10 записей со строкой df2.head(10))(4). Выходные данные ячейки отображают значения DataFrame (5).

  7. Выполните указанный ниже код в новой ячейке, чтобы создать кадр данных в Python на основе временного представления top_purchases_sql, а затем отобразите первые 10 результатов:

    dfTopPurchasesFromSql = sqlContext.table("top_purchases_sql")
    
    display(dfTopPurchasesFromSql.limit(10))
    

    Отображаются код и вывод DataFrame.

  8. Выполните указанный ниже код в новой ячейке, чтобы присоединить данные из файлов данных о продажах Parquet и пула SQL TopPurchases.

    inner_join = dfsales.join(dfTopPurchasesFromSql,
        (dfsales.CustomerId == dfTopPurchasesFromSql.visitorId) & (dfsales.ProductId == dfTopPurchasesFromSql.productId))
    
    inner_join_agg = (inner_join.select("CustomerId","TotalAmount","Quantity","itemsPurchasedLast12Months","top_purchases_sql.productId")
        .groupBy(["CustomerId","top_purchases_sql.productId"])
        .agg(
            sum("TotalAmount").alias("TotalAmountDecember"),
            sum("Quantity").alias("TotalQuantityDecember"),
            sum("itemsPurchasedLast12Months").alias("TotalItemsPurchasedLast12Months"))
        .orderBy("CustomerId") )
    
    display(inner_join_agg.limit(100))
    

    В запросе мы объединили кадры данных dfsales и dfTopPurchasesFromSql, сопоставляя CustomerId и ProductId. Это соединение объединило данные таблицы пула SQL с данными TopPurchases о продажах Parquet за декабрь 2019 г. (1).

    Мы сгруппировали данные в полях CustomerId и ProductId. ProductId Так как имя поля неоднозначно (оно существует в обоих кадрах данных), нам пришлось полностью указать ProductId имя для ссылки на имя в TopPurchases кадре данных (2).

    Затем мы создали агрегат, суммированный общий объем, потраченный на каждый продукт в декабре, общее количество товаров в декабре и общее количество товаров, приобретенных за последние 12 месяцев (3).

    И, наконец, мы отобразили объединенные и агрегированные данные в табличном представлении.

    Вы можете выбрать заголовки столбцов в представлении таблицы, чтобы отсортировать набор результатов.

    Отображаются содержимое и выходные данные ячейки.