Использование Spring Kafka с Центры событий Azure для API Kafka

В этом руководстве показано, как настроить Java-платформу Spring Cloud Stream Binder для использования Центры событий Azure для Kafka для отправки и получения сообщений с Центры событий Azure. Дополнительные сведения см. в разделе Использование Центры событий Azure в приложениях Apache Kafka

В этом учебном пособии мы включим два метода аутентификации: аутентификацию через Microsoft Entra и аутентификацию с использованием подписей для общего доступа (SAS). На вкладке Passwordless отображается проверка подлинности Microsoft Entra, а на вкладке Connection string — проверка подлинности SAS.

Аутентификация Microsoft Entra — это механизм подключения к Центры событий Azure для Kafka с использованием идентификаторов, определенных в Microsoft Entra ID. С помощью проверки подлинности Microsoft Entra вы можете управлять удостоверениями пользователей базы данных и другими службами Microsoft в центральном расположении, что упрощает управление разрешениями.

Аутентификация SAS использует строку подключения пространства имен Центры событий Azure для делегированного доступа к Event Hubs для Kafka. Если Вы решите использовать подписи общего доступа в качестве учетных данных, необходимо самостоятельно управлять строкой подключения.

Предварительные условия

Внимание

Для выполнения действий, описанных в этом руководстве, требуется Spring Boot версии 2.5 или более поздней.

Подготовьте учетные данные

Центры событий Azure поддерживает использование Microsoft Entra ID для авторизации запросов к ресурсам Центров событий. С помощью Microsoft Entra ID можно использовать Azure управление доступом на основе ролей (Azure RBAC) для предоставления разрешений субъекту безопасности, который может быть пользователем или субъектом-службой приложений.

Если вы хотите выполнить этот пример локально с проверкой подлинности Microsoft Entra, убедитесь, что учетная запись пользователя прошла проверку подлинности с помощью Azure Toolkit для IntelliJ, плагина Azure Account для Visual Studio Code или Azure CLI. Кроме того, убедитесь, что у учетной записи есть достаточные разрешения.

Примечание.

При использовании подключений без пароля необходимо предоставить вашему аккаунту доступ к ресурсам. В Центры событий Azure назначьте роль Центры событий Azure Data Receiver и Центры событий Azure Data Sender учетной записи Microsoft Entra, которую вы используете. Дополнительные сведения о предоставлении ролей доступа см. в статье Назначение ролей Azure с помощью портала Azure и Авторизация доступа к ресурсам Центров событий с помощью Microsoft Entra ID.

Отправка и получение сообщений из Центры событий Azure

С помощью концентратора событий Azure можно отправлять и получать сообщения с помощью Spring Cloud Azure.

Чтобы установить модуль Spring Cloud Azure Starter, добавьте следующие зависимости в файл pom.xml:

  • Акт технических спецификаций Spring Cloud Azure (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.2.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Примечание.

    Если вы используете Spring Boot 4.0.x, обязательно установите версию, указанную в spring-cloud-azure-dependencies, на значение 7.2.0.

    Если вы используете Spring Boot 3.5.x, обязательно задайте версию spring-cloud-azure-dependencies на 6.2.0.

    Если вы используете Spring Boot 3.1.x-3.5.x, обязательно установите для нее версию spring-cloud-azure-dependencies на 5.25.0.

    Если вы используете Spring Boot 2.x, обязательно установите версию spring-cloud-azure-dependencies на 4.20.0.

    Эта ведомость материалов (BOM) должна быть сконфигурирована в <dependencyManagement> разделе файла pom.xml. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию.

    Дополнительные сведения о версии, используемой для этого BOM, см. в разделе Какую версию Spring Cloud Azure я должен использовать.

  • Артефакт Spring Cloud Azure Starter:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    

Написать код приложения

Выполните следующие действия, чтобы настроить приложение для создания и использования сообщений с помощью Центры событий Azure.

  1. Настройте учетные данные концентратора событий, добавив следующие свойства в файл application.properties .

    spring.cloud.stream.kafka.binder.brokers=${AZ_EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093
    spring.cloud.function.definition=consume;supply
    spring.cloud.stream.bindings.consume-in-0.destination=${AZ_EVENTHUB_NAME}
    spring.cloud.stream.bindings.consume-in-0.group=$Default
    spring.cloud.stream.bindings.supply-out-0.destination=${AZ_EVENTHUB_NAME}
    

    Совет

    Если используется версия spring-cloud-azure-dependencies:4.3.0, необходимо добавить свойство spring.cloud.stream.binders.<kafka-binder-name>.environment.spring.main.sources со значением com.azure.spring.cloud.autoconfigure.kafka.AzureKafkaSpringCloudStreamConfiguration.

    Так как 4.4.0это свойство будет добавлено автоматически, поэтому его не нужно добавлять вручную.

    В следующей таблице описаны поля в конфигурации:

    Поле Описание
    spring.cloud.stream.kafka.binder.brokers Указывает конечную точку Центры событий Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Указывает концентратора событий ввода, который в этом учебном пособии является концентратором, созданным ранее.
    spring.cloud.stream.bindings.consume-in-0.group Указывает группу подписчиков из Центры событий Azure, которую можно установить в $Default, чтобы использовать базовую группу подписчиков, созданную при создании экземпляра Центры событий Azure.
    spring.cloud.stream.bindings.supply-out-0.destination Указывает концентратор событий назначения выходных данных, который для этого руководства совпадает с назначением входных данных.

    Примечание.

    Если вы включите автоматическое создание раздела, обязательно добавьте элемент конфигурации spring.cloud.stream.kafka.binder.replicationFactorс значением, равным по крайней мере 1. Дополнительные сведения см. в справочном руководстве по Spring Cloud Stream Kafka Binder.

  2. Измените файл класса запуска, чтобы отобразить следующее содержимое.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.GenericMessage;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    
    @SpringBootApplication
    public class EventHubKafkaBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaBinderApplication.class);
    
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubKafkaBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->LOGGER.info("New message received: '{}'", message.getPayload());
        }
    
        @Override
        public void run(String... args) {
            many.emitNext(new GenericMessage<>("Hello World"), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Совет

    В этом руководстве нет операций проверки подлинности в конфигурациях или коде. Однако для подключения к службам Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать удостоверение Azure. Spring Cloud Azure использует DefaultAzureCredential, которую предоставляет библиотека удостоверений Azure для получения учетных данных без необходимости изменения кода.

    DefaultAzureCredential поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах (например, локальных и рабочих средах), не реализуя код, зависящий от среды. Дополнительные сведения см. в разделе DefaultAzureCredential.

    Для выполнения проверки подлинности в локальных средах разработки можно использовать Azure CLI, Visual Studio Code, PowerShell или другие методы. Дополнительные сведения см. в разделе аутентификация Azure в средах разработки на Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем. Дополнительные сведения см. в разделе Управляемые удостоверения для ресурсов Azure?

  3. Запустите приложение. Сообщения, как показано в следующем примере, будут размещены в журнале приложений:

    Kafka version: 3.0.1
    Kafka commitId: 62abe01bee039651
    Kafka startTimeMs: 1622616433956
    New message received: 'Hello World'
    

Развертывание в Приложения Azure Spring

Теперь, когда у вас есть приложение Spring Boot, работающее локально, пришло время переместить его в рабочую среду. Приложения Azure Spring упрощает развертывание приложений Spring Boot для Azure без каких-либо изменений кода. Эта служба управляет инфраструктурой приложений Spring, благодаря чему разработчики могут сосредоточиться на коде. Приложения Azure Spring обеспечивает управление жизненным циклом с помощью комплексного мониторинга и диагностики, управления конфигурацией, обнаружения служб, интеграции CI/CD, развертывания по схеме blue-green и прочее. Чтобы развернуть ваше приложение в Приложения Azure Spring, см. раздел Как развернуть первое приложение в Приложения Azure Spring.

Следующие шаги