Поделиться через


Руководство. Извлечение, преобразование и загрузка данных с помощью Azure Databricks

В этом руководстве рассматривается выполнение операций извлечения, преобразования и загрузки данных с помощью Azure Databricks. Мы извлечем данные из Azure Data Lake Storage 2-го поколения в Azure Databricks, выполним преобразование данных в Azure Databricks, а затем загрузим преобразованные данные в Azure Synapse Analytics.

Для действий, описанных в этом руководстве, используется соединитель Azure Synapse для Azure Databricks, позволяющий передавать данные в Azure Databricks. Этот соединитель, в свою очередь, использует хранилище BLOB-объектов Azure как временное хранилище для данных, передаваемых между кластером Azure Databricks и Azure Synapse.

На следующем рисунке показан поток в приложении.

Azure Databricks с Data Lake Store и Azure Synapse

В рамках этого руководства рассматриваются следующие задачи:

  • Создайте службу Azure Databricks.
  • создание кластера Spark в Azure Databricks;
  • Создание файловой системы в учетной записи Data Lake Storage 2-го поколения.
  • Отправка примера данных в учетную запись Azure Data Lake Storage 2-го поколения.
  • Создание субъекта-службы.
  • Извлечение данных из учетной записи Azure Data Lake Storage 2-го поколения.
  • преобразование данных в Azure Databricks;
  • Загрузка данных в Azure Synapse.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Примечание.

Инструкции из этого руководство нельзя выполнять с бесплатной пробной версией подписки. Если у вас есть бесплатная учетная запись, перейдите к профилю и измените подписку на подписку с оплатой по мере использования. Дополнительные сведения см. на странице создания бесплатной учетной записи Azure. Затем удалите предельную сумму расходов и запросите увеличение квоты для виртуальных ЦП в своем регионе. При создании рабочей области Azure Databricks можно выбрать ценовую категорию Пробная версия ("Премиум" — 14 дней бесплатно (DBU)) для предоставления рабочей области доступа к бесплатным DBU Azure Databricks уровня "Премиум" на 14 дней.

Необходимые компоненты

Прежде чем начать работу с этим руководством, выполните следующие задачи:

Сбор необходимых сведений

Обязательно выполните предварительные требования данного руководства.

Прежде чем начать, вам понадобится собрать такую информацию:

✔️ имя базы данных, имя сервера базы данных, имя пользователя и пароль Azure Synapse;

✔️ ключ доступа к учетной записи хранения больших двоичных объектов;

✔️ имя учетной записи хранения Data Lake Storage 2-го поколения;

✔️ идентификатор арендатора для подписки;

✔️ Идентификатор приложения, зарегистрированного в идентификаторе Microsoft Entra (ранее — Azure Active Directory).

✔️ Ключ проверки подлинности для приложения, зарегистрированного в идентификаторе Microsoft Entra (ранее — Azure Active Directory).

Создание службы Azure Databricks.

В этом разделе вы создадите службу Azure Databricks с помощью портала Azure.

  1. В меню портала Azure выберите Создать ресурс.

    Создание ресурса на портале Azure

    Затем выберите Аналитика>Azure Databricks.

    Создание Azure Databricks на портале Azure

  2. В разделе Служба Azure Databricks укажите следующие значения, чтобы создать службу Databricks.

    Свойство Description
    Имя рабочей области Укажите имя рабочей области Databricks.
    Подписка Выберите подписку Azure в раскрывающемся списке.
    Группа ресурсов Укажите, следует ли создать новую группу ресурсов или использовать имеющуюся. Группа ресурсов — это контейнер, содержащий связанные ресурсы для решения Azure. Дополнительные сведения см. в обзоре группы ресурсов Azure.
    Местонахождение Выберите Западная часть США 2. Другие доступные регионы см. в статье о доступности служб Azure по регионам.
    Ценовая категория Выберите Стандартное.
  3. Создание учетной записи занимает несколько минут. Чтобы отслеживать состояние операции, просмотрите индикатор выполнения вверху.

  4. Выберите Закрепить на панели мониторинга и нажмите кнопку Создать.

Создание кластера Spark в Azure Databricks.

  1. На портале Azure перейдите к созданной службе Databricks, а затем выберите Launch Workspace (Запустить рабочую область).

  2. Вы будете перенаправлены на портал Azure Databricks. На портале выберите Кластер.

    Databricks в Azure

  3. На странице создания кластера укажите значения для создания кластера.

    Создание кластера Databricks Spark в Azure

  4. Заполните значения в следующих полях, и примите значения по умолчанию для других полей.

    • Введите имя кластера.

    • Убедитесь, что установлен флажок Terminate after __  minutes of inactivity (Завершить через __ мин бездействия). Укажите длительность (в минутах) для завершения работы кластера, если тот не используется.

    • Выберите Create cluster (Создать кластер). Когда кластер будет выполняться, можно присоединить к нему записные книжки и запустить на нем задания Spark.

Создание файловой системы в учетной записи Azure Data Lake Storage 2-го поколения

В этом разделе показано создание записной книжки в рабочей области Azure Databricks и выполнение фрагментов кода для настройки учетной записи хранения.

  1. На портале Azure перейдите к созданной службе Azure Databricks, а затем выберите Launch Workspace (Запустить рабочую область).

  2. В левой области выберите Рабочая область. В раскрывающемся списке Рабочая область выберите Создать>Notebook (Записная книжка).

    Создание записной книжки в Databricks

  3. В диалоговом окне создания записной книжки введите имя записной книжки. Выберите Scala в качестве языка, а затем выберите созданный ранее кластер Spark.

    Указание сведений для записной книжки в Databricks

  4. Нажмите кнопку создания.

  5. Следующий блок кода устанавливает учетные данные субъекта-службы по умолчанию для любой учетной записи ADLS 2-го поколения, доступ к которой осуществляется в сеансе Spark. Второй блок кода добавляет имя учетной записи к параметру, чтобы указать учетные данные для конкретной учетной записи ADLS 2-го поколения. Скопируйте и вставьте любой блок кода в первую ячейку записной книжки Azure Databricks.

    Конфигурация сеанса

    val appID = "<appID>"
    val secret = "<secret>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    

    Конфигурация учетной записи

    val storageAccountName = "<storage-account-name>"
    val appID = "<app-id>"
    val secret = "<secret>"
    val fileSystemName = "<file-system-name>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    
  6. В этом блоке кода замените значения заполнителя <app-id>, <secret>, <tenant-id> и <storage-account-name> значениями, полученными в ходе выполнения предварительных условий этого руководства. Значение заполнителя <file-system-name> замените на имя файловой системы.

    • <app-id> и <secret> заменяются значениями из приложения, которое вы зарегистрировали в Azure AD при создании субъекта-службы.

    • Значение <tenant-id> берется из подписки.

    • <storage-account-name> — это имя учетной записи хранения Azure Data Lake Storage 2-го поколения.

  7. Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

Прием примера данных в учетную запись Azure Data Lake Storage 2-го поколения

Прежде чем приступить к работе с этим разделом, выполните следующие предварительные требования.

Введите следующий код в ячейку записной книжки:

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

В ячейке нажмите клавиши SHIFT+ВВОД, чтобы выполнить код.

Теперь в новую ячейку под этой введите следующий код и замените значения, которые отображаются в скобках, использованными ранее.

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

В ячейке нажмите клавиши SHIFT+ВВОД, чтобы выполнить код.

Извлечение данных из учетной записи Azure Data Lake Storage 2-го поколения

  1. Теперь можно загрузить пример JSON-файла в виде кадра данных в Azure Databricks. Вставьте следующий код в новую ячейку. Замените значения заменителей, показанные в скобках, собственными.

    val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
    
  2. Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

  3. Чтобы просмотреть содержимое кадра данных, выполните следующий код:

    df.show()
    

    Должен отобразиться результат, аналогичный приведенному ниже фрагменту кода.

    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    |               artist|     auth|firstName|gender|itemInSession|  lastName|   length|  level|            location|method|    page| registration|sessionId|                song|status|           ts|userId|
    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    | El Arrebato         |Logged In| Annalyse|     F|            2|Montgomery|234.57914| free  |  Killeen-Temple, TX|   PUT|NextSong|1384448062332|     1879|Quiero Quererte Q...|   200|1409318650332|   309|
    | Creedence Clearwa...|Logged In|   Dylann|     M|            9|    Thomas|340.87138| paid  |       Anchorage, AK|   PUT|NextSong|1400723739332|       10|        Born To Move|   200|1409318653332|    11|
    | Gorillaz            |Logged In|     Liam|     M|           11|     Watts|246.17751| paid  |New York-Newark-J...|   PUT|NextSong|1406279422332|     2047|                DARE|   200|1409318685332|   201|
    ...
    ...
    

    В результате были извлечены данные из Azure Data Lake Storage Gen2 в Azure Databricks.

Преобразование данных в Azure Databricks.

Необработанный файл с примером данных small_radio_json.json содержит сведения о слушателях радиостанции и имеет множество столбцов. В этом разделе мы преобразуем данные, чтобы извлечь только определенные столбцы из набора данных.

  1. Сначала извлеките только столбцы firstName, lastName, gender, location и level из созданного кадра данных.

    val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
    specificColumnsDf.show()
    

    Выходные данные должны выглядеть так, как показано в следующем фрагменте кода:

    +---------+----------+------+--------------------+-----+
    |firstname|  lastname|gender|            location|level|
    +---------+----------+------+--------------------+-----+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX| free|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...| free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    +---------+----------+------+--------------------+-----+
    
  2. Эти данные можно еще преобразовывать, переименовав столбец level на subscription_type.

    val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
    renamedColumnsDF.show()
    

    Выходные данные должны выглядеть так, как показано в следующем фрагменте кода.

    +---------+----------+------+--------------------+-----------------+
    |firstname|  lastname|gender|            location|subscription_type|
    +---------+----------+------+--------------------+-----------------+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX|             free|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...|             free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    +---------+----------+------+--------------------+-----------------+
    

Загрузка данных в Azure Synapse

В этом разделе преобразованные данные отправляются в Azure Synapse. С помощью соединителя Azure Synapse для Azure Databricks кадр данных можно напрямую отправить в виде таблицы в пул Spark Synapse.

Как упоминалось ранее, соединитель Azure Synapse использует хранилище BLOB-объектов Azure в качестве временного хранилища для передачи данных между Azure Databricks и Azure Synapse. Таким образом сначала нужно предоставить конфигурацию для подключения к учетной записи хранения. Вы уже должны были создать учетную запись, выполняя предварительные требования для этой статьи.

  1. Предоставьте конфигурацию для получения доступа к учетной записи хранения Azure из Azure Databricks.

    val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
    val blobContainer = "<blob-container-name>"
    val blobAccessKey =  "<access-key>"
    
  2. Укажите временную папку, которая будет использоваться при перемещении данных между Azure Databricks и Azure Synapse.

    val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
    
  3. Запустите следующий фрагмент кода, чтобы сохранить ключ доступа к хранилищу BLOB-объектов Azure в конфигурации. Благодаря этому действию вам не придется хранить ключ доступа в записной книжке в виде обычного текста.

    val acntInfo = "fs.azure.account.key."+ blobStorage
    sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
    
  4. Укажите значения для подключения к экземпляру Azure Synapse. В качестве необходимого компонента следует создать службу Azure Synapse Analytics. Используйте полное имя сервера для dwServer. Например, <servername>.database.windows.net.

    //Azure Synapse related settings
    val dwDatabase = "<database-name>"
    val dwServer = "<database-server-name>"
    val dwUser = "<user-name>"
    val dwPass = "<password>"
    val dwJdbcPort =  "1433"
    val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
    val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
    
  5. Выполните следующий фрагмент кода, чтобы загрузить преобразованный кадр данных renamedColumnsDF в качестве таблицы в Azure Synapse. Этот фрагмент кода создает таблицу с именем SampleTable в базе данных SQL.

    spark.conf.set(
        "spark.sql.parquet.writeLegacyFormat",
        "true")
    
    renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
    

    Примечание.

    В этом примере используется флаг forward_spark_azure_storage_credentials, который вызывает Azure Synapse для доступа к данным из хранилища BLOB-объектов с помощью ключа доступа. Это единственный поддерживаемый способ проверки подлинности.

    Если для хранилища BLOB-объектов Azure нельзя выбирать виртуальные сети, Azure Synapse запросит Управляемое удостоверение службы, а не ключи доступа. В таком случае вы получите ошибку с сообщением о том, что у вызывающей стороны нет прав на выполнение этой операции.

  6. Подключитесь к базе данных SQL и убедитесь, что вы видите базу данных SampleTable.

    Проверка образца таблицы

  7. Выполните запрос SELECT, чтобы проверить содержимое таблицы. В таблице должны быть те же данные, что и в кадре данных renamedColumnsDF.

    Проверка содержимого образца таблицы

Очистка ресурсов

После выполнения заданий из этого руководства вы можете завершить работу кластера. В рабочей области Azure Databricks в области слева выберите Кластеры. Для завершения работы кластера в разделе Действия наведите указатель мыши на многоточие (...) и выберите значок Завершить.

Остановка кластера Databricks

Если не завершить работу кластера вручную, она завершится автоматически, если во время создания кластера вы установили флажок Terminate after __ minutes of inactivity (Завершить через __ мин бездействия). В этом случае работа кластера автоматически завершается, если он был неактивным в течение определенного времени.

Следующие шаги

Из этого руководства вы узнали, как:

  • Создание службы Azure Databricks.
  • Создание кластера Spark в Azure Databricks.
  • Создание записной книжки в Azure Databricks.
  • Извлечение данных из учетной записи Data Lake Storage 2-го поколения.
  • Преобразование данных в Azure Databricks.
  • Загрузка данных в Azure Synapse