Получение событий Сетки событий Azure в конечную точку HTTP

В этой статье описывается, как настроить конечную точку HTTP для получения событий из сетки событий Azure. Вы узнаете, как проверить конечную точку, получать события и десериализировать их с помощью C# или JavaScript. В статье используется функция Azure для демонстрационных целей, но те же понятия применяются к любому размещенному приложению.

В статье рассматриваются следующие задачи:

  • Проверьте конечную точку HTTP, чтобы принимать события из подписки на события.
  • Обработка событий проверки подписки
  • Обработка событий Azure Blob Storage
  • Обработка пользовательских событий

Примечание.

Рекомендуется использовать триггер сетки событий для активации функции Azure с сеткой событий. Это обеспечивает более простую и быструю интеграцию между Сеткой событий и Функциями Azure. Однако триггер Сетки событий Функций Azure не поддерживает сценарий, в котором размещенный код должен контролировать HTTP-код состояния, возвращаемый в Сетку событий. Учитывая это ограничение, код, выполняющийся в функции Azure, не может вернуть ошибку 5XX, чтобы инициировать повторную попытку доставки событий в Сетке событий, например.

Требования

  • Вам потребуется приложение функций с функцией, которая активируется HTTP-запросом.

Добавление зависимостей

При разработке в .NET добавьте зависимость в функцию для Azure.Messaging.EventGridNuGet пакета.

Наборы для разработки программного обеспечения (SDK) (издательские SDK для других языков доступны через справочник по издательским SDK). Эти пакеты содержат модели для собственных типов событий, таких как EventGridEvent, StorageBlobCreatedEventData и EventHubCaptureFileCreatedEventData.

Проверка конечной точки

Во-первых, обработайте Microsoft.EventGrid.SubscriptionValidationEvent события. Каждый раз, когда пользователь подписывается на событие, Event Grid отправляет событие проверки в конечную точку с validationCode в полезных данных. Конечная точка должна повторить этот код в тексте ответа, чтобы доказать, что конечная точка действительна и принадлежит вам. Если вы используете триггер Event Grid вместо функции, активируемой веб-перехватчиком, триггер сам обрабатывает проверку конечной точки для вас. При использовании службы API сторонних производителей (например, Zapier или IFTTT), возможно, не удастся вывести на экран код проверки программными средствами. Для этих служб можно вручную проверить подписку с помощью URL-адреса проверки, который отправляется в событии проверки подписки. Скопируйте этот URL-адрес в свойство validationUrl и отправьте запрос GET через клиент REST или веб-браузер.

В C#используйте ParseMany() метод для десериализации BinaryData экземпляра, содержащего одно или несколько событий в массив EventGridEvent. Если вы знаете заранее, что десериализуется только одно событие, можно использовать Parse метод.

Чтобы программно вывести валидационный код, используйте следующий код:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response
                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };

                        return new OkObjectResult(responseData);
                    }
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }
    }
    context.done();
};

Проверка тестового ответа

Проверьте функцию обработки ответов, вставив образец события в тестовое поле функции.

[{
  "id": "2d1781af-3a4c-4d7c-bd0c-e34b19da4e66",
  "topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "subject": "",
  "data": {
    "validationCode": "512d38b6-c7b8-40c8-89fe-f46f9e9622b6"
  },
  "eventType": "Microsoft.EventGrid.SubscriptionValidationEvent",
  "eventTime": "2018-01-25T22:12:19.4556811Z",
  "metadataVersion": "1",
  "dataVersion": "1"
}]

При выборе "Выполнить", в выводе отображаются 200 OK и {"validationResponse":"512d38b6-c7b8-40c8-89fe-f46f9e9622b6"} в основном тексте:

Снимок экрана: запрос на проверку сетки событий JSON в поле теста функции Azure.

Снимок экрана вывода проверки подписки в Event Grid с ответом 200 OK.

Обработка событий хранилища BLOB-объектов

Теперь расширьте функцию для обработки системного Microsoft.Storage.BlobCreated события:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response

                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };
                        return new OkObjectResult(responseData);
                    }
                    // Handle the storage blob created event
                    else if (eventData is StorageBlobCreatedEventData storageBlobCreatedEventData)
                    {
                        log.LogInformation($"Got BlobCreated event data, blob URI {storageBlobCreatedEventData.Url}");
                    }
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";
    var storageBlobCreatedEvent = "Microsoft.Storage.BlobCreated";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type  
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }

        else if (body.data && body.eventType == storageBlobCreatedEvent) {
            var blobCreatedEventData = body.data;
            context.log("Relaying received blob created event payload:" + JSON.stringify(blobCreatedEventData));
        }
    }
    context.done();
};

Обработка события создания тестового Блоба

Проверьте новую функциональность функции, поместив событие Blob-хранилища в поле тестирования и запустив:

[{
  "topic": "/subscriptions/{subscription-id}/resourceGroups/Storage/providers/Microsoft.Storage/storageAccounts/xstoretestaccount",
  "subject": "/blobServices/default/containers/testcontainer/blobs/testfile.txt",
  "eventType": "Microsoft.Storage.BlobCreated",
  "eventTime": "2017-06-26T18:41:00.9584103Z",
  "id": "aaaa0a0a-bb1b-cc2c-dd3d-eeeeee4e4e4e",
  "data": {
    "api": "PutBlockList",
    "clientRequestId": "bbbb1b1b-cc2c-dd3d-ee4e-ffffff5f5f5f",
    "requestId": "cccc2c2c-dd3d-ee4e-ff5f-aaaaaa6a6a6a",
    "eTag": "0x8D4BCC2E4835CD0",
    "contentType": "text/plain",
    "contentLength": 524288,
    "blobType": "BlockBlob",
    "url": "https://example.blob.core.windows.net/testcontainer/testfile.txt",
    "sequencer": "00000000000004420000000000028963",
    "storageDiagnostics": {
      "batchId": "dddd3d3d-ee4e-ff5f-aa6a-bbbbbb7b7b7b"
    }
  },
  "dataVersion": "",
  "metadataVersion": "1"
}]

Вы видите результат URL-адреса BLOB в журнале функции:

2022-11-14T22:40:45.978 [Information] Executing 'Function1' (Reason='This function was programmatically called via the host APIs.', Id=8429137d-9245-438c-8206-f9e85ef5dd61)
2022-11-14T22:40:46.012 [Information] C# HTTP trigger function processed a request.
2022-11-14T22:40:46.017 [Information] Received events: [{"topic": "/subscriptions/{subscription-id}/resourceGroups/Storage/providers/Microsoft.Storage/storageAccounts/xstoretestaccount","subject": "/blobServices/default/containers/testcontainer/blobs/testfile.txt","eventType": "Microsoft.Storage.BlobCreated","eventTime": "2017-06-26T18:41:00.9584103Z","id": "aaaa0a0a-bb1b-cc2c-dd3d-eeeeee4e4e4e","data": {"api": "PutBlockList","clientRequestId": "bbbb1b1b-cc2c-dd3d-ee4e-ffffff5f5f5f","requestId": "cccc2c2c-dd3d-ee4e-ff5f-aaaaaa6a6a6a","eTag": "0x8D4BCC2E4835CD0","contentType": "text/plain","contentLength": 524288,"blobType": "BlockBlob","url": "https://example.blob.core.windows.net/testcontainer/testfile.txt","sequencer": "00000000000004420000000000028963","storageDiagnostics": {"batchId": "dddd3d3d-ee4e-ff5f-aa6a-bbbbbb7b7b7b"}},"dataVersion": "","metadataVersion": "1"}]
2022-11-14T22:40:46.335 [Information] Got BlobCreated event data, blob URI https://example.blob.core.windows.net/testcontainer/testfile.txt
2022-11-14T22:40:46.346 [Information] Executed 'Function1' (Succeeded, Id=8429137d-9245-438c-8206-f9e85ef5dd61, Duration=387ms)

Вы также можете протестировать, создав учетную запись хранения BLOB-объектов или учетную запись хранения общего назначения версии 2, добавив подписку на событие и установив конечную точку в URL-адресе функции:

Снимок экрана: URL-адрес функции Azure, используемый в качестве конечной точки подписки на события сетки событий на портале Azure.

Обработка пользовательских событий

Наконец, расширьте функцию, чтобы она также обрабатывала пользовательские события.

Добавьте проверку для вашего события Contoso.Items.ItemReceived. Итоговый код должен выглядеть следующим образом:

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System;
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;

namespace Function1
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            string response = string.Empty;
            BinaryData events = await BinaryData.FromStreamAsync(req.Body);
            log.LogInformation($"Received events: {events}");

            EventGridEvent[] eventGridEvents = EventGridEvent.ParseMany(events);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                // Handle system events
                if (eventGridEvent.TryGetSystemEventData(out object eventData))
                {
                    // Handle the subscription validation event
                    if (eventData is SubscriptionValidationEventData subscriptionValidationEventData)
                    {
                        log.LogInformation($"Got SubscriptionValidation event data, validation code: {subscriptionValidationEventData.ValidationCode}, topic: {eventGridEvent.Topic}");
                        // Do any additional validation (as required) and then return back the below response

                        var responseData = new
                        {
                            ValidationResponse = subscriptionValidationEventData.ValidationCode
                        };
                        return new OkObjectResult(responseData);
                    }
                    // Handle the storage blob created event
                    else if (eventData is StorageBlobCreatedEventData storageBlobCreatedEventData)
                    {
                        log.LogInformation($"Got BlobCreated event data, blob URI {storageBlobCreatedEventData.Url}");
                    }
                }
                // Handle the custom contoso event
                else if (eventGridEvent.EventType == "Contoso.Items.ItemReceived")
                {
                    var contosoEventData = eventGridEvent.Data.ToObjectFromJson<ContosoItemReceivedEventData>();
                    log.LogInformation($"Got ContosoItemReceived event data, item SKU {contosoEventData.ItemSku}");
                }
            }
            return new OkObjectResult(response);
        }
    }
}
module.exports = function (context, req) {
    context.log('JavaScript HTTP trigger function begun');
    var validationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";
    var storageBlobCreatedEvent = "Microsoft.Storage.BlobCreated";
    var customEventType = "Contoso.Items.ItemReceived";

    for (var events in req.body) {
        var body = req.body[events];
        // Deserialize the event data into the appropriate type based on event type
        if (body.data && body.eventType == validationEventType) {
            context.log("Got SubscriptionValidation event data, validation code: " + body.data.validationCode + " topic: " + body.topic);

            // Do any additional validation (as required) and then return back the below response
            var code = body.data.validationCode;
            context.res = { status: 200, body: { "ValidationResponse": code } };
        }

        else if (body.data && body.eventType == storageBlobCreatedEvent) {
            var blobCreatedEventData = body.data;
            context.log("Relaying received blob created event payload:" + JSON.stringify(blobCreatedEventData));
        }

        else if (body.data && body.eventType == customEventType) {
            var payload = body.data;
            context.log("Relaying received custom payload:" + JSON.stringify(payload));
        }
    }
    context.done();
};

Тестирование обработки пользовательских событий

Наконец, проверьте, что функция теперь может обрабатывать настраиваемый тип события.

[{
    "subject": "Contoso/foo/bar/items",
    "eventType": "Contoso.Items.ItemReceived",
    "eventTime": "2017-08-16T01:57:26.005121Z",
    "id": "602a88ef-0001-00e6-1233-1646070610ea",
    "data": { 
            "itemSku": "Standard"
            },
    "dataVersion": "",
    "metadataVersion": "1"
}]

Вы также можете протестировать эту функциональность в реальном времени, отправив настраиваемое событие с помощью CURL на портале или отправив в пользовательский раздел с помощью любой службы или приложения, которые могут отправлять post в конечную точку. Создайте пользовательскую тему и подписку на события с конечной точкой, установленной как URL-адрес функции.

Заголовки сообщений

Ниже приведены свойства, которые предоставляются в заголовках сообщений.

Имя свойства Описание
aeg-subscription-name Имя подписки на события.
aeg-delivery-count Число попыток, выполненных для события.
aeg-event-type

Тип события.

Может иметь одно из следующих значений.

  • Проверка подписки
  • уведомление
  • Удаление подписки
aeg-metadata-version

Версия метаданных события.

Для схемы событий Сетки событий это свойство представляет версию метаданных, а для схемы событий облака — версию спецификации.

aeg-data-version

Версия данных события.

Для схемы событий Сетки событий это свойство представляет версию данных, а для схемы событий облака оно не используется.

aeg-output-event-id Идентификатор события Event Grid.