Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом руководстве показано, как использовать топики и подписки Service Bus в сценарии управления розничным товарным запасом. Он включает каналы публикации и подписки с помощью портала Azure и .NET. Такой сценарий, например, отлично подходит для обновления сведений об ассортименте товаров в нескольких магазинах розничной торговли. В этом сценарии каждый магазин или группа магазинов получают сообщения, позволяющие им обновить ассортимент.
Служебная шина Azure — это служба обмена сообщениями с несколькими клиентами, которая отправляет информацию между приложениями и службами. Асинхронные операции обеспечивают гибкий обмен сообщениями с участием брокера, структурированную отправку сообщений в порядке очереди (FIFO), а также возможности публикации и подписки. Общие сведения см. в разделе "Что такое служебная шина"?
В этом руководстве показано, как реализовать этот сценарий с помощью подписок и фильтров. Сначала вы создадите раздел с тремя подписками, добавите в него правила и фильтры, а затем отправите и получите сообщения через разделы и подписки.
В этом руководстве описано следующее:
- создание раздела служебной шины и трех подписок на этот раздел с помощью портала Azure;
- добавление фильтров для подписок с помощью кода .NET;
- создание сообщений с разным содержимым;
- отправка сообщений и проверка их поступления в ожидаемые подписки;
- получение сообщений из подписок.
Предварительные условия
Чтобы пройти это руководство, убедитесь, что у вас есть:
- Подписка Azure. Чтобы использовать службы Azure, включая Служебную шину Azure, вам потребуется подписка. Перед началом работы можно создать бесплатную учетную запись .
- Visual Studio 2019 или более поздней версии.
Топики и подписки Service Bus
Каждая подписка на раздел может получать копию любого сообщения. Разделы полностью совместимы с очередями служебной шины на уровнях протоколов и семантики. Темы служебной шины поддерживают широкий набор правил отбора с условиями фильтрации и необязательными действиями, которые присваивают или изменяют свойства сообщения. Каждый раз, когда срабатывает правило, генерируется сообщение. Дополнительные сведения о правилах, фильтрах и действиях см. в разделе " Фильтры и действия раздела".
Создание пространства имен на портале Azure
Чтобы начать использование сущностей обмена сообщениями служебной шины в Azure, создайте пространство имен с уникальным именем в Azure. Пространство имен предоставляет контейнер области видимости для ресурсов шины обслуживания, таких как очереди и топики, в вашем приложении.
Создать пространство имен:
Войдите на портал Azure.
Выберите всплывающее меню в левом верхнем углу и перейдите на страницу "Все службы".
На панели навигации слева выберите "Интеграция".
Прокрутите вниз до служб обмена сообщениями, наведите указатель мыши на служебную шину и нажмите кнопку "Создать".
На вкладке "Основы" страницы"Создание пространства имен " выполните следующие действия:
Выберите подписку Azure, в которой будет создано пространство имен.
Для группы ресурсов выберите существующую группу ресурсов или создайте новую.
Имя пространства имен должно соответствовать следующим правилам именования:
- Это имя должно быть уникальным в пределах Azure. Система немедленно проверяет, доступно ли имя.
- Длина имени составляет не менее 6 и не более 50 символов.
- Имя может содержать только буквы, цифры и дефисы
-. - Имя должно начинаться с буквы и заканчиваться буквой или цифрой.
- Имя не заканчивается на
-sbили-mgmt.
Для расположения выберите регион для размещения пространства имен.
Для параметра Ценовая категория выберите ценовую категорию ("Базовый", "Стандартный" или "Премиум") для пространства имен. Для этого быстрого начала выберите Стандартный.
Если выбрать уровень "Премиум" , можно включить георепликацию для пространства имен. Функция георепликации гарантирует, что метаданные и данные пространства имен постоянно реплицируются из основного региона в один или несколько дополнительных регионов.
Внимание
Чтобы использовать разделы и подписки, выберите категорию "Стандартный" или "Премиум". Ценовая категория "Базовый" не поддерживает разделы и подписки.
Если выбрана ценовая категория Премиум, укажите число единиц обмена сообщениями. В категории "Премиум" обеспечивается изоляция ресурсов на уровне ЦП и памяти, так что рабочая нагрузка выполняется изолированно от других. Этот контейнер ресурсов называется как единица обмена сообщениями. Пространству имен ценовой категории "Премиум" выделяется по крайней мере одна единица обмена сообщениями. Вы можете выбрать 1, 2, 4, 8 или 16 единиц обмена сообщениями для каждого пространства имен служебной шины Premium. Для получения дополнительной информации см. премиальный уровень обмена сообщениями службы Service Bus.
Нажмите Review + create внизу страницы.
На странице "Просмотр и создание " просмотрите параметры и нажмите кнопку "Создать".
После успешного развертывания ресурса выберите "Перейти к ресурсу " на странице развертывания.
Вы увидите главную страницу пространства имен сервисной шины.
Получение строки подключения для пространства имен (портал Azure)
Создание нового пространства имен автоматически создает начальную политику общей подписи доступа (SAS). Политика включает первичные и вторичные ключи, а также строки первичного и вторичного подключения, которые предоставляют полный контроль над всеми аспектами пространства имен. Дополнительные сведения о создании правил с более ограниченными правами для обычных отправителей и получателей см. в разделе Аутентификация и авторизация для Service Bus.
Клиент может использовать строку подключения для подключения к пространству имен служебной шины. Чтобы скопировать первичную строку подключения для пространства имен, выполните следующие действия:
На странице пространства имен Service Bus разверните Параметры, а затем выберите Политики общего доступа.
На странице Политики общего доступа щелкните RootManageSharedAccessKey.
В окне политики SAS: RootManageSharedAccessKey нажмите кнопку копирования рядом с основной строкой подключения. Данное действие копирует строку подключения в буфер обмена для дальнейшего использования. Вставьте это значение в блокнот или любое другое временное место.
Эту страницу можно использовать для копирования первичного и вторичного ключей, а также первичной и вторичной строки подключения.
Создание раздела с помощью портала Azure
На странице пространства имен Service Bus разверните объекты в меню навигации слева и выберите темы.
Выберите + Раздел.
Введите имя для темы. Для других параметров оставьте значения по умолчанию.
Нажмите кнопку создания.
Создание подписок на раздел
Выберите раздел, который был создан в предыдущем разделе.
На странице Тема Служебной Шины выберите + Подписка.
На странице Создать подписку выполните следующие действия:
Повторите предыдущий шаг дважды, создав подписки с именами S2 и S3.
Создание правил фильтрации по подпискам
После создания пространства имен, тем и подписок и получения строки подключения к пространству имен, вы можете создать фильтрующие правила для подписок. После этого отправьте и получите сообщения. Этот код можно изучить в папке с примером на GitHub.
Отправка и получение сообщений
Чтобы выполнить код:
В окне командной строки или в командной строке PowerShell клонируйте репозиторий служебной шины GitHub:
git clone https://github.com/Azure/azure-service-bus.gitПерейдите к папке
azure-service-bus\samples\DotNet\Azure.Messaging.ServiceBus\BasicSendReceiveTutorialWithFiltersс примерами.Восстановите строку подключения, которую вы скопировали в Блокнот ранее в этом руководстве. Вам также потребуется имя созданного раздела.
В командной строке введите следующую команду:
dotnet buildПерейдите в папку
BasicSendReceiveTutorialWithFilters\bin\Debug\netcoreapp3.1.Чтобы запустить программу, введите следующую команду. Обязательно замените
myConnectionStringзначением иmyTopicNameименем созданного раздела:dotnet --roll-forward Major BasicSendReceiveTutorialWithFilters.dll -ConnectionString "myConnectionString" -TopicName "myTopicName"Следуйте инструкциям в консоли, чтобы выбрать процедуру создания фильтра. При создании фильтров необходимо также удалить стандартные фильтры. При использовании PowerShell или CLI не нужно удалять фильтр по умолчанию. Если вы используете код для создания фильтров, сначала удалите фильтр по умолчанию. Команды консоли 1 и 3 позволяют управлять фильтрами для ранее созданных подписок:
- Выполните команду 1, чтобы удалить стандартные фильтры.
- Выполните команду 2, чтобы добавить собственные фильтры.
- Выполните 3. Пропустите этот шаг в учебнике. При необходимости этот параметр удаляет собственные фильтры. Он не воссоздает фильтры по умолчанию.
Завершив создание фильтров, вы можете отправлять сообщения. Нажмите клавишу 4 и наблюдайте, как в раздел отправляются 10 сообщений:
Нажмите клавишу 5 и наблюдайте за получением этих сообщений. Если вы не получили обратно 10 сообщений, нажмите клавишу M для отображения меню, затем снова нажмите 5.
Очистка ресурсов
Выполните следующие действия, чтобы очистить ресурсы, которые больше не нужны.
- Перейдите к пространству имен на портале Azure.
- На странице пространство имен служебной шины выберите Удалить на панели команд, чтобы удалить пространство имен и ресурсы (очереди, темы и подписки) в ней.
Разбор примера кода
Этот раздел содержит дополнительные сведения о работе этого примера кода.
Получение строки подключения и темы
Прежде всего этот код объявляет набор переменных, которые управляют выполнением остальных частей программы.
string ServiceBusConnectionString;
string TopicName;
static string[] Subscriptions = { "S1", "S2", "S3" };
static IDictionary<string, string[]> SubscriptionFilters = new Dictionary<string, string[]> {
{ "S1", new[] { "StoreId IN('Store1', 'Store2', 'Store3')", "StoreId = 'Store4'"} },
{ "S2", new[] { "sys.To IN ('Store5','Store6','Store7') OR StoreId = 'Store8'" } },
{ "S3", new[] { "sys.To NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8') OR StoreId NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8')" } }
};
// You can have only have one action per rule and this sample code supports only one action for the first filter, which is used to create the first rule.
static IDictionary<string, string> SubscriptionAction = new Dictionary<string, string> {
{ "S1", "" },
{ "S2", "" },
{ "S3", "SET sys.Label = 'SalesEvent'" }
};
static string[] Store = { "Store1", "Store2", "Store3", "Store4", "Store5", "Store6", "Store7", "Store8", "Store9", "Store10" };
static string SysField = "sys.To";
static string CustomField = "StoreId";
static int NrOfMessagesPerStore = 1; // Send at least 1.
Строка подключения и имя раздела передаются с помощью параметров командной строки, как показано ниже. Они считываются в методе Main() :
static void Main(string[] args)
{
string ServiceBusConnectionString = "";
string TopicName = "";
for (int i = 0; i < args.Length; i++)
{
if (args[i] == "-ConnectionString")
{
Console.WriteLine($"ConnectionString: {args[i + 1]}");
ServiceBusConnectionString = args[i + 1]; // Alternatively enter your connection string here.
}
else if (args[i] == "-TopicName")
{
Console.WriteLine($"TopicName: {args[i + 1]}");
TopicName = args[i + 1]; // Alternatively enter your queue name here.
}
}
if (ServiceBusConnectionString != "" && TopicName != "")
{
Program P = StartProgram(ServiceBusConnectionString, TopicName);
P.PresentMenu().GetAwaiter().GetResult();
}
else
{
Console.WriteLine("Specify -Connectionstring and -TopicName to execute the example.");
Console.ReadKey();
}
}
Удаление стандартных фильтров
При создании подписки в Service Bus создается фильтр по умолчанию для каждой подписки. Этот фильтр позволяет получать каждое сообщение, отправленное в раздел. Если вы намерены использовать пользовательские фильтры, этот стандартный фильтр можно удалить, как показано в следующем коде:
private async Task RemoveDefaultFilters()
{
Console.WriteLine($"Starting to remove default filters.");
try
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
foreach (var subscription in Subscriptions)
{
await client.DeleteRuleAsync(TopicName, subscription, CreateRuleOptions.DefaultRuleName);
Console.WriteLine($"Default filter for {subscription} has been removed.");
}
Console.WriteLine("All default Rules have been removed.\n");
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
await PresentMenu();
}
Создание фильтров
В следующем коде добавляются пользовательские фильтры, которые описаны в этом руководстве:
private async Task CreateCustomFilters()
{
try
{
for (int i = 0; i < Subscriptions.Length; i++)
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
string[] filters = SubscriptionFilters[Subscriptions[i]];
if (filters[0] != "")
{
int count = 0;
foreach (var myFilter in filters)
{
count++;
string action = SubscriptionAction[Subscriptions[i]];
if (action != "")
{
await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
{
Filter = new SqlRuleFilter(myFilter),
Action = new SqlRuleAction(action),
Name = $"MyRule{count}"
});
}
else
{
await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
{
Filter = new SqlRuleFilter(myFilter),
Name = $"MyRule{count}"
});
}
}
}
Console.WriteLine($"Filters and actions for {Subscriptions[i]} have been created.");
}
Console.WriteLine("All filters and actions have been created.\n");
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
await PresentMenu();
}
Удаление созданных пользовательских фильтров
Если нужно удалить из подписки все фильтры, воспользуйтесь следующим примером кода:
private async Task CleanUpCustomFilters()
{
foreach (var subscription in Subscriptions)
{
try
{
var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
IAsyncEnumerator<RuleProperties> rules = client.GetRulesAsync(TopicName, subscription).GetAsyncEnumerator();
while (await rules.MoveNextAsync())
{
await client.DeleteRuleAsync(TopicName, subscription, rules.Current.Name);
Console.WriteLine($"Rule {rules.Current.Name} has been removed.");
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
Console.WriteLine("All default filters have been removed.\n");
await PresentMenu();
}
Отправка сообщений
Отправка сообщений в раздел происходит так же, как и отправка сообщений в очередь. В этом примере показано, как отправлять сообщения, используя список задач и асинхронную обработку:
public async Task SendMessages()
{
try
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
var taskList = new List<Task>();
for (int i = 0; i < Store.Length; i++)
{
taskList.Add(SendItems(client, Store[i]));
}
await Task.WhenAll(taskList);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
Console.WriteLine("\nAll messages sent.\n");
}
private async Task SendItems(ServiceBusClient client, string store)
{
// create the sender
ServiceBusSender tc = client.CreateSender(TopicName);
for (int i = 0; i < NrOfMessagesPerStore; i++)
{
Random r = new Random();
Item item = new Item(r.Next(5), r.Next(5), r.Next(5));
// Note the extension class which is serializing an deserializing messages
ServiceBusMessage message = item.AsMessage();
message.To = store;
message.ApplicationProperties.Add("StoreId", store);
message.ApplicationProperties.Add("Price", item.GetPrice().ToString());
message.ApplicationProperties.Add("Color", item.GetColor());
message.ApplicationProperties.Add("Category", item.GetItemCategory());
await tc.SendMessageAsync(message);
Console.WriteLine($"Sent item to Store {store}. Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}"); ;
}
}
Получение сообщений
Сообщения снова получаются с помощью списка задач, а код использует пакетную обработку. Пакетную обработку можно применять как для отправки, так и для получения. Но в нашем примере реализовано только пакетное получение. В реальной ситуации вы не стали бы прерывать цикл, а продолжали бы его, задав больший интервал времени, например одну минуту. Вызов на получение от брокера остается открытым в течение этого времени. При поступлении сообщений они отправляются обратно немедленно. Выдается новый запрос на прием вызова.
Такой подход называется методом продолжительного опроса. Использование приёмного насоса, который можно увидеть в нескольких примерах в репозитории, является более типичным вариантом. Для получения дополнительной информации смотрите руководство Используйте портал Azure для создания пространства имен служебной шины и очереди.
public async Task Receive()
{
var taskList = new List<Task>();
for (var i = 0; i < Subscriptions.Length; i++)
{
taskList.Add(this.ReceiveMessages(Subscriptions[i]));
}
await Task.WhenAll(taskList);
}
private async Task ReceiveMessages(string subscription)
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
ServiceBusReceiver receiver = client.CreateReceiver(TopicName, subscription);
// In reality you would not break out of the loop like in this example but would keep looping. The receiver keeps the connection open
// to the broker for the specified amount of seconds and the broker returns messages as soon as they arrive. The client then initiates
// a new connection. So in reality you would not want to break out of the loop.
// Also note that the code shows how to batch receive, which you would do for performance reasons. For convenience you can also always
// use the regular receive pump which we show in our Quick Start and in other GitHub samples.
while (true)
{
try
{
//IList<Message> messages = await receiver.ReceiveAsync(10, TimeSpan.FromSeconds(2));
// Note the extension class which is serializing an deserializing messages and testing messages is null or 0.
// If you think you did not receive all messages, just press M and receive again via the menu.
IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.ReceiveMessagesAsync(maxMessages: 100);
if (messages.Any())
{
foreach (ServiceBusReceivedMessage message in messages)
{
lock (Console.Out)
{
Item item = message.As<Item>();
IReadOnlyDictionary<string, object> myApplicationProperties = message.ApplicationProperties;
Console.WriteLine($"StoreId={myApplicationProperties["StoreId"]}");
if (message.Subject != null)
{
Console.WriteLine($"Subject={message.Subject}");
}
Console.WriteLine(
$"Item data: Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}");
}
await receiver.CompleteMessageAsync(message);
}
}
else
{
break;
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
Примечание.
Вы можете управлять ресурсами служебной шины с помощью обозревателя служебной шины. Обозреватель служебной шины позволяет без труда подключаться к пространству имен служебной шины и управлять сущностями обмена сообщениями. Это средство предоставляет расширенные функции, такие как функции импорта и экспорта или возможность тестирования раздела, очередей, подписок, служб ретрансляции, центров уведомлений и центров событий.
Связанный контент
При работе с этим руководством вы подготовили ресурсы с помощью портала Azure, а затем отправили и получили сообщения через раздел Служебной шины и подписки на него. Вы научились выполнять следующие задачи:
- создание раздела служебной шины и одной или нескольких подписок на этот раздел с помощью портала Azure;
- Добавьте фильтры темы с использованием кода .NET
- создание двух сообщений с разным содержимым;
- отправка сообщений и проверка их поступления в ожидаемые подписки;
- прием сообщений от подписок.
Дополнительные примеры отправки и получения сообщений см. в примерах служебной шины на GitHub.
Следующее руководство содержит дополнительные сведения об использовании возможностей публикации и подписки в cлужебной шине Azure.