Интеграция OneLake с Azure Databricks

В этой статье показано, как получить доступ к данным OneLake из Azure Databricks. Оба подхода используют аутентификацию сервисного принципала и конечную точку OneLake ABFS. Выберите раздел, соответствующий типу вычислений Databricks:

  • Стандартный или кластер заданий: Используйте драйвер Spark ABFS с конфигурацией OAuth для чтения и записи данных непосредственно через DataFrame Spark.
  • Бессерверные вычисления: бессерверные среды выполнения не позволяют задавать настраиваемые свойства конфигурации Spark. Вместо этого используйте библиотеку Microsoft Authentication Library (MSAL) и библиотеку Python deltalake для проверки подлинности и чтения или записи разностных таблиц.

Связанные сценарии интеграции Databricks см. в следующих ресурсах:

Сценарий Документации
Выполнение запроса данных OneLake из каталога Unity без их копирования. Включить федерацию каталога OneLake
Доступ к данным каталога Unity Databricks из Fabric Зеркальное отображение каталога Azure Databricks Unity

Предпосылки

Перед подключением убедитесь, что у вас есть:

  • Рабочая область Fabric и озерохранилище.
  • Рабочая область Azure Databricks уровня "Премиум".
  • Служебный принципал с назначением роли рабочей области "Участник" по крайней мере.
  • Для хранения и извлечения секретов используйте секреты Databricks или Azure Key Vault (AKV). В примерах в этой статье используются секреты Databricks.

Подключение к OneLake с помощью стандартного кластера

Использование правильного формата пути OneLake ABFS

Используйте один из следующих форматов URI:

  • abfss://<workspace_id_or_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_id_or_name>.lakehouse/Files/<path>
  • abfss://<workspace_id_or_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_id_or_name>.lakehouse/Tables/<path>

Использовать можно идентификаторы или имена. Если вы используете имена, избегайте специальных символов и пробелов в названиях рабочих областей и lakehouse.

Использование аутентификации служебного субъекта

Используйте этот параметр для автоматизированных заданий и централизованной смены секретов.

workspace_name = "<workspace_name>"
lakehouse_name = "<lakehouse_name>"
tenant_id = dbutils.secrets.get(scope="<scope-name>", key="<tenant-id-key>")
service_principal_id = dbutils.secrets.get(scope="<scope-name>", key="<client-id-key>")
service_principal_secret = dbutils.secrets.get(scope="<scope-name>", key="<client-secret-key>")

spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set(
   "fs.azure.account.oauth.provider.type",
   "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set("fs.azure.account.oauth2.client.id", service_principal_id)
spark.conf.set("fs.azure.account.oauth2.client.secret", service_principal_secret)
spark.conf.set(
   "fs.azure.account.oauth2.client.endpoint",
   f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
)

# Read
df = spark.read.format("parquet").load(
   f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.lakehouse/Files/data"
)
df.show(10)

# Write
df.write.format("delta").mode("overwrite").save(
   f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.lakehouse/Tables/dbx_delta_spn"
)

Подключитесь к OneLake с использованием бессерверного вычисления

Бессерверные вычислительные ресурсы Databricks позволяют запускать рабочие нагрузки без подготовки кластера, но это позволяет выполнять только подмножество поддерживаемых свойств Spark. Конфигурацию Spark, используемую fs.azure.* в стандартных кластерах, нельзя задать.

Замечание

Это ограничение не является уникальным для Azure Databricks. Реализации без сервера Databricks в Amazon Web Services (AWS) и Google Cloud имеют то же поведение.

Если вы пытаетесь задать неподдерживаемую конфигурацию Spark в бессерверной записной книжке, система возвращает ошибку CONFIG_NOT_AVAILABLE.

Снимок экрана: сообщение об ошибке, если пользователь пытается изменить неподдерживаемую конфигурацию Spark в бессерверных вычислениях.

Вместо этого используйте MSAL для получения маркера OAuth и библиотеки Python deltalake для чтения или записи таблиц Delta с этим маркером.

Настройка бессерверной записной книжки

  1. Создайте записную книжку в рабочей области Databricks и подключите ее к бессерверным вычислениям.

    Снимок экрана, показывающий, как подключить записную книжку Databricks к бессерверным вычислениям.

  2. Импорт модулей Python. В этом примере используйте два модуля:

    • msal выполняет проверку подлинности с помощью платформа удостоверений Майкрософт.
    • deltalake считывает и записывает таблицы Delta Lake с Python.
    from msal import ConfidentialClientApplication
    from deltalake import DeltaTable, write_deltalake
    
  3. Объявите переменные для клиента Microsoft Entra, включая идентификатор приложения. Используйте идентификатор клиента, в котором развернута Microsoft Fabric.

    # Fetch from Databricks secrets.
    tenant_id = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for tenant_id>")
    client_id = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for client_id>")
    client_secret = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for secret>")
    
  4. Объявите переменные рабочей области Fabric.

    workspace_id = "<replace with workspace name>"
    lakehouse_id = "<replace with lakehouse name>"
    table_to_read = "<name of lakehouse table to read>"
    onelake_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}.lakehouse/Tables/{table_to_read}"
    
  5. Инициализируют клиента для получения токена.

    authority = f"https://login.microsoftonline.com/{tenant_id}"
    
    app = ConfidentialClientApplication(
        client_id,
        authority=authority,
        client_credential=client_secret
    )
    
    result = app.acquire_token_for_client(scopes=["https://onelake.fabric.microsoft.com/.default"])
    
    if "access_token" in result:
        print("Access token acquired.")
        token_val = result['access_token']
    else:
        raise Exception(f"Failed to acquire token: {result.get('error_description', result)}")
    
  6. Чтение таблицы Delta из OneLake.

    dt = DeltaTable(onelake_uri, storage_options={"bearer_token": f"{token_val}", "use_fabric_endpoint": "true"})
    df = dt.to_pandas()
    print(df.head())
    
  7. Запишите таблицу Delta в OneLake.

    target_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}.lakehouse/Tables/<target_table_name>"
    write_deltalake(
        target_uri,
        df,
        mode="overwrite",
        storage_options={"bearer_token": f"{token_val}", "use_fabric_endpoint": "true"}
    )
    

Рекомендации по проектированию

  • Используйте один паттерн записи для каждого пути таблицы, где это возможно. Запись в один и тот же путь к хранилищу из нескольких вычислительных подсистем или версий среды выполнения может привести к конфликтам.
  • Используйте управление секретами для учетных данных сервисного принципала.
  • Используйте ярлыки OneLake, если требуется виртуализированный доступ вместо физической записи данных в другое расположение lakehouse.