Руководство разработчика JMS 2.0 служебной шины Azure

В этом руководстве содержатся подробные сведения, помогающие успешно взаимодействовать с Служебная шина Azure с помощью API службы сообщений Java (JMS) 2.0.

Если вы не знакомы с служебной шиной Azure, ознакомьтесь со следующими статьями.

Начало работы Концепции

модель программирования службы сообщений Java (JMS)

Модель программирования API службы сообщений Java описана в следующих разделах:

Примечание.

Уровень "Премиум" служебной шины Azure поддерживает JMS 1.1 и JMS 2.0.

Служебная шина Azure — уровень "Стандартный " поддерживает ограниченные функциональные возможности JMS 1.1. Дополнительные сведения см. в этой документации.

JMS — стандартные блоки

Используйте следующие стандартные блоки для взаимодействия с приложением JMS.

Примечание.

Это руководство адаптировано из учебника Oracle Java EE 6 для службы сообщений Java (JMS).

Дополнительные сведения о службе сообщений Java (JMS) см. в этом руководстве.

Фабрика подключений

Примечание.

Библиотека azure-servicebus-jms доступна в двух вариантах: com.azure:azure-servicebus-jms (версия 2.0.0.0+) для Jakarta EE (jakarta.jms.*) и com.microsoft.azure:azure-servicebus-jms (версия 1.0.x) для Java EE (javax.jms.*). Рекомендации по выбору правильного артефакта см. в разделе "Поддержка Jakarta EE" и javax.

Клиент использует объект фабрики соединений для подключения к поставщику JMS. Фабрика подключений инкапсулирует набор параметров конфигурации подключения, которые определяет администратор.

Каждая фабрика соединений является экземпляром интерфейса ConnectionFactory, интерфейса QueueConnectionFactory или интерфейса TopicConnectionFactory.

Чтобы упростить подключение к Служебная шина Azure, эти интерфейсы реализуются с помощью ServiceBusJmsConnectionFactory, ServiceBusJmsQueueConnectionFactory или ServiceBusJmsTopicConnectionFactory соответственно.

Это важно

Java приложения, использующие API JMS 2.0, могут подключаться к Служебная шина Azure с помощью строки подключения или с помощью TokenCredential для использования поддерживаемой аутентификации Microsoft Entra. При использовании резервной проверки подлинности с Microsoft Entra убедитесь, что назначьте роли и разрешения на учетную запись по мере необходимости.

Создайте системно назначаемую управляемую идентичность в Azure и используйте эту идентичность для создания TokenCredential.

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

Вы можете создать экземпляр фабрики соединений со следующими параметрами:

  • Учетные данные токена — это учетные данные, способные предоставлять токен OAuth.
  • Хост — имя хоста пространства имен премиум уровня Служебная шина Azure.
  • Контейнер свойств ServiceBusJmsConnectionFactorySettings, содержащий:
    • connectionIdleTimeoutMS — время ожидания простоя подключения в миллисекундах.
    • traceFrames — логический флаг для сбора кадров трассировки AMQP для отладки.
    • Другие параметры конфигурации.

Создайте фабрику, как показано в следующем примере. Учётные данные токена и узел являются обязательными параметрами, тогда как другие свойства необязательны.

String host = "<YourNamespaceName>.servicebus.windows.net";
ConnectionFactory factory = new ServiceBusJmsConnectionFactory(tokenCredential, host, null); 

Назначение JMS

Назначение — это объект, который клиент использует для указания целевого объекта сообщений, которые он создает, и источника потребляемых сообщений.

Назначения соответствуют сущностям в Служебная шина Azure — очереди (в сценариях 'точка-точка') и топики (в сценариях pub-sub).

Связи

Соединение инкапсулирует виртуальное соединение с поставщиком услуг JMS. Служебная шина Azure представляет подключение с отслеживанием состояния между приложением и служебной шиной Azure через AMQP.

Создайте подключение из фабрики подключений, как показано в следующем примере:

Connection connection = factory.createConnection();

Сеансы

Сеанс — это однопоточный контекст для создания и потребления сообщений. Используйте его для создания сообщений, производителей сообщений и потребителей. Он также предоставляет транзакционный контекст для объединения операций отправки и получения в атомарную единицу работы.

Создайте сеанс из объекта подключения, как показано в следующем примере:

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

Примечание.

API JMS не поддерживает получение сообщений из очередей служебной шины или разделов с включенными сеансами обмена сообщениями.

Режимы сеанса

Создайте сеанс с любым из следующих режимов.

Режимы сеанса Поведение
Session.AUTO_ACKNOWLEDGE (автоматическое подтверждение сеанса) Сеанс автоматически подтверждает получение клиентом сообщения, когда сеанс успешно возвращается из вызова для получения или когда прослушиватель сообщений вызывает сеанс, чтобы обработать сообщение успешно.
Session.CLIENT_ACKNOWLEDGE (Поток.КЛИЕНТСКОЕ_ПОДТВЕРЖДЕНИЕ) Клиент признает потребляемое сообщение путем вызова метода подтверждения сообщения.
Session.DUPS_OK_ACKNOWLEDGE Этот режим подтверждения предписывает сеансу отложенно подтвердить доставку сообщений.
Session.SESSION_TRANSACTED Передайте это значение в качестве аргумента методу createSession(int sessionMode) объекта Connection, чтобы указать, что сеанс должен использовать локальную транзакцию.

Если режим сеанса не указан, значение по умолчанию Session.AUTO_ACKNOWLEDGE.

JMSContext

Примечание.

JMSContext определяется как часть спецификации JMS 2.0.

JMSContext объединяет функциональные возможности, предоставляемые объектом подключения и сеанса. Вы создаете его из объекта фабрики соединений.

JMSContext context = connectionFactory.createContext();

Режимы JMSContext

Как и объект Session , можно создать JMSContext с теми же режимами подтверждения, что и в режимах сеанса.

JMSContext context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);

Если режим не указан, по умолчанию используется JMSContext.AUTO_ACKNOWLEDGE.

Производители сообщений JMS

Производитель сообщений — это объект, создаваемый с помощью JMSContext или сеанса. Используйте его для отправки сообщений в место назначения.

Его можно создать как автономный объект, как показано в следующем примере:

JMSProducer producer = context.createProducer();

Вы также можете создать его во время выполнения, когда нужно отправить сообщение.

context.createProducer().send(destination, message);

Потребители сообщений JMS

Потребитель сообщения — это объект, создаваемый JMSContext или сеансом. Используйте его для получения сообщений, отправленных в место назначения. Создайте его, как показано в следующем примере:

JMSConsumer consumer = context.createConsumer(dest);

Синхронные приемы через метод receive()

Потребитель сообщения предоставляет синхронный способ получения сообщений из назначения с помощью receive() метода.

Если аргументы или время ожидания не указаны, или указано время ожидания 0, потребитель будет блокирован на неопределенное время до тех пор, пока не поступит сообщение или не будет прервано подключение (в зависимости от того, что произойдет раньше).

Message m = consumer.receive();
Message m = consumer.receive(0);

При предоставлении положительного аргумента, отличного от нуля, потребитель блокируется, пока не истечёт время таймера.

Message m = consumer.receive(1000); // time out after one second.

Асинхронное получение сообщений с прослушивателями JMS

Прослушиватель сообщений — это объект, используемый для асинхронной обработки сообщений на пункте назначения. Он реализует MessageListener интерфейс, содержащий onMessage метод, в котором должна жить конкретная бизнес-логика.

Необходимо создать экземпляр объекта прослушивателя сообщений и зарегистрировать его для определенного потребителя сообщения с помощью setMessageListener метода.

Listener myListener = new Listener();
consumer.setMessageListener(myListener);

Потребление из тем

Вы создаете потребители сообщений JMS для пункта назначения, которое может быть очередью или темой.

Потребители в очередях — это просто клиентские объекты, которые живут в контексте сеанса (и подключения) между клиентским приложением и Служебная шина Azure.

Потребители по темам, однако, имеют две части -

  • Клиентский объект, который существует в контексте сеанса (или JMSContext) и
  • Подписка, которая является сущностью в Сервисной шине Azure.

Подписки описаны здесь и могут быть одним из следующих типов:

  • общие устойчивые подписки;
  • общие неустойчивые подписки;
  • Неразделяемые устойчивые подписки
  • Необщие неустойчивые подписки

Браузеры очередей JMS

API JMS предоставляет QueueBrowser объект, который приложение может использовать для просмотра сообщений в очереди и отображения значений заголовков для каждого сообщения.

Вы можете создать браузер очередей с помощью JMSContext, как показано в следующем примере:

QueueBrowser browser = context.createBrowser(queue);

Примечание.

API JMS не предоставляет API для просмотра раздела.

Это ограничение существует, так как сам раздел не сохраняет сообщения. Как только сообщение отправляется в раздел, оно перенаправляется в соответствующие подписки.

Селекторы сообщений JMS

Приложения-получатели могут использовать селекторы сообщений для фильтрации получаемых сообщений. Используя селекторы сообщений, принимающее приложение выгружает работу фильтрации сообщений поставщику JMS (в данном случае Служебная шина Azure) вместо того, чтобы взять на себя эту ответственность.

При создании любого из следующих потребителей можно использовать селекторы:

  • Общая устойчивая подписка
  • Несоотделенные устойчивые подписки
  • Общая непостоянная подписка
  • Неразделяемая кратковременная подписка
  • Потребитель очереди
  • Браузер очередей

Примечание.

служебная шина селекторы не поддерживают ключевые слова SQL LIKE и BETWEEN.

Запланированные сообщения (задержка доставки)

JMS 2.0 поддерживает планирование сообщения для будущей доставки с помощью метода setDeliveryDelay на MessageProducer или JMSProducer. При установке этого свойства служебная шина принимает сообщение, но только делает его видимым для потребителей после истечения периода задержки.

MessageProducer producer = session.createProducer(queue);

// Schedule a message for delivery 30 seconds from now
producer.setDeliveryDelay(30000);
producer.send(session.createTextMessage("Scheduled message"));

Полный рабочий пример см. в разделе QueueScheduledSend.java в репозитории azure-servicebus-jms-samples.

Выбор и отказоустойчивость фабрики подключений

При использовании ServiceBusJmsConnectionFactory в Spring Boot или других платформах, которые управляют подключениями JMS, выберите правильную оболочку фабрики соединений для отправителей и прослушивателей , чтобы обеспечить надежную операцию.

Role Фабрика подключений Почему
Отправители (JmsTemplate) CachingConnectionFactory Упаковка ServiceBusJmsConnectionFactory JmsTemplate создает и закрывает подключение для каждой отправки по умолчанию. CachingConnectionFactory поддерживает одно подключение AMQP и кэширует сеансы, избегая переподключений, которые под нагрузкой могут исчерпать ресурсы брокера.
Прослушиватели (@JmsListener, DefaultMessageListenerContainer) Необработанный ServiceBusJmsConnectionFactory (распакованный) Каждый контейнер прослушивателя получает собственное подключение AMQP с независимым жизненным циклом. Если подключение завершается сбоем (истечение срока действия токена, обновление шлюза, сбой в сети), затрагивается только этот слушатель, и Spring автоматически воссоздаёт подключение.

Чего следует избегать слушателям

Предупреждение

Никогда не используйте SingleConnectionFactory с контейнерами прослушивателя. Он заставляет всех прослушивателей совместно использовать одно подключение JMS. Если это подключение нарушается по какой-либо причине, все прослушиватели теряют подключение одновременно и не могут восстановиться независимо. Используйте необработанный ServiceBusJmsConnectionFactory контейнер, чтобы каждый контейнер прослушивателя управлял собственным подключением.

CachingConnectionFactory В контейнерах прослушивателей также могут возникнуть проблемы, так как кэшированные сессии могут ссылаться на устаревшее базовое подключение. Для прослушивателей сырая фабрика гарантирует, что каждый контейнер может создавать новое подключение независимо.

Параметры по умолчанию Spring Cloud Azure

Если вы используете spring-cloud-azure-starter-servicebus-jms (версия 6.2.0+), стартовый механизм применяет это разделение фабрики по умолчанию.

spring.jms.servicebus.pool.enabled spring.jms.cache.enabled Фабрика отправителей Фабрика прослушивателя
(не задано) (не задано) CachingConnectionFactory ServiceBusJmsConnectionFactory
(не задано) true CachingConnectionFactory CachingConnectionFactory
(не задано) false ServiceBusJmsConnectionFactory ServiceBusJmsConnectionFactory
true (не задано) JmsPoolConnectionFactory JmsPoolConnectionFactory

В более ранних версиях (до 6.2.0) и отправители, и прослушиватели по умолчанию используют ServiceBusJmsConnectionFactory, что приводит к созданию нового подключения для каждой отправки.

Добавление прослушивателя исключений

Без прослушивателя исключений разрыв соединения происходит незаметно. Добавьте jakarta.jms.ExceptionListener и в фабрику отправителя, и в фабрику слушателя для обеспечения наблюдаемости.

connection.setExceptionListener(exception -> {
    log.error("JMS connection error: {}", exception.getMessage(), exception);
});

В Spring Boot задайте прослушиватель исключений ( CachingConnectionFactory для отправителей) и DefaultJmsListenerContainerFactory (для прослушивателей).

Полный рабочий пример, показывающий все эти шаблоны, см. в примере Spring Boot JMS Resilience в репозитории azure-servicebus-jms-samples.

Очереди недоставленных сообщений

Каждая очередь и подписка на тему в Служебная шина Azure имеет связанную очередь отложенных сообщений dead letter queue (DLQ). Система автоматически перемещает в DLQ сообщения, которые она не может доставить или обработать. Например, система перемещает сообщение в DLQ, когда оно превышает максимальное количество доставок или истекает срок его жизни (TTL).

Это важно

Чтобы переместить сообщения с истекшим сроком действия TTL в DLQ, включите мертвый поток сообщений при истечении срока действия для очереди или подписки. Без этого параметра система автоматически удаляет просроченные сообщения. Инструкции по настройке см. в разделе "Включение недоставленных букв для очереди или подписки".

В JMS вы обращаетесь к DLQ в качестве отдельного назначения, создав полный путь и создав JmsQueue его. Специальный API не требуется.

Формат пути DLQ очереди:

<queue-name>/$deadletterqueue

Формат пути DLQ для подписки на топик:

<topic-name>/Subscriptions/<subscription-name>/$deadletterqueue

Пример: использование из очереди недоставленных писем:

import org.apache.qpid.jms.JmsQueue;

// Construct the DLQ path for a queue named "orders"
String dlqPath = "orders/$deadletterqueue";
JmsQueue dlqDestination = new JmsQueue(dlqPath);

// Create a consumer on the DLQ and receive messages
MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
Message message = dlqConsumer.receive(5000);

Сообщения с недоставленными буквами включают свойства метаданных, описывающие, почему сообщение было недоставлено:

Недвижимость Description
DeadLetterReason Причина, по которой сообщение было недоставлено (например, TTLExpiredException или MaxDeliveryCountExceeded).
DeadLetterErrorDescription Читаемое человеком описание причины недоставленной буквы.

Прочитайте эти свойства с помощью message.getStringProperty():

String reason = message.getStringProperty("DeadLetterReason");
String description = message.getStringProperty("DeadLetterErrorDescription");

Полный рабочий пример см. в разделе QueueDeadLetterReceive.java в репозитории azure-servicebus-jms-samples.

Сопоставление управления AMQP и операций шины служб

Вот как распоряжение AMQP преобразуется в операцию служебная шина:

ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()

Сводка

В этом руководстве разработчика показано, как Java клиентские приложения, использующие службу сообщений Java (JMS), могут подключаться к Служебная шина Azure.

Дальнейшие действия

Дополнительные сведения об Служебная шина Azure и сущностях Java Message Service (JMS) см. в следующих источниках: