Поделиться через


Шаблон папки "Исходящие транзакции" с помощью Azure Cosmos DB

Azure Cosmos DB
Служебная шина Azure
Функции Azure

Реализация надежного обмена сообщениями в распределенных системах может быть сложной задачей. В этой статье описывается, как использовать шаблон папки "Исходящие транзакции" для надежного обмена сообщениями и гарантированной доставки событий, важной частью поддержки идемпотентной обработки сообщений. Для этого вы будете использовать пакеты транзакций Azure Cosmos DB и канал изменений в сочетании с служебной шиной Azure.

Обзор

Архитектуры микрослужб получают популярность и показывают обещание в решении таких проблем, как масштабируемость, удобство обслуживания и гибкость, особенно в крупных приложениях. Но этот архитектурный шаблон также представляет проблемы, когда речь идет об обработке данных. В распределенных приложениях каждая служба независимо сохраняет данные, необходимые для работы в выделенном хранилище данных, принадлежащей службе. Для поддержки такого сценария обычно используется решение для обмена сообщениями, например RabbitMQ, Kafka или служебная шина Azure, которая распределяет данные (события) из одной службы через шину обмена сообщениями в другие службы приложения. Внутренние или внешние потребители затем могут подписаться на эти сообщения и получать уведомления об изменениях сразу после обработки данных.

Известный пример в этой области является системой упорядочивания: когда пользователь хочет создать заказ, Ordering служба получает данные от клиентского приложения через конечную точку REST. Он сопоставляет полезные данные с внутренним представлением Order объекта для проверки данных. После успешной фиксации в базе данных она публикует OrderCreated событие в шине сообщений. Любая другая служба, заинтересованная в новых заказах (например InventoryInvoicing , служба), подписывается на OrderCreated сообщения, обрабатывает их и хранит их в собственной базе данных.

Следующий псевдокод показывает, как этот процесс обычно выглядит с Ordering точки зрения службы:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Этот подход работает хорошо, пока не возникнет ошибка между сохранением объекта заказа и публикацией соответствующего события. Отправка события может завершиться ошибкой по многим причинам:

  • Ошибки сети.
  • Сбой службы сообщений
  • Сбой узла

Независимо от ошибки, результатом является то, что OrderCreated событие не может быть опубликовано в шине сообщений. Другие службы не будут получать уведомления о создании заказа. Теперь Ordering служба должна заботиться о различных вещах, которые не связаны с фактическим бизнес-процессом. Он должен отслеживать события, которые по-прежнему должны быть помещены в шину сообщений, как только он снова в сети. Даже худший случай может произойти: несоответствия данных в приложении из-за потерянных событий.

Схема, показывющая обработку событий без шаблона

Решение

Существует хорошо известный шаблон с именем "Исходящий ящик транзакций ", который поможет вам избежать этих ситуаций. Это гарантирует, что события сохраняются в хранилище данных (обычно в таблице outbox в базе данных), прежде чем они в конечном итоге отправляются в брокер сообщений. Если бизнес-объект и соответствующие события сохраняются в одной транзакции базы данных, то гарантируется, что данные не будут потеряны. Все будет зафиксировано, или все откатится, если возникла ошибка. Чтобы в конечном итоге опубликовать событие, другая служба или рабочий процесс запрашивает таблицу "Исходящие" для необработанных записей, публикует события и помечает их как обработанные. Этот шаблон гарантирует, что события не будут потеряны после создания или изменения бизнес-объекта.

Схема, показывающая обработку событий с помощью шаблона

Скачайте файл Visio для этой архитектуры.

В реляционной базе данных реализация шаблона проста. Если служба использует Entity Framework Core, например, она будет использовать контекст Entity Framework для создания транзакции базы данных, сохранения бизнес-объекта и события и фиксации транзакции или отката. Кроме того, рабочая служба, обрабатывающая события, легко реализуется: периодически запрашивает таблицу "Исходящие" для новых записей, публикует недавно вставленные события в шину сообщений и, наконец, помечает эти записи как обработанные.

На практике вещи не так просто, как они могут сначала взглянуть. Самое главное, необходимо убедиться, что порядок событий сохраняется, чтобы OrderUpdated событие не было опубликовано до OrderCreated события.

Реализация в Azure Cosmos DB

В этом разделе показано, как реализовать шаблон исходящих транзакций в Azure Cosmos DB для обеспечения надежного обмена сообщениями между различными службами с помощью канала изменений Azure Cosmos DB и служебной шины. В нем демонстрируется пример службы, которая управляет объектами Contact (FirstName, , LastNameEmailCompanyсведениями и т. д.). Он использует шаблон сегрегации ответственности команд и запросов (CQRS) и соответствует основным понятиям проектирования на основе домена (DDD). Пример кода для реализации можно найти на сайте GitHub.

Объект Contact в примере службы имеет следующую структуру:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "[email protected]",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Как только Contact создается или обновляется, он выдает события, содержащие сведения о текущем изменении. Среди прочего, события домена могут быть следующими:

  • ContactCreated. Вызывается при добавлении контакта.
  • ContactNameUpdated. Вызывается при FirstName изменении или LastName изменении.
  • ContactEmailUpdated. Вызывается при обновлении адреса электронной почты.
  • ContactCompanyUpdated. Вызывается при изменении любого из свойств компании.

Пакеты транзакций

Чтобы реализовать этот шаблон, необходимо убедиться Contact , что бизнес-объект и соответствующие события будут сохранены в той же транзакции базы данных. В Azure Cosmos DB транзакции работают не так, как они выполняются в реляционных системах баз данных. Транзакции Azure Cosmos DB, называемые пакетами транзакций, работают с одной логической секцией, поэтому они гарантируют атомарность, согласованность, изоляцию и устойчивость (ACID). Невозможно сохранить два документа в транзакционной пакетной операции в разных контейнерах или логических секциях. Для примера службы это означает, что бизнес-объект и событие или события будут помещены в один контейнер и логический раздел.

Контекст, репозитории и UnitOfWork

Основная часть примера реализации — это контекст контейнера , который отслеживает объекты, сохраненные в одном пакете транзакций. Он поддерживает список созданных и измененных объектов и работает в одном контейнере Azure Cosmos DB. Интерфейс для него выглядит следующим образом:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

Список в компоненте контекста контейнера отслеживает Contact и DomainEvent объекты. Оба будут помещены в один контейнер. Это означает, что несколько типов объектов хранятся в одном контейнере Azure Cosmos DB и используют Type свойство для различения бизнес-объекта и события.

Для каждого типа существует выделенный репозиторий, определяющий и реализующий доступ к данным. Интерфейс Contact репозитория предоставляет следующие методы:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Репозиторий Event выглядит аналогично, за исключением одного метода, который создает новые события в магазине:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Реализации обоих интерфейсов репозитория получают ссылку через внедрение зависимостей в один экземпляр, чтобы обеспечить работу обоих в одном IContainerContext контексте Azure Cosmos DB.

Последний компонент — это фиксация изменений, которые хранятся UnitOfWorkв экземпляре IContainerContext в Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Обработка событий: создание и публикация

Contact При каждом создании, изменении или удалении объекта (обратимо) служба вызывает соответствующее событие. Основной частью предоставленного решения является сочетание дизайна на основе домена (DDD) и шаблона посредника, предлагаемого Джимми Богардом. Он предлагает сохранить список событий, которые произошли из-за изменений объекта домена и публикации этих событий перед сохранением фактического объекта в базе данных.

Список изменений хранится в самом объекте домена, чтобы ни один другой компонент не смог изменить цепочку событий. Поведение обслуживания событий (IEvent экземпляров) в объекте домена определяется через интерфейс IEventEmitter<IEvent> и реализуется в абстрактном DomainEntity классе:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Объект Contact вызывает события домена. Сущность Contact следует основным понятиям DDD, настраивая методы задания свойств домена как частные. В классе нет открытых наборов. Вместо этого он предлагает методы для управления внутренним состоянием. В этих методах можно вызвать соответствующие события для определенного изменения (например ContactNameUpdated , или ContactEmailUpdated) .

Ниже приведен пример обновления имени контакта. (Событие вызывается в конце метода.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return; // if an object is newly created, all modifications will be handled by ContactCreatedEvent

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Соответствующий ContactNameUpdatedEvent, который отслеживает изменения, выглядит следующим образом:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

До сих пор события только что регистрируются в объекте домена, и ничего не сохраняется в базе данных или даже публикуется в брокере сообщений. После рекомендации список событий будет обработан прямо перед сохранением бизнес-объекта в хранилище данных. В этом случае это происходит в SaveChangesAsync методе IContainerContext экземпляра, который реализуется в частном RaiseDomainEvents методе. (dObjs представляет собой список отслеживаемых сущностей контекста контейнера.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

В последней строке пакет MediatR , реализация шаблона посредника в C#, используется для публикации события в приложении. Это возможно, так как все события, такие как ContactNameUpdatedEvent реализация INotification интерфейса пакета MediatR.

Эти события должны обрабатываться соответствующим обработчиком. Здесь реализация IEventsRepository вступает в игру. Ниже приведен пример обработчика NameUpdated событий:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Экземпляр IEventRepository внедряется в класс обработчика с помощью конструктора. ContactNameUpdatedEvent После публикации в службе Handle метод вызывается и использует экземпляр репозитория событий для создания объекта уведомления. Этот объект уведомления, в свою очередь, вставляется в список отслеживаемых объектов в IContainerContext объекте и присоединяет объекты, сохраненные в одном пакете транзакций, в Azure Cosmos DB.

До сих пор контекст контейнера знает, какие объекты следует обрабатывать. Чтобы в конечном итоге сохранить отслеживаемые объекты в Azure Cosmos DB, IContainerContext реализация создает пакет транзакций, добавляет все соответствующие объекты и выполняет операцию в базе данных. Описанный процесс обрабатывается в методе SaveInTransactionalBatchAsync , который вызывается методом SaveChangesAsync .

Ниже приведены важные части реализации, которые необходимо создать и запустить пакет транзакций:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects);

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Ниже приведен обзор работы процесса (для обновления имени объекта контакта):

  1. Клиент хочет обновить имя контакта. Метод SetName вызывается в объекте контакта, а свойства обновляются.
  2. Событие ContactNameUpdated добавляется в список событий в объекте домена.
  3. Вызывается метод репозитория Update контактов, который добавляет объект домена в контекст контейнера. Теперь объект отслеживается.
  4. CommitAsync вызывается в экземпляре UnitOfWork , который в свою очередь вызывает SaveChangesAsync контекст контейнера.
  5. В пределах SaveChangesAsyncвсех событий в списке объекта домена публикуются экземпляром MediatR и добавляются через репозиторий событий в один и тот же контекст контейнера.
  6. В SaveChangesAsyncней TransactionalBatch создается объект. Он будет содержать как объект контакта, так и событие.
  7. Запуски TransactionalBatch и данные фиксируется в Azure Cosmos DB.
  8. SaveChangesAsync и CommitAsync успешно возвращается.

Сохраняемость

Как видно из приведенных выше фрагментов кода, все объекты, сохраненные в Azure Cosmos DB, упаковываются в DataObject экземпляр. Этот объект предоставляет общие свойства:

Эти свойства определяются в универсальном интерфейсе, который вызывается IDataObject и используется репозиториями и контекстом контейнера:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Объекты, упакованные DataObject в экземпляр и сохраненные в базе данных, будут выглядеть следующим образом (Contact и ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "[email protected]",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Вы увидите, что Contact документы и ContactNameUpdatedEvent (тип domainEvent) имеют один и тот же ключ секции, и оба документа будут сохранены в одной логической секции.

Обработка ленты изменений

Чтобы прочитать поток событий и отправить их брокеру сообщений, служба будет использовать канал изменений Azure Cosmos DB.

Канал изменений — это постоянный журнал изменений в контейнере. Он работает в фоновом режиме и отслеживает изменения. В одной логической секции порядок изменений гарантируется. Самый удобный способ чтения канала изменений — использовать функцию Azure с триггером Azure Cosmos DB. Другой вариант — использовать библиотеку обработчика канала изменений. Она позволяет интегрировать обработку канала изменений в веб-API в качестве фоновой службы (через IHostedService интерфейс). В этом примере используется простое консольное приложение, реализующее абстрактный класс BackgroundService для размещения длительных фоновых задач в приложениях .NET Core.

Чтобы получить изменения из канала изменений Azure Cosmos DB, необходимо создать экземпляр ChangeFeedProcessor объекта, зарегистрировать метод обработчика для обработки сообщений и начать прослушивание изменений:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Затем метод обработчика (HandleChangesAsync здесь) обрабатывает сообщения. В этом примере события публикуются в разделе служебной шины, секционируемой для масштабируемости и включенной функции отмены дублирования. Любая служба, заинтересованная в изменениях Contact объектов, может подписаться на этот раздел служебной шины и получать и обрабатывать изменения для собственного контекста.

Сообщения служебной шины, созданные SessionId , имеют свойство. При использовании сеансов в служебной шине гарантируется, что порядок сообщений сохраняется (сначала во-первых, вне (FIFO)). Сохранение порядка необходимо для этого варианта использования.

Ниже приведен фрагмент кода, который обрабатывает сообщения из канала изменений:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Обработка ошибок

Если во время обработки изменений произошла ошибка, библиотека веб-канала изменений перезагрузит сообщения чтения в расположении, где он успешно обработал последний пакет. Например, если приложение успешно обработано 10 000 сообщений, теперь работает над пакетом 10 001 до 10 025, а ошибка возникает, она может перезапустить и забрать свою работу на позиции 10 001. Библиотека автоматически отслеживает то, что было обработано с помощью сведений, сохраненных в контейнере Leases в Azure Cosmos DB.

Возможно, служба уже отправит некоторые сообщения, которые повторно обрабатываются в служебную шину. Как правило, этот сценарий приведет к дублированию обработки сообщений. Как отмечалось ранее, служебная шина имеет функцию для обнаружения повторяющихся сообщений, которые необходимо включить для этого сценария. Служба проверяет, добавлено ли сообщение в раздел служебной шины (или очередь) на основе свойства, управляемого MessageId приложением сообщения. Это свойство имеет ID значение документа события. Если одно и то же сообщение отправляется в служебную шину, служба будет игнорировать и удалять ее.

Действия по обслуживанию

В типичной реализации исходящих транзакций служба обновляет обработанные события и задает Processed свойство true, указывающее, что сообщение успешно опубликовано. Это поведение можно реализовать вручную в методе обработчика. В текущем сценарии нет необходимости в таком процессе. Azure Cosmos DB отслеживает события, обработанные с помощью канала изменений (в сочетании с контейнером Leases ).

В качестве последнего шага иногда необходимо удалить события из контейнера, чтобы сохранить только самые последние записи и документы. Для периодической очистки реализация применяет другую функцию Azure Cosmos DB: Time To Live (TTL) для документов. Azure Cosmos DB может автоматически удалять документы на TTL основе свойства, которое можно добавить в документ: интервал времени в секундах. Служба постоянно проверяет контейнер для документов с свойством TTL . Как только срок действия документа истекает, Azure Cosmos DB удалит его из базы данных.

Когда все компоненты работают должным образом, события обрабатываются и публикуются быстро: в течение секунд. Если в Azure Cosmos DB возникла ошибка, события не будут отправляться в шину сообщений, так как бизнес-объект и соответствующие события не могут быть сохранены в базе данных. Единственное, что следует учитывать, заключается в том, чтобы задать соответствующее TTL значение для DomainEvent документов, когда фоновая рабочая роль (обработчик канала изменений) или служебная шина недоступна. В рабочей среде лучше выбрать интервал времени в несколько дней. Например, 10 дней. Все участвующие компоненты будут иметь достаточно времени для обработки и публикации изменений в приложении.

Сводка

Шаблон исходящих транзакций решает проблему надежной публикации событий домена в распределенных системах. Зафиксировав состояние бизнес-объекта и его события в том же пакете транзакций и используя фоновый обработчик в качестве ретранслятора сообщений, вы гарантируете, что другие службы, внутренние или внешние, в конечном итоге получат информацию, от которую они зависят. Этот пример не является традиционной реализацией шаблона исходящих транзакций. В нем используются такие функции, как канал изменений Azure Cosmos DB и время жизни, которые позволяют легко и чисто.

Ниже приведена сводка по компонентам Azure, используемым в этом сценарии:

Схема, на которую показаны компоненты Azure для реализации исходящих транзакций с помощью Azure Cosmos DB и служебной шины Azure.

Скачайте файл Visio для этой архитектуры.

Преимущества этого решения:

  • Надежная передача сообщений и гарантированная доставка событий.
  • Сохраненный порядок событий и отмены дублирования сообщений с помощью служебной шины.
  • Нет необходимости поддерживать дополнительное Processed свойство, указывающее на успешную обработку документа события.
  • Удаление событий из Azure Cosmos DB через время жизни (TTL). Процесс не использует единицы запросов, необходимые для обработки запросов пользователей или приложений. Вместо этого в фоновой задаче используются единицы запроса "оставшиеся".
  • Проверка ошибок обработки сообщений через ChangeFeedProcessor (или функцию Azure).
  • Необязательно. Несколько процессоров канала изменений, каждый из которых поддерживает собственный указатель в канале изменений.

Соображения

В примере приложения, описанного в этой статье, показано, как реализовать шаблон исходящих транзакций в Azure с помощью Azure Cosmos DB и служебной шины. Существуют и другие подходы, использующие базы данных NoSQL. Чтобы гарантировать, что бизнес-объект и события будут надежно сохранены в базе данных, можно внедрить список событий в документ бизнес-объекта. Недостатком этого подхода является то, что процесс очистки потребуется обновить каждый документ, содержащий события. Это не идеально, особенно с точки зрения затрат на единицу запроса, по сравнению с использованием TTL.

Имейте в виду, что не следует учитывать пример кода, предоставленного здесь, готовым к работе. Он имеет некоторые ограничения в отношении многопоточной обработки, особенно способ обработки событий в DomainEntity классе и способ отслеживания объектов в CosmosContainerContext реализации. Используйте его в качестве отправной точки для собственных реализаций. Кроме того, рекомендуется использовать существующие библиотеки, которые уже имеют эту функцию, встроенную в них, например NServiceBus или MassTransit.

Развертывание этого сценария

Исходный код, файлы развертывания и инструкции по тестированию этого сценария можно найти на сайте GitHub: https://github.com/Azure-Samples/transactional-outbox-pattern.

Соавторы

Эта статья поддерживается корпорацией Майкрософт. Первоначально он был написан следующими участниками.

Основной автор:

Чтобы просмотреть недоступные профили LinkedIn, войдите в LinkedIn.

Дальнейшие действия

Дополнительные сведения см. в следующих статьях: