Поделиться через


Краткое руководство: Проверка использования схемы Avro при потоковой передаче событий с использованием SDK для .NET в Центрах событий (AMQP)

Из этого краткого руководства вы узнаете, как отправлять события и получать события из концентратора событий с проверкой схемы с помощью библиотеки Azure.Messaging.EventHubs .NET.

Реестр схем Azure — это функция Центров событий. Реестр предоставляет центральный репозиторий для схем на основе событий и приложений, ориентированных на обмен сообщениями. Он обеспечивает гибкость работы приложений-производителей и приложений-получателей при обмене данными, избавляя от необходимости управлять схемой и позволяя совместно использовать ее в приложениях обоих типов. Реестр схем также предоставляет простую платформу управления для повторно используемых схем и определяет связь между схемами через конструкцию группировки (группы схем). Дополнительные сведения см. в статье Реестр схем Azure в Центрах событий.

Предпосылки

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями о Центрах событий, прежде чем приступить к работе с этим руководством.

Для завершения этой быстрой установки вам потребуются следующие пререквизиты:

  • Если у вас еще нет подписки Azure, создайте бесплатную учетную запись перед началом работы.

  • Microsoft Visual Studio 2022.

    Клиентская библиотека Центров событий Azure использует функции, представленные в C# 8.0. Вы по-прежнему можете использовать библиотеку с предыдущими версиями языка C#, но новый синтаксис недоступен. Чтобы использовать полный синтаксис, мы рекомендуем скомпилировать с помощью .NET Core SDK версии 3.0 или выше, установив версию языка на latest.

    Если вы используете Visual Studio, версии до Visual Studio 2019 несовместимы с инструментами, необходимыми для создания проектов C# 8.0. Чтобы скачать Visual Studio 2019 или Visual Studio 2022, включая бесплатную версию Community, см. Visual Studio.

Создание концентратора событий

Чтобы создать пространство имен Центров событий и концентратор событий, следуйте инструкциям из статьи "Создание пространства имен Центров событий" и концентратора событий.

Чтобы получить строку подключения к пространству имен для Центров событий, следуйте инструкциям в разделе Получение строки подключения.

Запишите следующие параметры для использования в текущем быстром начале.

  • Строка подключения для пространства имен Event Hubs
  • Имя концентратора событий

Создание схемы

Чтобы создать группу схем и схему, следуйте инструкциям из руководства по созданию схем с помощью реестра схем.

  1. Создайте группу схем с именем contoso-sg с помощью портала реестра схем. Используйте Avro в качестве типа сериализации и None для режима совместимости.

  2. В этой группе схем создайте новую схему Avro с именем схемы: Microsoft.Azure.Data.SchemaRegistry.example.Order Используйте следующее содержимое схемы.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Добавление пользователя в роль читателя реестра схем

Добавьте учетную запись пользователя в роль читателя реестра схем на уровне пространства имен. Вы также можете использовать роль Контрибьютора реестра схем, но это не необходимо для данного краткого обзора.

  1. На странице пространства имен Центров событий в меню слева выберите управление доступом (IAM).
  2. На странице управления доступом (IAM) выберите +Добавить>назначение ролей.
  3. На странице "Роли" выберите "Читатель реестра схем", затем выберите "Далее".
  4. Используйте ссылку "+ Выбрать участников" , чтобы добавить учетную запись пользователя в роль, а затем нажмите кнопку "Далее".
  5. На странице "Рецензирование и назначение" выберите "Рецензирование и назначение".

Создание событий для концентраторов событий с проверкой схемы

Создание консольного приложения для производителя событий

  1. Запустите Visual Studio.

  2. Выберите Создать новый проект.

  3. В диалоговом окне "Создание проекта" выполните следующие действия. Если вы не видите это диалоговое окно, выберите "Файл " в меню, нажмите кнопку "Создать" и выберите "Проект".

    1. Выберите язык программирования C#.

    2. Для типа приложения выберите значение Консоль.

    3. В списке результатов выберите Консольное приложение.

    4. Затем выберите Далее.

      Снимок экрана: диалоговое окно

  4. Введите OrderProducer для имени проекта , SRQuickStart для имени решения и нажмите кнопку "ОК ", чтобы создать проект.

Добавление пакета NuGet Центров событий

  1. Выберите Инструменты>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.

  2. Выполните следующие команды, чтобы установить Azure.Messaging.EventHubs и другие пакеты NuGet. Нажмите клавишу ВВОД , чтобы выполнить последнюю команду.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Проверка подлинности приложений производителя для подключения к Azure с помощью Visual Studio. Дополнительные сведения см. в клиентской библиотеке удостоверений Azure для .NET.

  4. Войдите в Azure с помощью учетной записи пользователя, являющейся членом Schema Registry Reader роли на уровне пространства имен. Сведения о ролях реестра схем см. в разделе "Управление доступом на основе ролей Azure".

Создание кода с помощью схемы Avro

  1. Используйте то же содержимое, которое использовалось для создания схемы для создания файла с именем Order.avsc. Сохраните файл в папке проекта или решения.
  2. Используйте этот файл схемы для создания кода для .NET. Для создания кода можно использовать любое средство создания внешнего кода, например avrogen . Например, выполните команду avrogen -s .\Order.avsc . для создания кода.
  3. После создания кода вы увидите файл с именем Order.cs в папке \Microsoft\Azure\Data\SchemaRegistry\example . Для схемы Avro здесь он создает типы C# в Microsoft.Azure.Data.SchemaRegistry.example пространстве имен.
  4. Добавьте файл Order.cs в проект OrderProducer.

Написание кода для сериализации и отправки событий в концентратор событий

  1. Добавьте в файл Program.cs указанный ниже код. Дополнительные сведения см. в комментариях кода. Высокоуровневые шаги в коде:

    1. Создайте клиент производителя, который можно использовать для отправки событий в концентратор событий.
    2. Создайте клиент реестра схем, который можно использовать для сериализации и проверки данных в объекте Order .
    3. Создайте новый Order объект с помощью созданного Order типа.
    4. Используйте клиент реестра схем для сериализации объекта Order в EventData.
    5. Создайте пакет событий.
    6. Добавьте данные события в пакет событий.
    7. Используйте клиент производителя для отправки пакета событий в концентратор событий.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Замените следующие значения заполнителей реальными значениями.

    • EVENTHUBSNAMESPACECONNECTIONSTRING — строка подключения для пространства имен Event Hubs.
    • EVENTHUBNAME — имя концентратора событий
    • EVENTHUBSNAMESPACENAME — имя пространства имен Центров событий
    • SCHEMAGROUPNAME — имя группы схем
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Выполните сборку проекта и убедитесь, что она прошла без ошибок.

  4. Выполните программу и дождитесь подтверждающего сообщения.

    A batch of 1 order has been published.
    
  5. В портале Azure можно проверить, что концентратор событий получил события. Перейдите в представление "Сообщения" в разделе "Метрики ". Обновите страницу, чтобы обновить диаграмму. Может потребоваться несколько секунд, чтобы показать, что оно получило сообщения.

    Изображение страницы портала Azure для проверки, что концентратор событий получил события.

Потребление событий из центров событий с проверкой схемы

В этом разделе показано, как написать консольное приложение .NET Core, которое получает события из концентратора событий и использовать реестр схем для десериализации данных событий.

Дополнительные требования

  • Создайте учетную запись хранения для использования обработчика событий.

Создание потребительского приложения

  1. В окне обозревателя решений щелкните правой кнопкой мыши решение SRQuickStart , выберите "Добавить" и выберите "Создать проект".
  2. Выберите Консольное приложение и нажмите Далее.
  3. Введите OrderConsumer для имени проекта и нажмите кнопку "Создать".
  4. В окне обозревателя решений щелкните правой кнопкой мыши OrderConsumer и выберите "Задать в качестве запускаемого проекта".

Добавление пакета NuGet Центров событий

  1. Выберите Инструменты>Диспетчер пакетов NuGet>Консоль диспетчера пакетов.

  2. В окне консоли диспетчера пакетов убедитесь, что OrderConsumer выбран для проекта по умолчанию. В противном случае используйте раскрывающийся список, чтобы выбрать OrderConsumer.

  3. Выполните следующую команду, чтобы установить необходимые пакеты NuGet. Нажмите клавишу ВВОД , чтобы выполнить последнюю команду.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Проверка подлинности приложений производителя для подключения к Azure с помощью Visual Studio, как показано в клиентской библиотеке удостоверений Azure для .NET.

  5. Войдите в Azure, используя учетную запись пользователя, который является членом Schema Registry Reader роли на уровне пространства имен. Сведения о ролях реестра схем см. в разделе "Управление доступом на основе ролей Azure".

  6. Добавьте файл Order.cs, который вы создали в процессе создания приложения-производителя, в проект OrderConsumer.

  7. Щелкните правой кнопкой мыши проект OrderConsumer и выберите «Задать как стартовый проект».

Написание кода для получения событий и десериализации их с помощью реестра схем

  1. Добавьте в файл Program.cs указанный ниже код. Дополнительные сведения см. в комментариях кода. Высокоуровневые шаги в коде:

    1. Создайте клиент потребителя, который можно использовать для отправки событий в концентратор событий.
    2. Создайте клиент BLOB-контейнера в хранилище BLOB-объектов Azure.
    3. Создайте клиент обработчика событий и зарегистрируйте обработчики событий и ошибок.
    4. В обработчике событий создайте клиент реестра схем, который можно использовать для десериализации данных событий в Order объект.
    5. Десериализуйте данные события в объект Order с помощью сериализатора.
    6. Распечатайте сведения о полученном заказе.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be used as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Замените следующие значения заполнителей реальными значениями.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING — строка подключения для пространства имен Event Hubs.
    • EVENTHUBNAME — имя концентратора событий
    • EVENTHUBSNAMESPACENAME — имя пространства имен Центров событий
    • SCHEMAGROUPNAME — имя группы схем
    • AZURESTORAGECONNECTIONSTRING — строка подключения для учетной записи хранения Azure
    • BLOBCONTAINERNAME — Имя контейнера BLOB-объектов
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Выполните сборку проекта и убедитесь, что она прошла без ошибок.

  4. Запустите приложение получателя.

  5. Вы должны получить уведомление о том, что концентратор событий получил события.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Здесь отображаются те же три события, которые вы ранее отправили в концентратор событий, выполняя программу отправителя.

Образцы

Сведения о клиентской библиотеке Apache Avro для .NET см. в реестре схем Azure.

Очистите ресурсы

Удалите пространство имен Центров событий или удалите группу ресурсов, содержащую пространство имен.

Следующий шаг