Поделиться через


Краткое руководство. Отправка событий в Центры событий или получение событий с помощью Go

Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий, принимающая и обрабатывающая миллионы событий в секунду. Центры событий могут обрабатывать и хранить события, данные или данные телеметрии, созданные распределенным программным обеспечением и устройствами. Данные, отправленные в концентратор событий, можно преобразовать и сохранить с помощью любого поставщика аналитики в реальном времени или адаптеров пакетной обработки или хранилища. Подробный обзор Центров событий см. в статьях Что такое Центры событий Azure? и Обзор функций Центров событий.

В этом кратком руководстве описывается, как записывать приложения Go для отправки событий в концентратор событий или получения событий из концентратора событий.

Замечание

Это руководство для быстрого старта основано на примерах из GitHub https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Раздел отправки событий основан на примере example_producing_events_test.go, а раздел приема событий основан на примере example_processor_test.go. Код упрощен для быстрого старта, и все подробные комментарии были удалены, поэтому смотрите примеры для получения дополнительных сведений и объяснений.

Предпосылки

Для завершения этой быстрой установки вам потребуются следующие пререквизиты:

  • Go установлен локально. При необходимости следуйте этим инструкциям .
  • Активная учетная запись Azure. Если у вас нет подписки на Azure, создайте бесплатную учетную запись перед началом.
  • Создайте пространство имен Центров событий и концентратор событий. Используйте портал Azure для создания пространства имен центров событий и получения учетных данных управления, необходимых приложению для взаимодействия с концентратором событий. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи.

Отправка событий

В этом разделе показано, как создать приложение Go для отправки событий в концентратор событий.

Установка пакета Go

Получите пакет Go для Центров событий, как показано в следующем примере.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Код для отправки событий в концентратор событий

Ниже приведен код для отправки событий в концентратор событий. Основными этапами кода являются:

  1. Создайте клиент производителя Центров событий с помощью строки подключения к пространству имен Центров событий и имени концентратора событий.
  2. Создайте пакетный объект и добавьте примеры событий в пакет.
  3. Отправьте пакет событий в события th.

Это важно

Замените NAMESPACE CONNECTION STRING строкой подключения к вашему пространству имен Event Hubs и EVENT HUB NAME именем концентратора событий в примере кода.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

Еще не запускайте приложение. Сначала необходимо запустить приложение-получатель, а затем приложение отправителя.

Получение событий

Создание учетной записи хранения и контейнера

Такие состояния, как аренда разделов и контрольных точек в контексте событий, распределяются между получателями с помощью контейнера Azure Storage. Вы можете создать учетную запись хранения и контейнер с помощью пакета SDK Go, но вы также можете создать ее, следуя инструкциям в разделе "Сведения о учетных записях хранения Azure".

Следуйте этим рекомендациям при использовании хранилища BLOB-объектов Azure в качестве хранилища контрольных точек:

  • Используйте отдельный контейнер для каждой группы потребителей. Вы можете использовать одну и ту же учетную запись хранения, но использовать один контейнер для каждой группы.
  • Не используйте учетную запись хранения для других действий.
  • Не используйте контейнер для ничего другого.
  • Создайте учетную запись хранения в том же регионе, что и развернутое приложение. Если приложение находится в локальной среде, попробуйте выбрать ближайший регион.

На странице учетной записи хранения в портале Azure в разделе службы BLOB убедитесь, что следующие параметры отключены.

  • Иерархическое пространство имен
  • Обратимое удаление BLOB-объекта
  • Управление версиями

Пакеты Go

Чтобы получить сообщения, получите пакеты Go для Центров событий, как показано в следующем примере.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Код для получения событий из концентратора событий

Ниже приведен код для получения событий из концентратора событий. Основными этапами кода являются:

  1. Убедитесь, что объект хранилища контрольной точки представляет собой хранилище BLOB-объектов Azure, используемое центром событий для ведения контрольных точек.
  2. Создайте клиент-получатель Event Hubs, используя строку подключения к пространству имен Event Hubs и название концентратора событий.
  3. Создайте обработчик событий с помощью клиентского объекта и объекта хранилища контрольных точек. Обработчик получает и обрабатывает события.
  4. Для каждого раздела в концентраторе событий создайте клиент раздела, используя processEvents в качестве функции обработки событий.
  5. Запустите все клиенты секционирования для получения и обработки событий.

Это важно

Замените следующие значения заполнителей фактическими значениями:

  • AZURE STORAGE CONNECTION STRING строка подключения для вашей учетной записи хранения Azure
  • BLOB CONTAINER NAME с именем контейнера BLOB, созданного в учетной записи хранения
  • NAMESPACE CONNECTION STRING с строкой подключения для пространства имен Центров событий
  • EVENT HUB NAME с именем концентратора событий в примере кода.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Запуск приложений получателя и отправителя

  1. Сначала запустите приложение-получатель.

  2. Запустите приложение отправителя.

  3. Подождите минуту, чтобы увидеть следующие выходные данные в окне приемника.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Дальнейшие шаги

См. примеры на сайте GitHub https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.