Поделиться через


Передача, блокировка и согласование сообщений

Основной возможностью брокера сообщений, как например Service Bus, является приём сообщений в очередь или тему и их хранение для последующего извлечения. Термин отправки используется для описания процесса передачи сообщения брокеру сообщений, а получение ссылается на процесс получения сообщения от брокера.

Когда клиент отправляет сообщение, обычно он хочет знать, правильно ли сообщение передается и принимается брокером или возникает ли какая-то ошибка. Это позитивное или негативное подтверждение устанавливает понимание клиента и брокера о состоянии передачи сообщения. поэтому оно называется согласованием.

Аналогичным образом, когда брокер передает сообщение клиенту, брокер и клиент хотят установить представление о том, успешно ли обработано сообщение и поэтому может быть удалено, или происходит сбой доставки или обработки сообщения, поэтому сообщение может быть доставлено снова.

Согласование операций отправки

При использовании любого поддерживаемого клиента API служебной шины операции отправки в служебную шину всегда согласовываются явным образом. Это значит, что операция API ожидает результата приема из служебной шины, и только после его получения выполняет операцию отправки.

Если служебная шина отклоняет сообщение, то данные отклонения содержат индикатор ошибки и текст с идентификатором трассировки (tracking-id). Отказ также включает информацию о том, можно ли повторно выполнить операцию с ожиданием успеха. В клиенте эти сведения преобразуются в исключение, передаваемое вызывающей стороне операции отправки. Если сообщение принято, операция автоматически завершается.

Протокол расширенной очереди сообщений (AMQP) — это единственный протокол, поддерживаемый для клиентов .NET Standard, Java, JavaScript, Python и Go. Для клиентов платформы .NET Framework вы можете использовать Протокол обмена сообщениями Service Bus (SBMP) или AMQP. При использовании протокола AMQP передачи сообщений и расчетов конвейерируются и асинхронны. Рекомендуется использовать API для асинхронной модели программирования.

30 сентября 2026 г. мы выведем из эксплуатации библиотеки SDK Azure Service Bus: WindowsAzure.ServiceBus, Microsoft.Azure.ServiceBus и com.microsoft.azure.servicebus, которые не соответствуют рекомендациям по Azure SDK. Мы также завершим поддержку протокола SBMP, поэтому вы больше не сможете использовать этот протокол после 30 сентября 2026 года. Перейдите в последние библиотеки пакета SDK Azure, которые предлагают критически важные обновления системы безопасности и улучшенные возможности до этой даты.

Хотя старые библиотеки по-прежнему могут использоваться после 30 сентября 2026 года, они больше не будут получать официальную поддержку и обновления от Майкрософт. Для получения дополнительной информации см. объявление об окончании поддержки.

Отправитель может быстро передать несколько сообщений по сети в течение короткого промежутка времени без ожидания подтверждения каждого сообщения, что имело бы место в случае с протоколом SBMP или HTTP 1.1. Эти асинхронные операции отправки считаются выполненными, когда соответствующие сообщения принимаются и сохраняются в секционированных сущностях или когда совмещаются операции отправки в разные сущности. Завершения также могут происходить не в первоначальном порядке отправки.

Стратегия обработки результата операций отправки может мгновенно и значительно влиять на производительность вашего приложения. Примеры в этом разделе написаны на C# и применяются к фьючерсам Java, Java Mono, обещаниям JavaScript и эквивалентным концепциям на других языках.

Если приложение создает всплески сообщений, как показано здесь на простом цикле, и должно ожидать завершения каждой операции отправки перед отправкой следующего сообщения, то и синхронный, и асинхронный API действуют одинаково, поскольку отправка 10 сообщений завершится только после 10 последовательных полных циклов для подтверждения.

При предполагаемом расстоянии от 70-миллисекундного протокола управления передачей (TCP) от локального сайта до служебная шина и предоставления всего 10 мс для служебная шина принимать и хранить каждое сообщение, следующий цикл занимает не менее 8 секунд, не подсчитывая время передачи полезных данных или потенциальные эффекты перегрузки маршрута:

for (int i = 0; i < 10; i++)
{
    // creating the message omitted for brevity
    await sender.SendMessageAsync(message);
}

Если приложение подряд запускает 10 асинхронных операций отправки и ожидает завершения каждой из них отдельно, то время кругового пути для этих 10 операций пересекается. 10 сообщений передаются подряд, потенциально даже в общих кадрах TCP, и общее время передачи во многом зависит от времени, необходимого сети для передачи этих сообщений в брокер.

При действии тех же предположений, что и для предыдущего цикла, общее совмещенное время выполнения приведенного ниже цикла может составить меньше секунды:

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    tasks.Add(sender.SendMessageAsync(message));
}
await Task.WhenAll(tasks);

Важно отметить, что все асинхронные модели программирования используют разновидности скрытых рабочих очередей в памяти, в которых хранятся ожидающие операции. Когда API для отправки возвращает подтверждение, задача отправки помещается в рабочую очередь, но выполнение протокольной операции начинается только когда очередь доходит до неё. Для кода, который, как правило, передает пакеты сообщений и где надежность является проблемой, следует принять во внимание, что не слишком много сообщений находятся в процессе передачи одновременно, так как все отправленные сообщения занимают память, пока они не будут помещены в сеть.

Семафоры, как показано в следующем фрагменте кода на C#, — это объекты синхронизации, при необходимости обеспечивающие подобное регулирование на уровне приложения. Такое использование семафора гарантирует, что в состоянии "на лету" одновременно могут находиться не более 10 сообщений. Один из 10 доступных семафоров блокируется перед отправкой, и блокировка снимается только после завершения отправки. 11-й проход по циклу ожидает завершения по крайней мере одной из предыдущих операций отправки, а затем делает блокировку доступной:

var semaphore = new SemaphoreSlim(10);

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    await semaphore.WaitAsync();

    tasks.Add(sender.SendMessageAsync(message).ContinueWith((t)=>semaphore.Release()));
}
await Task.WhenAll(tasks);

Приложения никогда не должны инициировать операцию асинхронной передачи по принципу "отправить и забыть", то есть без получения результата операции. В противном случае это может привести к тому, что внутренняя невидимая очередь задач займет всю память, и приложение не сможет обнаруживать ошибки отправки.

for (int i = 0; i < 10; i++)
{
    sender.SendMessageAsync(message); // DON’T DO THIS
}

С низкоуровневым клиентом AMQP служебная шина также принимает "предустановленные" передачи. Предварительно урегулированная передача — это операция, выполнение которой не требует контроля, и результат которой не сообщается клиенту, а сообщение считается завершённым при отправке. Отсутствие обратной связи для клиента также означает отсутствие действенных данных для диагностики. Поэтому этот режим не соответствует требованиям для получения помощи от службы поддержки Azure.

Согласование операций получения

Для операций получения клиенты API служебной шины используют два разных явных режима: ReceiveAndDelete и PeekLock.

Получить и удалить

В режиме ReceiveAndDelete брокер считает, что все сообщения, отправляемые им в получающий клиент, являются согласованными после отправки. Это означает, что сообщение считается потребляемым, как только брокер помещает его в провод. Если происходит сбой передачи сообщения, оно утрачивается.

Преимуществом этого режима является то, что получателю не требуется предпринимать дальнейшие действия с сообщением и его работа не замедляется из-за ожидания результата согласования. Если данные, содержащиеся в отдельных сообщениях, имеют низкую ценность и (или) важны только в течение очень короткого времени, этот режим является неплохим выбором.

Блокировка для просмотра

В режиме PeekLock брокеру указывается, что принимающему клиенту необходимо явно согласовывать полученные сообщения. Сообщение становится доступным для обработкой получателем. На этот период на сообщение накладывается монопольная блокировка в службе, чтобы его не могли просмотреть другие конкурирующие получатели. Длительность блокировки изначально определяется на уровне очереди или подписки и может быть расширена клиентом, владельцем блокировки, с помощью операции RenewMessageLockAsync . Подробнее об продлении блокировок см. в разделе Продление блокировок этой статьи.

Когда сообщение заблокировано, другие клиенты, получающие сообщения из той же очереди или подписки, могут накладывать свои блокировки и получать следующие доступные сообщения, которые не заблокированы. Когда блокировка сообщения явно освобождается или когда срок действия блокировки истекает, сообщение помещается в передней части порядка извлечения для повторного получения.

Если сообщение несколько раз освобождается получателями или они позволяют истечь сроку его блокировки заданное число раз (maxDeliveryCount), то сообщение автоматически удаляется из очереди или подписки и помещается в связанную очередь недоставленных сообщений.

Принимающий клиент инициирует завершение обработки полученного сообщения с положительным подтверждением при вызове API Complete для сообщения. Это указывает брокеру, что сообщение успешно обработано, и оно удаляется из очереди или подписки. Брокер отвечает на намерение согласования получателя, сообщая, возможно ли выполнить это согласование.

Если получающий клиент не может обработать сообщение, но хочет, чтобы сообщение было повторно доставлено, он может явно попросить освободить и разблокировать сообщение мгновенно, вызвав API "Отказ" для сообщения, или он может ничего не делать и позволить блокировке истечь.

Если получающий клиент не сможет обработать сообщение и знает, что повторная доставка сообщения и повторная попытка операции не помогут, он может отклонить сообщение, перемещая его в очередь недоставленных сообщений, вызвав API DeadLetter для сообщения. Это также позволяет задать пользовательское свойство, включая код причины, который можно получить вместе с сообщением из очереди недоставленных сообщений.

Примечание.

Подочередь для сообщений с ошибками существует для очереди или подписки на тему только в том случае, если для очереди или подписки включена функция обработки сообщений с ошибками.

Особым случаем расчёта является откладывание. Подробнее см. в разделе Откладывание сообщения.

Операции Complete, DeadLetter, или RenewLock могут потерпеть неудачу из-за проблем с сетью, если срок действия удерживаемой блокировки истек, или имеются другие условия на стороне сервиса, которые препятствуют урегулированию. В одном из последних случаев служба отправляет отрицательное подтверждение, которая отображается в клиентах API как исключение. Если причиной является нарушение сетевого подключения, то блокировка снимается, так как служебная шина не поддерживает восстановление существующих ссылок AMQP через другое подключение.

Если Complete выходит из строя, обычно в самом конце обработки сообщений и в некоторых случаях после нескольких минут обработки, принимающее приложение может решить, следует ли сохранить состояние работы и игнорировать то же сообщение при его повторной доставке, либо отказаться от результата работы и повторить попытку при повторной доставке сообщения.

Типичным механизмом идентификации повторной доставки сообщений является проверка message-id, который может и должен быть задан отправителем как уникальное значение, возможно, совпадающее с идентификатором исходного процесса. Планировщик заданий, скорее всего, установит message-id как идентификатор задания, которое он пытается назначить конкретному работнику, и рабочий будет игнорировать вторую попытку назначения задания, если это задание уже выполнено.

Внимание

Важно отметить, что блокировка сообщения, которую устанавливает PeekLock или SessionLock, является временной и может быть потеряна при следующих условиях.

  • Обновление службы
  • Обновление ОС
  • Изменение свойств сущности (очереди, темы, подписки) во время удержания блокировки.
  • Если клиентское приложение шины обслуживания теряет подключение к шине обслуживания по какой-либо причине.

При потере блокировки служебная шина Azure создает сообщение MessageLockLostException или SessionLockLostException, которое отображается в клиентском приложении. В этом случае на клиенте должна автоматически запуститься логика повтора по умолчанию и повторить операцию. Кроме того, количество доставки сообщения не увеличивается.

Продление блокировок

Значение по умолчанию для длительности блокировки составляет 1 минуту. Можно указать другое значение для длительности блокировки на уровне очереди или подписки . Клиент, владеющий блокировкой, может продлить блокировку сообщения с помощью методов объекта-получателя. Вместо этого можно использовать функцию автоматического продления блокировки, в которой можно указать период действия продления блокировки.

Рекомендуется задать длительность блокировки более высокой, чем обычное время обработки, поэтому вам не нужно продлевать блокировку. Максимальное значение составляет 5 минут, поэтому, если вы хотите иметь блокировку дольше, её необходимо обновить. Наличие более длительной блокировки, чем необходимо, также имеет некоторые последствия. Например, когда клиент перестанет работать, сообщение станет доступным только после прохождения длительности блокировки.

Следующие шаги