Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом руководстве описана обработка событий в учетной записи хранения с иерархическим пространством имен.
Вы создадите небольшое решение, которое позволяет пользователю заполнить таблицу Databricks Delta, загружая файл разделенных запятыми значений (CSV) с описанием заказа на продажу. Чтобы создать это решение, вы объедините подписку на Сетку событий, функцию Azure и задание в Azure Databricks.
При работе с этим руководством вы сделаете следующее:
- Создайте подписку на Event Grid, которая вызывает функцию Azure.
- создавать функцию Azure, которая получает от события уведомление и запускает задание в Azure Databricks;
- Создайте задание Databricks, которое вставляет заказ клиента в таблицу Delta Databricks, находящуюся в учетной записи хранения.
Мы будем создавать решение в обратном порядке, то есть начнем с рабочей области Azure Databricks.
Предварительные условия
Создайте учетную запись хранения с иерархическим пространством имен (Azure Data Lake Storage). В рамках этого руководства используется учетная запись хранения с именем
contosoorders
.См. статью "Создание учетной записи хранения для использования с Azure Data Lake Storage".
Убедитесь, что вашей учетной записи пользователя назначена роль Storage Blob Data Contributor.
Создайте служебный принципал, создайте клиентский секрет и предоставьте служебному принципалу доступ к учетной записи хранения.
См . руководство по подключению к Azure Data Lake Storage (шаги 1–3). После выполнения этих действий обязательно вставьте идентификатор клиента, идентификатор приложения и значения секрета клиента в текстовый файл. Они вам скоро понадобятся.
Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.
Создать заказ на продажу
Сначала создайте CSV-файл с описанием заказа на продажу и отправьте его в учетную запись хранения. Позже вы примените данные из этого файла для заполнения первой строки в таблице Databricks Delta.
Войдите в новую учетную запись хранения на портале Azure.
Выберите Обозреватель хранилища->Контейнеры BLOB-объектов->Добавить контейнер и создайте новый контейнер с именем data.
В контейнере данных создайте каталог с именем входных данных.
Вставьте приведенный ниже текст в текстовый редактор.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
Сохраните этот файл на локальном компьютере и присвойте ему имя data.csv.
В браузере хранилища отправьте этот файл в входную папку.
Создание задания в Azure Databricks
В этом разделе описано выполнение таких задач:
- Создайте рабочую область Azure Databricks.
- Создайте записную книжку.
- создание и заполнение таблицы Databricks Delta;
- добавление кода для вставки строк в таблицу Databricks Delta;
- Создайте работу.
Создайте рабочую область Azure Databricks.
В этом разделе вы создадите рабочую область Azure Databricks с помощью портала Azure.
Создайте рабочую область Azure Databricks. Присвойте имя рабочей области
contoso-orders
. См. статью "Создание рабочей области Azure Databricks".Создание кластера. Присвойте кластеру
customer-order-cluster
имя. См. Создание кластера.Создайте записную книжку. Назовите записную книжку
configure-customer-table
и выберите Python в качестве языка записной книжки по умолчанию. См. статью "Создание записной книжки".
Создание и заполнение таблицы Databricks Delta
Скопируйте приведенный ниже блок кода и вставьте его в первую ячейку в новой записной книжке, но пока не выполняйте этот код.
В этом блоке кода замените значения заполнителей
appId
,password
иtenant
значениями, которые вы собрали при подготовке предварительных условий для этого руководства.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://[email protected]/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'
Этот код позволяет создать мини-приложение с именем source_file. Позже вы создадите функцию Azure, которая вызывает этот код и передает мини-приложению путь к файлу. Этот код также аутентифицирует вашего служебного принципала в учетной записи хранения и создает некоторые переменные, которые будут использоваться в других ячейках.
Примечание.
При настройке рабочей среды рассмотрите возможность сохранения ключа проверки подлинности в Azure Databricks. Затем в блоке кода замените ключ проверки подлинности ключом поиска.
Например, вместо использования строки кодаspark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
следует использовать строкуspark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
.
После завершения работы с этим руководством ознакомьтесь со статьей Azure Data Lake Storage на веб-сайте Azure Databricks, чтобы просмотреть примеры этого подхода.Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.
Скопируйте приведенный ниже блок кода и вставьте его в другую ячейку записной книжки. Затем нажмите сочетание клавиш SHIFT+ВВОД, чтобы выполнить этот блок кода.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))
Этот код позволяет создать в учетной записи хранения таблицу Databricks Delta, а затем загрузить в нее начальные данные из ранее отправленного CSV-файла.
После успешного выполнения этого блока кода удалите его из записной книжки.
Добавление кода для вставки строк в таблицу Databricks Delta
Скопируйте приведенный ниже блок кода и вставьте его в другую ячейку, но пока не запускайте ее.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
Этот код позволяет вставлять во временное табличное представление данные из CSV-файла. Путь к этому CSV-файлу получен из входных данных мини-приложения, которое вы создали на предыдущем шаге.
Скопируйте и вставьте следующий блок кода в другую ячейку. Этот код объединяет содержимое временной таблицы с таблицей Databricks Delta.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
Создание задания
Создайте задание для запуска созданной ранее записной книжки. Позже вы создадите функцию Azure, которая запускает это задание при возникновении события.
Выберите Новый->задание.
Присвойте заданию имя, выберите созданную записную книжку и кластер. Затем нажмите кнопку "Создать ", чтобы создать задание.
Создание функции Azure
Создайте функцию Azure, которая запускает это задание.
В рабочей области Azure Databricks щелкните имя пользователя Azure Databricks в верхней строке, а затем в раскрывающемся списке выберите "Параметры пользователя".
На вкладке "Маркеры доступа" выберите "Создать новый маркер".
Скопируйте отображаемый маркер и нажмите кнопку "Готово".
В верхнем углу рабочей области Databricks щелкните иконку "люди", а затем выберите Параметры пользователя.
Нажмите кнопку "Создать новый маркер " и нажмите кнопку "Создать ".
Не забудьте скопировать токен в надежное место. Для выполнения задания функция Azure нуждается в этом токене для аутентификации в Databricks.
На домашней странице или в меню портала Azure выберите Создать ресурс.
На странице Создать выберите Вычисления>Функциональное приложение.
На вкладке "Основы" на странице "Создание приложения-функции" выберите группу ресурсов, а затем измените или проверьте следующие параметры:
Настройки Значение Имя функции приложения contosoorder Стек среды выполнения .NET Публикация Код Операционная система Windows Тип плана Потребление (Бессерверное) Выберите Проверить и создать, а затем выберите Создать.
По завершении развертывания выберите "Перейти к ресурсу ", чтобы открыть страницу обзора приложения-функции.
В группе Параметры выберите Конфигурация.
На странице Параметры приложения нажмите кнопку Новый параметр приложения поочередно для каждого параметра.
Добавьте следующие параметры:
Имя настройки Значение DBX_INSTANCE Регион вашего пространства Databricks. Например: westus2.azuredatabricks.net
DBX_PAT Личный маркер доступа, который вы создали ранее. DBX_JOB_ID Идентификатор выполняемого задания. Нажмите кнопку "Сохранить", чтобы зафиксировать эти параметры.
В группе "Функции" выберите "Функции" и нажмите кнопку "Создать".
Щелкните Azure Event Grid Trigger (Триггер Сетки событий Azure).
Установите расширение Microsoft.Azure.WebJobs.Extensions.EventGrid, если вас попросят об этом. Если потребуется установить расширение, повторно щелкните Azure Event Grid Trigger (Триггер Сетки событий Azure), чтобы создать функцию.
Появится область Новая функция.
В области "Создать функцию" назовите функцию UpsertOrder и нажмите кнопку "Создать".
Замените содержимое файла кода этим кодом и нажмите кнопку "Сохранить ":
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
Этот код позволяет проанализировать сведения о возникшем событии хранилища и создать сообщение запроса с URL-адресом файла, вызвавшего событие. В составе этого сообщения функция передает значение в мини-приложение source_file, которое вы создали ранее. Код функции отправляет сообщение в задание Databricks и использует маркер, полученный ранее, для аутентификации.
Создать подписку на Event Grid
В этом разделе вы создадите подписку на службу "Сетка событий", которая вызывает функцию Azure при отправке файлов в учетную запись хранения.
Выберите "Интеграция", а затем на странице "Интеграция" выберите "Триггер сетки событий".
В области "Изменить триггер" назовите событие, а затем выберите
eventGridEvent
на события".Примечание.
Имя
eventGridEvent
соответствует параметру, который передается в функцию Azure.На вкладке "Основы" на странице "Создание подписки на события" измените или проверьте следующие параметры:
Настройка Значение Имя. contoso-подписка-на-события-заказа Тип темы учетная запись хранилища Исходный ресурс contosoorders Имя раздела системы <create any name>
Фильтр по типам событий Создание BLOB-объектов и удаление BLOB-объектов Выберите кнопку Создать.
Тестирование подписки на Сетку событий
Создайте файл с именем
customer-order.csv
, вставьте в него приведенный ниже код JSON и сохраните файл на локальном компьютере.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
С помощью Обозревателя службы хранилища отправьте этот файл в папку input в учетной записи хранения.
При отправке файла вызывается событие Microsoft.Storage.BlobCreated. Сетка событий уведомляет всех, кто подписался на это событие. В нашем примере единственным подписчиком является наша функция Azure. Эта функция Azure анализирует параметры события и определяет, какое событие произошло. Затем она передает URL-адрес файла в задание Databricks. В рамках задания Databricks файл считывается и в таблицу Databricks Delta, размещенную в учетной записи хранения, добавляется соответствующая строка.
Чтобы проверить, выполнено ли задание успешно, просмотрите результаты выполнения. Вы увидите состояние завершения. Для получения дополнительной информации о том, как просматривать запуски задания, см. Просмотр запусков для задания
В новой ячейке книги выполните этот запрос, чтобы просмотреть обновленную дельта-таблицу.
%sql select * from customer_data
Возвращается таблица, которая содержит последнюю запись.
Чтобы обновить эту запись, создайте файл с именем
customer-order-update.csv
, вставьте в него приведенный ниже код и сохраните файл на локальном компьютере.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
Этот CSV-файл почти идентичен предыдущему, за исключением того, что количество в заказе изменилось с
228
на22
.С помощью Обозревателя службы хранилища отправьте этот файл в папку input в учетной записи хранения.
Снова выполните запрос
select
, чтобы просмотреть обновленную разностную таблицу.%sql select * from customer_data
Будет возвращена таблица, которая содержит обновленную запись.
Очистка ресурсов
Удалите группу ресурсов и все связанные с ней ресурсы, когда надобность в них отпадет. Для этого выберите группу ресурсов для учетной записи хранения и выберите Удалить.
Следующие шаги
Reacting to Blob storage events (preview) (Реагирование на события хранилища BLOB-объектов)