Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Important
Synapse Link для Cosmos DB больше не поддерживается для новых проектов. Не используйте эту функцию.
Используйте зеркальное отображение Azure Cosmos DB для Microsoft Fabric, которое теперь находится в стадии общей доступности. Зеркальное отображение обеспечивает те же преимущества нулевого ETL и полностью интегрировано с Microsoft Fabric. Дополнительные сведения см. в Обзоре зеркалирования Cosmos DB.
Из этой статьи вы узнаете, как взаимодействовать с Azure Cosmos DB с помощью Synapse Apache Spark 3. Клиенты могут использовать Scala, Python, SparkSQL и C#, для аналитики, проектирования данных, обработки и анализа данных в Azure Synapse Link для Azure Cosmos DB.
При взаимодействии с Azure Cosmos DB поддерживаются следующие возможности.
- Synapse Apache Spark 3 позволяет анализировать данные в контейнерах Azure Cosmos DB, которые включены в Azure Synapse Link практически в реальном времени, не влияя на производительность транзакционных рабочих нагрузок. Следующие два варианта доступны для запроса аналитического хранилища Azure Cosmos DB из Spark:
- Загрузка в Spark DataFrame.
- Создать таблицу Spark
- Synapse Apache Spark также позволяет принимать данные в Azure Cosmos DB. Важно отметить, что данные всегда передаются в контейнеры Azure Cosmos DB через хранилище транзакций. Если Azure Synapse Link включен, все новые вставки, обновления и удаления будут автоматически синхронизированы с аналитическим хранилищем.
- Synapse Apache Spark также поддерживает структурированную потоковую передачу Spark с помощью Azure Cosmos DB в качестве источника и приёмника.
В следующих разделах описан синтаксис. Вы также можете ознакомиться с модулем Learn о том, как запрашивать Azure Cosmos DB с помощью Apache Spark для Azure Synapse Analytics. Жесты в рабочей области Azure Synapse Analytics разработаны с целью предоставления простого готового интерфейса для начала работы. Жесты доступны, когда вы щелкаете правой кнопкой мыши контейнер Azure Cosmos DB на вкладке Данные в рабочей области Synapse. С помощью жестов можно быстро создать код и скорректировать его в соответствии с потребностями. Жесты также идеально подходят для обнаружения данных одним щелчком мыши.
Important
Вы должны учитывать некоторые ограничения в аналитической схеме, которые могут привести к непредвиденному поведению в операциях загрузки данных. Например, в аналитической схеме доступны только первые 1000 свойств из схемы транзакций, свойства с пробелами недоступны и т. д. Если у вас возникли непредвиденные результаты, проверьте ограничения схемы аналитического хранилища данных для получения более подробной информации.
Запрос аналитического хранилища Azure Cosmos DB
Клиенты могут загружать данные аналитического хранилища в DataFrames Spark или создавать таблицы Spark.
Различия заключаются в том, должны ли изменения базовых данных в контейнере Azure Cosmos DB автоматически отражаться в анализе, выполненном в Spark. Когда регистрируются кадры данных Spark или создается таблица Spark, Spark извлекает метаданные аналитического хранилища для эффективной принудительной отправки. Важно отметить, что Spark следует политике отложенной оценки. Необходимо принять меры для получения последнего моментального снимка данных в запросах Spark DataFrames или SparkSQL.
В случае загрузки в кадр данных Spark извлеченные метаданные кэшируются на протяжении всего времени существования сеанса Spark. Поэтому последующие действия, вызываемые в кадре данных, оцениваются на основе состояния аналитического хранилища на момент создания кадра данных.
С другой стороны, в случае создания таблицы Spark метаданные состояния аналитического хранилища не кэшируются в Spark и перезагружается при каждом выполнении запросов SparkSQL в таблице Spark.
В заключение, вы можете выбрать между загрузкой моментального снимка в Spark DataFrame или выполнением запроса к таблице Spark для получения последнего моментального снимка.
Note
Чтобы запросить учетные записи Azure Cosmos DB для MongoDB, узнайте больше о полном представлении схемы точности в аналитическом хранилище и именах расширенных свойств, используемых.
Note
Все options чувствительны к регистру.
Authentication
Теперь клиенты Spark 3.x могут проходить проверку подлинности в аналитическом хранилище Azure Cosmos DB с помощью маркеров доступа доверенных удостоверений или ключей учетной записи базы данных. Токены более безопасны, так как они недолго живущие и назначаются для необходимых разрешений с помощью RBAC Cosmos DB.
Соединитель теперь поддерживает два типа проверки подлинности: MasterKey и AccessToken, для свойства spark.cosmos.auth.type.
Проверка подлинности главного ключа
Используйте ключ для чтения кадра данных с помощью Spark:
val config = Map(
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.accountKey" -> "<key>",
"spark.cosmos.database" -> "<db>",
"spark.cosmos.container" -> "<container>"
)
val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
Проверка подлинности токена доступа
Новая безключевая аутентификация поддерживает маркеры доступа.
val config = Map(
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.auth.type" -> "AccessToken",
"spark.cosmos.auth.accessToken" -> "<accessToken>",
"spark.cosmos.database" -> "<db>",
"spark.cosmos.container" -> "<container>"
)
val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
Note
Коннектор Azure Synapse Link Spark для Azure Cosmos DB не поддерживает управляемое удостоверение.
Проверка подлинности маркера доступа требует назначения ролей
Чтобы использовать подход маркера доступа, необходимо создать маркеры доступа. Поскольку токены доступа связаны с удостоверениями Azure, необходимо назначить правильные роли RBAC этой учетной записи. Назначение роли находится на уровне плоскости данных, и для выполнения назначения роли необходимо иметь минимальные разрешения уровня управления.
Назначения ролей в управлении доступом к идентификации (IAM) в портале Azure находятся на уровне контрольной плоскости и не влияют на назначения ролей на уровне плоскости данных. Назначения ролей плоскости данных доступны только через Azure CLI. Действие readAnalytics требуется для чтения данных из аналитического хранилища в Cosmos DB и не является частью предопределенных ролей. Таким образом необходимо создать определение пользовательской роли.
readAnalytics Помимо данного действия, также добавьте действия, необходимые для Data Reader. Создайте JSON-файл со следующим содержимым и назовите его role_definition.json
{
"RoleName": "CosmosAnalyticsRole",
"Type": "CustomRole",
"AssignableScopes": ["/"],
"Permissions": [{
"DataActions": [
"Microsoft.DocumentDB/databaseAccounts/readAnalytics",
"Microsoft.DocumentDB/databaseAccounts/readMetadata",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/executeQuery",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed"
]
}]
}
Для проверки подлинности маркера доступа требуется Azure CLI
- Войдите в Azure CLI:
az login - Задайте подписку по умолчанию с вашей учетной записью Cosmos DB:
az account set --subscription <name or id> - Создайте определение роли в нужной учетной записи Cosmos DB:
az cosmosdb sql role definition create --account-name <cosmos-account-name> --resource-group <resource-group-name> --body @role_definition.json - Скопируйте возвращенную роль
definition id:/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.DocumentDB/databaseAccounts/< cosmos-account-name >/sqlRoleDefinitions/<a-random-generated-guid> - Получите основной идентификатор субъекта, которому вы хотите назначить роль. Удостоверение может быть регистрацией приложения в Azure, виртуальной машиной или любой другой поддерживаемый ресурс Azure. Назначьте роль принципалу с помощью:
az cosmosdb sql role assignment create --account-name "<cosmos-account-name>" --resource-group "<resource-group>" --scope "/" --principal-id "<principal-id-of-identity>" --role-definition-id "<role-definition-id-from-previous-step>"
Note
При использовании регистрации приложения Azure используйте Object Id в качестве идентификатора главного объекта. Кроме того, идентификатор участника и учетная запись Cosmos DB должны находиться в одном тенанте.
Генерация токена доступа — Synapse Notebooks
Рекомендуемый метод для Synapse Notebooks — использовать служебный объект с сертификатом для создания токенов доступа. Щелкните здесь для получения дополнительных сведений.
The following code snippet has been validated to work in a Synapse notebook
val tenantId = "<azure-tenant-id>"
val clientId = "<client-id-of-service-principal>"
val kvLinkedService = "<azure-key-vault-linked-service>"
val certName = "<certificate-name>"
val token = mssparkutils.credentials.getSPTokenWithCertLS(
"https://<cosmos-account-name>.documents.azure.com/.default",
"https://login.microsoftonline.com/" + tenantId, clientId, kvLinkedService, certName)
Теперь можно использовать маркер доступа, созданный на этом шаге, для чтения данных из аналитического хранилища, когда для типа проверки подлинности задан маркер доступа.
Note
При использовании регистрации приложения Azure используйте приложение (идентификатор клиента).
Note
В настоящее время Synapse не поддерживает создание маркеров доступа с помощью пакета azure-identity в записных книжках. Кроме того, виртуальные жесткие диски Synapse не включают пакет идентификации Azure и его зависимости. Щелкните здесь для получения дополнительных сведений.
Загрузка в Spark DataFrame.
В этом примере создается Spark DataFrame, указывающий на аналитическое хранилище Azure Cosmos DB. Затем можно выполнить более глубокий анализ, вызвав действия Spark на DataFrame. Эта операция не влияет на хранилище транзакций.
Синтаксис на языке Python будет следующим:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
Эквивалентный синтаксис на языке Scala будет следующим:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Создать таблицу Spark
В этом примере создается таблица Spark, указывающая аналитическое хранилище Azure Cosmos DB. Затем можно выполнить дополнительный анализ, вызвав запросы SparkSQL к таблице. Эта операция не влияет на хранилище транзакций или не приводит к перемещению данных. Если вы решите удалить эту таблицу Spark, базовый контейнер Azure Cosmos DB и соответствующее аналитическое хранилище не будут затронуты.
Этот сценарий удобен для повторного использования таблиц Spark с применением средств сторонних разработчиков и предоставления доступа к базовым данным в среде выполнения.
Синтаксис для создания таблицы Spark выглядит следующим образом:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Note
Если у вас есть сценарии, в которых схема базового контейнера Azure Cosmos DB изменяется с течением времени, и если вы хотите, чтобы обновленная схема автоматически отражалась в запросах к таблице Spark, в параметрах таблицы Spark установите для параметра spark.cosmos.autoSchemaMerge значение true.
Запись DataFrame Spark в контейнер Azure Cosmos DB
В этом примере вы сохраняете DataFrame Spark в контейнер Azure Cosmos DB. Эта операция влияет на производительность транзакционных рабочих нагрузок и потребляет единицы запросов, подготовленных в контейнере Azure Cosmos DB или в общей базе данных.
Синтаксис на языке Python будет следующим:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.mode('append')\
.save()
Эквивалентный синтаксис на языке Scala будет следующим:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
mode(SaveMode.Append).
save()
Загрузка потокового DataFrame из контейнера
В этом жесте вы используете функцию потоковой передачи Spark для загрузки данных из контейнера в кадр данных. Данные хранятся в основной учетной записи озера данных (и файловой системе), подключенной к рабочей области.
Note
Если вы хотите ссылаться на внешние библиотеки в Synapse Apache Spark, ознакомьтесь с дополнительными сведениями здесь.
Загрузка кадра данных для потоковой передачи из контейнера Azure Cosmos DB
В этом примере вы используете структурированную потоковую передачу Spark для загрузки данных из контейнера Azure Cosmos DB в потоковый DataFrame Spark, используя функциональность канала изменений в Azure Cosmos DB. Данные контрольных точек, используемые Spark'ом, будут храниться в основной учетной записи для хранилища данных (и файловой системе), подключенной к рабочей области.
Синтаксис на языке Python будет следующим:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp.changeFeed")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.startFrom", "Beginning")\
.option("spark.cosmos.changeFeed.mode", "Incremental")\
.load()
Эквивалентный синтаксис на языке Scala будет следующим:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp.changeFeed").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.startFrom", "Beginning").
option("spark.cosmos.changeFeed.mode", "Incremental").
load()
Запись кадра данных для потоковой передачи в контейнер Azure Cosmos DB
В этом примере вы записываете стриминговый DataFrame в контейнер Azure Cosmos DB. Эта операция влияет на производительность транзакционных рабочих нагрузок и использование единиц запросов, выделенных в контейнере Azure Cosmos DB или в общей базе данных. Если папка /localWriteCheckpointFolder не создана (в примере ниже), она создается автоматически.
Синтаксис на языке Python будет следующим:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
streamQuery = dfStream\
.writeStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("checkpointLocation", "/tmp/myRunId/")\
.outputMode("append")\
.start()
streamQuery.awaitTermination()
Эквивалентный синтаксис на языке Scala будет следующим:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val query = dfStream.
writeStream.
format("cosmos.oltp").
outputMode("append").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("checkpointLocation", "/tmp/myRunId/").
start()
query.awaitTermination()
Дальнейшие шаги
- Примеры для начала работы с Azure Synapse Link на сайте GitHub.
- Узнайте, какие функции поддерживаются в Azure Synapse Link для Azure Cosmos DB
- Подключение к Azure Synapse Link для Azure Cosmos DB
- Ознакомьтесь с модулем Learn по запросу Azure Cosmos DB с помощью Apache Spark для Azure Synapse Analytics.