Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом руководстве вы узнаете, как отправлять сообщения и получать сообщения из очередей служебной шины Azure с помощью языка программирования Go.
Служебная шина Azure — это полностью управляемый корпоративный брокер сообщений с очередями сообщений и возможностями публикации и подписки. Служебная шина используется для развязки приложений и служб друг от друга, обеспечивая распределенный, надежный и высокопроизводительный транспорт сообщений.
Пакет Azure SDK для Go azservicebus позволяет отправлять и получать сообщения из служебной шины Azure и использовать язык программирования Go.
В конце этого руководства вы сможете: отправить одно сообщение или пакет сообщений в очередь, получать сообщения и сообщения, не обработанные.
Предпосылки
- Подписка Azure. Вы можете активировать преимущества подписчика Visual Studio или MSDN или зарегистрироваться для бесплатной учетной записи.
- Если у вас нет очереди для работы, выполните шаги в статье Использование портала Azure для создания очереди Service Bus для её создания.
- Go версии 1.18 или более поздней
Создание примера приложения
Чтобы начать, создайте новый модуль Go.
Создайте каталог для модуля с именем
service-bus-go-how-to-use-queues.В каталоге
azservicebusинициализируйте модуль и установите необходимые пакеты.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebusСоздайте файл с именем
main.go.
Проверка подлинности и создание клиента
main.go В файле создайте новую функцию с именем GetClient и добавьте следующий код:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
Функция GetClient возвращает новый azservicebus.Client объект, созданный с помощью пространства имен служебной шины Azure и учетных данных. Пространство имен предоставляется переменной AZURE_SERVICEBUS_HOSTNAME среды. И учетные данные создаются с помощью azidentity.NewDefaultAzureCredential функции.
Для локальной разработки DefaultAzureCredential используется маркер доступа из Azure CLI, который можно создать, выполнив az login команду для проверки подлинности в Azure.
Подсказка
Для проверки подлинности с помощью строки подключения используйте функцию NewClientFromConnectionString .
Отправка сообщений в очередь
main.go В файле создайте новую функцию с именем SendMessage и добавьте следующий код:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage принимает два параметра: строку сообщения и azservicebus.Client объект. Затем он создает новый azservicebus.Sender объект и отправляет сообщение в очередь. Чтобы отправить массовые сообщения, добавьте функцию в SendMessageBatchmain.go файл.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch принимает два параметра: срез сообщений и azservicebus.Client объект. Затем он создает новый azservicebus.Sender объект и отправляет сообщения в очередь.
Получение сообщений из очереди
После отправки сообщений в очередь вы можете получать их, используя тип azservicebus.Receiver. Чтобы получать сообщения из очереди, добавьте функцию GetMessage в main.go файл.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage принимает azservicebus.Client объект и создает новый объект azservicebus.Receiver. Затем он получает сообщения из очереди. Функция Receiver.ReceiveMessages принимает два параметра: контекст и количество полученных сообщений. Функция Receiver.ReceiveMessages возвращает срез azservicebus.ReceivedMessage объектов.
Затем цикл for выполняет обход сообщений и выводит текст сообщения.
CompleteMessage Затем функция вызывается для завершения сообщения, удалив его из очереди.
Сообщения, превышающие ограничения на длину, отправленные в недопустимую очередь или не обработанные успешно, могут быть отправлены в очередь недоставленных сообщений. Чтобы отправить сообщения в очередь недоставленных писем, добавьте SendDeadLetterMessage функцию в main.go файл.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage принимает azservicebus.Client объект и azservicebus.ReceivedMessage объект. Затем он отправляет сообщение в очередь недоставленных писем. Функция принимает два параметра: контекст и azservicebus.DeadLetterOptions объект. Функция Receiver.DeadLetterMessage возвращает ошибку, если сообщение не отправляется в очередь недоставленных писем.
Чтобы получить сообщения из очереди недоставленных писем, добавьте функцию в ReceiveDeadLetterMessagemain.go файл.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage принимает объект azservicebus.Client и создает новый объект azservicebus.Receiver с параметрами очереди недоставленных писем. Затем он получает сообщения из очереди недоставленных писем. Затем функция получает одно сообщение из очереди недоставленных писем. Затем он выводит на печать причину недоставленного сообщения и описание этой проблемы.
Пример кода
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Запустите код
Перед запуском кода создайте переменную среды с именем AZURE_SERVICEBUS_HOSTNAME. Задайте для переменной среды значение пространства имен служебной шины.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Затем выполните следующую go run команду, чтобы запустить приложение:
go run main.go
Дальнейшие шаги
Дополнительные сведения см. по следующим ссылкам: