Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
С помощью Azure Synapse Link для Azure Cosmos DB пользователи могут запускать аналитику почти в реальном времени по операционным данным в Azure Cosmos DB. Однако в некоторых ситуациях определенные данные необходимо агрегировать и обогащать для обслуживания пользователей хранилища данных. Куратировать и экспортировать данные Azure Synapse Link можно всего несколькими ячейками в записной книжке.
Предварительные условия
- Создайте рабочее пространство Synapse с помощью:
- Подготовка учетной записи Azure Cosmos DB с контейнером HTAP с данными
- Подключите контейнер HTAP Azure Cosmos DB к рабочей области
- Настройте надлежащим образом импорт данных в выделенный пул SQL из Spark.
Шаги
В этом руководстве показано, как подключиться к аналитическому хранилищу. Описанные здесь действия не влияют на хранилище транзакций (для их выполнения единицы запросов не потребляются). Мы выполним следующие действия:
- Прочитайте контейнер HTAP Azure Cosmos DB в датафрейм Spark
- Объедините результаты в новый датафрейм
- Прием данных в выделенный пул SQL
Данные
В этом примере мы используем контейнер HTAP с именем RetailSales. Это часть связанной службы с именем ConnectedData со следующей схемой:
- _rid: string (nullable = true)
- _ts: long (допустимо значение null = true)
- logQuantity: двойной (может быть пустым = да)
- productCode: string (nullable = true)
- количество: длинное (nullable = true)
- цена: long (nullable = true)
- id: string (nullable = true)
- реклама: длинный (nullable = true)
- идентификатор_магазина: long (допускается значение null)
- началоНедели: Длинный (nullable = true)
- _etag: string (nullable = true)
Для отчетности мы будем агрегировать продажи (количество, выручка (цена x количество) по коду продукта и началу недели. Наконец, мы экспортируем эти данные в выделенную таблицу пула SQL dbo.productsales
.
Настройка ноутбука Spark
Создайте записную книжку Spark, выбрав Scala в качестве основного языка. Мы используем для сеанса заданный по умолчанию параметр записной книжки.
Прочтите данные в Spark
Чтение контейнера HTAP Azure Cosmos DB с помощью Spark в кадр данных в первой ячейке.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
Агрегирование результатов в новый фрейм данных
Во второй ячейке будут выполнятся преобразование и статистические вычисления, которые необходимо выполнить для нового кадра данных до его загрузки в базу данных выделенного пула SQL.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
Загрузка результатов в выделенный пул SQL
В третьей ячейке данные загружаются в выделенный пул SQL. При этом автоматически создается временная внешняя таблица, внешний источник данных и формат внешнего файла, которые будут удалены после завершения задания.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
Запрос результатов с помощью SQL
Вы можете запросить результат, используя простой SQL-запрос, например так, как в следующем скрипте SQL:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
Запрос отобразит следующие результаты в режиме диаграммы: