Поделиться через


Краткое руководство: Отправка или получение событий из центров событий Azure.

Из этого краткого руководства вы узнаете, как отправлять события в концентратор событий Azure и получать их с помощью пакета Java для обмена сообщениями azure .

Подсказка

Если вы работаете с ресурсами Центров событий Azure в приложении Spring, рекомендуется рассмотреть Azure Spring Cloud как альтернативу. Spring Cloud Azure — это проект с открытым исходным кодом, который обеспечивает простую интеграцию Spring со службами Azure. Дополнительные сведения о Spring Cloud Azure и примерах использования Центров событий см. в статье Spring Cloud Stream с Центрами событий Azure.

Предпосылки

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями о Центрах событий, прежде чем приступить к работе с этим руководством.

Для завершения этой быстрой установки вам потребуются следующие пререквизиты:

  • Подписка Microsoft Azure. Чтобы использовать службы Azure, в том числе Центры событий Azure, потребуется действующая подписка. Если у вас еще нет учетной записи Azure, зарегистрируйтесь для работы с бесплатной пробной версией или активируйте преимущества для подписчиков MSDN при создании учетной записи.
  • Среда разработки Java. Это руководство по быстрому старту использует Eclipse. Пакет средств разработки Java (JDK) с версией 8 или более поздней требуется.
  • Создайте пространство имен Центров событий и концентратор событий. Первым шагом является использование портала Azure для создания пространства имен типа Event Hubs и получения учетных данных управления, необходимых приложению для взаимодействия с узлом событий. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи. Затем получите строку подключения для пространства имен Центров событий , следуя инструкциям из статьи: Получение строки подключения. Строка подключения будет использоваться в дальнейшем в этом кратком руководстве.

Отправка событий

В этом разделе показано, как создать приложение Java для отправки событий в узел событий.

Добавление ссылки на библиотеку Центров событий Azure

Сначала создайте проект Maven для консольного или оболочкного приложения в любимой среде разработки Java. pom.xml Обновите файл следующим образом. Клиентская библиотека Java для Центров событий доступна в центральном репозитории Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.20.2</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.16.1</version>
		    <scope>compile</scope>
		</dependency>

Замечание

Обновите версию до последней версии, опубликованной в репозитории Maven.

Проверка подлинности приложения в Azure

В этом кратком руководстве показано два способа подключения к Центрам событий Azure:

  • Без пароля. Используйте принципал безопасности в Microsoft Entra ID и управление доступом на основе ролей (RBAC) для подключения к пространству имен Центров событий. Вам не нужно беспокоиться о наличии жестко закодированных строк подключения в коде, в файле конфигурации или безопасном хранилище, например Azure Key Vault.
  • Строка подключения. Используйте строку подключения для подключения к пространству имен Event Hubs. Если вы не знакомы с Azure, вы можете найти вариант строка подключения проще следовать.

Мы рекомендуем использовать параметр без пароля в реальных приложениях и рабочих средах. Дополнительные сведения см. в статье "Проверка подлинности и авторизация служебной шины" иподключения без пароля для служб Azure.

Назначение ролей пользователю Microsoft Entra

При локальной разработке убедитесь, что учетная запись пользователя, которая подключается к Центрам событий Azure, имеет правильные разрешения. Для отправки и получения сообщений требуется роль владельца данных Центров событий Azure . Чтобы назначить себе эту роль, вам потребуется роль администратора доступа пользователей или другая роль, которая включает Microsoft.Authorization/roleAssignments/write действие. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения см. на странице "Общие сведения об области применения Azure RBAC".

В следующем примере вашей учетной записи пользователя назначается роль Azure Event Hubs Data Owner, предоставляющая полный доступ к ресурсам Azure Event Hubs. В реальном сценарии следуйте принципу наименьших привилегий , чтобы предоставить пользователям только минимальные разрешения, необходимые для более безопасной рабочей среды.

Встроенные роли Azure для Центров событий Azure

Для Azure Event Hubs управление пространствами имен и всеми связанными ресурсами через портал Azure и API управления ресурсами Azure уже защищено с помощью модели Azure RBAC. Azure предоставляет следующие встроенные роли для авторизации доступа к пространству имен Центров событий:

Если вы хотите создать пользовательскую роль, см. раздел "Права", необходимые для операций Центров событий.

Это важно

В большинстве случаев для распространения назначения ролей в Azure требуется минута или две. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. В портале Azure находите ваше пространство имен Event Hubs с помощью основной строки поиска или навигации слева.

  2. На странице обзора выберите элемент управления доступом (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите +Добавить в верхнем меню. Затем выберите "Добавить назначение роли".

    Снимок экрана: назначение роли.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите Azure Event Hubs Data Owner и выберите соответствующий результат. Теперь щелкните Далее.

  6. В разделе "Назначение доступа" выберите "Пользователь", "Группа" или "Субъект-служба". Затем нажмите кнопку +Выбрать участников.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты). Нажмите Выбрать в нижней части диалогового окна.

  8. Выберите Рецензировать + назначить, чтобы перейти на окончательную страницу. Нажмите кнопку "Проверить и назначить " еще раз, чтобы завершить процесс.

Написание кода для отправки сообщений в концентратор событий

Добавьте класс с именем Senderи добавьте следующий код в класс:

Это важно

  • Обновите <NAMESPACE NAME> с именем пространства имен Центров событий.
  • Обновите <EVENT HUB NAME>, указав имя вашего концентратора событий.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Выполните сборку программы и убедитесь, что ошибки отсутствуют. После запуска программы-получателя вы запустите эту программу.

Получение событий

Код в этом руководстве основан на примере EventProcessorClient на GitHub, который можно проверить, чтобы увидеть полное рабочее приложение.

Следуйте этим рекомендациям при использовании хранилища BLOB-объектов Azure в качестве хранилища контрольных точек:

  • Используйте отдельный контейнер для каждой группы потребителей. Вы можете использовать одну и ту же учетную запись хранения, но использовать один контейнер для каждой группы.
  • Не используйте учетную запись хранения для других действий.
  • Не используйте контейнер для ничего другого.
  • Создайте учетную запись хранения в том же регионе, что и развернутое приложение. Если приложение находится в локальной среде, попробуйте выбрать ближайший регион.

На странице учетной записи хранения в портале Azure в разделе службы BLOB убедитесь, что следующие параметры отключены.

  • Иерархическое пространство имен
  • Обратимое удаление BLOB-объекта
  • Управление версиями

Создание хранилища Azure и контейнера BLOB-объектов

В этом кратком руководстве вы используете службу хранилища Azure (блочное хранилище, в частности) в качестве хранилища контрольных точек. Контрольная точка — это процесс, с помощью которого обработчик событий помечает или фиксирует позицию последнего успешно обработанного события в секции. Маркировка контрольной точки обычно выполняется в функции, которая обрабатывает события. Чтобы узнать больше о контрольных точках, см. раздел Обработчик событий.

Выполните указанные ниже действия, чтобы создать учетную запись хранения Azure.

  1. Создайте учетную запись хранения Azure
  2. Создайте контейнер BLOB
  3. Аутентификация для доступа к контейнеру BLOB-объектов

При локальной разработке убедитесь, что учетная запись пользователя, которая обращается к данным больших двоичных объектов, имеет правильные разрешения. Необходим участник данных BLOB-объектов хранилища для чтения и записи данных BLOB-объектов. Чтобы назначить себе эту роль, необходимо назначить роль администратора доступа пользователей или другую роль, которая включает действие Microsoft.Authorization/roleAssignments/write . Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения см. в Описание области действия для Azure RBAC.

В этом сценарии вы назначаете разрешения своей учетной записи пользователя, ограниченные учетной записью хранения, чтобы следовать принципу наименьших привилегий. В рамках этой практики пользователям предоставляются только минимальные необходимые разрешения, что позволяет создавать более защищенные рабочие среды.

В следующем примере роль участника данных BLOB-объектов хранилища назначается учетной записи пользователя, которая предоставляет доступ как для чтения, так и записи к данным BLOB-объектов в вашей учетной записи хранения.

Это важно

В большинстве случаев для распространения назначения ролей в Azure требуется минута или две. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. На портале Azure найдите свою учетную запись хранения, воспользовавшись основной панелью поиска или областью навигации слева.

  2. На странице учетной записи хранения выберите элемент управления доступом (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите +Добавить в верхнем меню. Затем выберите "Добавить назначение роли".

    Снимок экрана: назначение роли учетной записи хранения.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите Storage Blob Data Contributor. Выберите соответствующий результат, а затем нажмите кнопку Далее.

  6. В разделе Назначение доступа выберите Пользователь, группа или сервисный принципал, а затем нажмите + Выбрать участников.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш адрес электронной почты user@domain), а затем выберите Select в нижней части диалогового окна.

  8. Выберите Рецензировать + назначить, чтобы перейти на окончательную страницу. Нажмите кнопку "Проверить и назначить " еще раз, чтобы завершить процесс.

Добавление библиотек Центров событий в проект Java

Добавьте следующие зависимости в файл pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.20.2</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.20.6</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.16.1</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Добавьте следующие import инструкции в верхней части файла Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Создайте класс с именем Receiverи добавьте в класс следующие строковые переменные. Замените заполнители правильными значениями.

    Это важно

    Замените заполнители правильными значениями.

    • <NAMESPACE NAME> с именем вашего пространства имен Центров событий.
    • Укажите <EVENT HUB NAME> как имя концентратора событий в пространстве имен.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Добавьте следующий main метод в класс.

    Это важно

    Замените заполнители правильными значениями.

    • Замените <STORAGE ACCOUNT NAME> на имя вашей учетной записи хранения Azure.
    • <CONTAINER NAME> с именем контейнера BLOB в учетной записи хранения
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Добавьте в класс два вспомогательных PARTITION_PROCESSOR метода (ERROR_HANDLERиReceiver), которые обрабатывают события и ошибки.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Выполните сборку программы и убедитесь, что ошибки отсутствуют.

Запуск приложений

  1. Сначала запустите приложение Приемника .

  2. Затем запустите приложение Отправителя .

  3. В окне приложения приемника убедитесь, что вы увидите события, опубликованные приложением Отправителя.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Нажмите клавишу ВВОД в окне приложения приемника, чтобы остановить приложение.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

См. следующие примеры на сайте GitHub: