Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
В этом руководстве вы используете соединитель Spark Azure Cosmos DB для чтения или записи данных из учетной записи Azure Cosmos DB для NoSQL. В этом руководстве используется Azure Databricks и записная книжка Jupyter для иллюстрации интеграции с API для NoSQL из Spark. В этом руководстве основное внимание уделяется Python и Scala, хотя вы можете использовать любой язык или интерфейс, поддерживаемый Spark.
В этом руководстве описано следующее:
- Подключитесь к учетной записи API для NoSQL с помощью Spark и записной книжки Jupyter.
- Создание ресурсов базы данных и контейнера.
- Загрузка данных в контейнер.
- Запрос данных в контейнере.
- Выполнение общих операций с элементами в контейнере.
Prerequisites
- Существующая учетная запись Azure Cosmos DB для NoSQL.
- Если у вас есть подписка Azure, создайте новую учетную запись.
- Существующая рабочая область Azure Databricks.
Подключение с помощью Spark и Jupyter
Используйте существующую рабочую область Azure Databricks для создания вычислительного кластера, готового к использованию Apache Spark 3.4.x для подключения к учетной записи Azure Cosmos DB для NoSQL.
Откройте рабочую область Azure Databricks.
В интерфейсе рабочей области создайте новый кластер. Настройте кластер с этими параметрами как минимум:
Version Value Версия среды выполнения 13.3 LTS (Scala 2.12, Spark 3.4.1) Используйте интерфейс рабочей области для поиска пакетов Maven из Maven Central с идентификатором группы
com.azure.cosmos.spark. Установите пакет специально для Spark 3.4 с идентификатором артефакта , префиксированнымazure-cosmos-spark_3-4, в кластер.Наконец, создайте новую записную книжку.
Tip
По умолчанию записная книжка подключена к недавно созданному кластеру.
В записной книжке задайте параметры конфигурации оперативной обработки транзакций (OLTP) для конечной точки учетной записи NoSQL, имени базы данных и имени контейнера.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Создание базы данных и контейнера
Используйте API каталога для управления ресурсами учетной записи, такими как базы данных и контейнеры. Затем можно использовать OLTP для управления данными в ресурсах контейнера.
Настройте API каталога для управления ресурсами API noSQL с помощью Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Создайте новую базу данных с именем
cosmicworksс помощьюCREATE DATABASE IF NOT EXISTS.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Создайте новый контейнер с именем
productsс помощьюCREATE TABLE IF NOT EXISTS. Убедитесь, что путь/categoryключа секции задан и включена пропускная способность автомасштабирования с максимальной пропускной способностью1000единиц запросов (ЕЗ) в секунду.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Создайте другой контейнер с именем
employeesс помощью конфигурации ключа иерархической секции. Используйте/organization,/departmentи/teamв качестве набора путей ключа раздела. Следуйте этому конкретному заказу. Кроме того, задайте пропускную способность вручную, установив значение на400RU.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Запустите ячейки записной книжки, чтобы убедиться, что база данных и контейнеры созданы в вашей учетной записи API NoSQL.
Прием данных
Создание примера набора данных. Затем используйте OLTP для приема данных в контейнер API для NoSQL.
Создание примера набора данных.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Используйте
spark.createDataFrameи ранее сохраненную конфигурацию OLTP для добавления примеров данных в целевой контейнер.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Запрос данных
Загрузите данные OLTP в фрейм данных, чтобы выполнять типичные запросы к данным. Для фильтрации или запроса данных можно использовать различные синтаксисы.
Используйте
spark.readдля загрузки данных OLTP в объект DataFrame. Используйте ту же конфигурацию, которую вы использовали ранее в этом руководстве. Кроме того, установите значениеspark.cosmos.read.inferSchema.enabled, чтобыtrueразрешил соединителю Spark выводить схему, выбирая существующие элементы.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Отобразите схему данных, загруженных в фрейм данных, с помощью
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()Отображение строк данных, где столбец
quantityменьше20. Используйте функцииwhereиshowдля выполнения этого запроса.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Отобразите первую строку данных, в которой столбец
clearanceравенtrue. Используйте функциюfilterдля выполнения этого запроса.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Отрисовка пяти строк данных без фильтрации или усечения. Используйте функцию
showдля настройки внешнего вида и количества отображаемых строк.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Запросите данные с помощью этой необработанной строки запроса NoSQL:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Выполнение распространенных операций
При работе с данными API для NoSQL в Spark можно выполнять частичные обновления или работать с данными как необработанные JSON.
Чтобы выполнить частичное обновление элемента:
Скопируйте существующую
configпеременную конфигурации и измените свойства в новой копии. В частности, настройте стратегию записи наItemPatch. Затем отключите массовую поддержку. Задайте столбцы и сопоставленные операции. Наконец, задайте для типа операции по умолчанию значениеSet.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Создайте переменные для ключа раздела элемента и уникального идентификатора, которые вы намереваетесь использовать как часть данной операции исправления.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Создайте набор объектов исправлений, чтобы указать целевой элемент и указать поля, которые необходимо изменить.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Создайте кадр данных с помощью набора объектов исправлений. Используется
writeдля выполнения операции исправления.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Запустите запрос, чтобы просмотреть результаты операции исправления. Теперь элемент должен быть назван
Yamba New Surfboardбез других изменений.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Для работы с необработанными данными JSON:
Скопируйте существующую
configпеременную конфигурации и измените свойства в новой копии. В частности, измените целевой контейнер наemployees. Затем настройтеcontactsстолбец или поле для использования необработанных данных JSON.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Создайте группу сотрудников для загрузки в контейнер.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "[email protected]" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "[email protected]" } ]""") )Создайте таблицу данных и используйте
writeдля загрузки данных сотрудника.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Отрисовка данных из кадра данных с помощью
show. Обратите внимание, чтоcontactsстолбец является необработанным JSON в выходных данных.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Связанный контент
- Apache Spark
- API каталога Azure Cosmos DB
- Справочник по параметру конфигурации
- Примеры соединителя Spark для Azure Cosmos DB
- Миграция из Spark 2.4 в Spark 3.*
- Устаревшие версии:
- Соединитель Azure Cosmos DB для Spark 3.1 и 3.2 устарел, так как в Azure Databricks, Azure Synapse или Azure HDInsight больше нет поддерживаемых сред выполнения Spark 3.1 или 3.2.
- Руководство по миграции для обновления из Spark 3.1
- Руководство по миграции для обновления из Spark 3.2
- Совместимость версий:
- Заметки о выпуске:
- Ссылки на скачивание: