Modello outbox transazionale con Azure Cosmos DB
L'implementazione di messaggistica affidabile nei sistemi distribuiti può risultare complessa. Questo articolo descrive come usare il modello Outbox transazionale per la messaggistica affidabile e il recapito garantito degli eventi, una parte importante del supporto dell'elaborazione dei messaggi idempotenti. A tale scopo, si useranno batch transazionali e feed di modifiche di Azure Cosmos DB in combinazione con il bus di servizio di Azure.
Informazioni generali
Le architetture di microservizi stanno ottenendo popolarità e mostrano la promessa di risolvere problemi come scalabilità, manutenibilità e agilità, soprattutto nelle applicazioni di grandi dimensioni. Ma questo modello di architettura presenta anche sfide per quanto riguarda la gestione dei dati. Nelle applicazioni distribuite ogni servizio gestisce in modo indipendente i dati necessari per operare in un archivio dati di proprietà del servizio dedicato. Per supportare questo scenario, in genere si usa una soluzione di messaggistica come RabbitMQ, Kafka o il bus di servizio di Azure che distribuisce i dati (eventi) da un servizio tramite un bus di messaggistica ad altri servizi dell'applicazione. I consumer interni o esterni possono quindi sottoscrivere tali messaggi e ricevere una notifica delle modifiche non appena vengono modificati i dati.
Un esempio noto in tale area è un sistema di ordinamento: quando un utente vuole creare un ordine, un Ordering
servizio riceve i dati da un'applicazione client tramite un endpoint REST. Esegue il mapping del payload a una rappresentazione interna di un Order
oggetto per convalidare i dati. Dopo aver eseguito il commit nel database, pubblica un OrderCreated
evento in un bus di messaggi. Qualsiasi altro servizio interessato a nuovi ordini ,ad esempio un Inventory
servizio o Invoicing
, sottoscrive i OrderCreated
messaggi, li elabora e li archivia nel proprio database.
Lo pseudocodice seguente illustra l'aspetto di questo processo dal punto di vista del Ordering
servizio:
CreateNewOrder(CreateOrderDto order){
// Validate the incoming data.
...
// Apply business logic.
...
// Save the object to the database.
var result = _orderRespository.Create(order);
// Publish the respective event.
_messagingService.Publish(new OrderCreatedEvent(result));
return Ok();
}
Questo approccio funziona correttamente fino a quando non si verifica un errore tra il salvataggio dell'oggetto order e la pubblicazione dell'evento corrispondente. L'invio di un evento potrebbe non riuscire a questo punto per molti motivi:
- errori di rete
- Interruzione del servizio messaggi
- Errore dell'host
Indipendentemente dall'errore, il risultato è che l'evento OrderCreated
non può essere pubblicato nel bus di messaggi. Altri servizi non riceveranno una notifica che indica che è stato creato un ordine. Il Ordering
servizio deve ora occuparsi di vari aspetti che non sono correlati al processo aziendale effettivo. Deve tenere traccia degli eventi che devono ancora essere messi sul bus di messaggi non appena torna online. Anche il caso peggiore può verificarsi: incoerenze dei dati nell'applicazione a causa di eventi persi.
Soluzione
Esiste un modello noto denominato Outbox transazionale che consente di evitare queste situazioni. Garantisce che gli eventi vengano salvati in un archivio dati (in genere in una tabella Outbox nel database) prima che vengano inviati a un broker di messaggi. Se l'oggetto business e gli eventi corrispondenti vengono salvati all'interno della stessa transazione di database, è garantito che nessun dato andrà perso. Verrà eseguito il commit di tutto o verrà eseguito il rollback di tutti gli elementi in caso di errore. Per pubblicare l'evento, un altro servizio o processo di lavoro esegue una query sulla tabella Posta in uscita per le voci non gestite, pubblica gli eventi e li contrassegna come elaborati. Questo modello garantisce che gli eventi non andranno persi dopo la creazione o la modifica di un oggetto business.
Scaricare un file di Visio di questa architettura.
In un database relazionale l'implementazione del modello è semplice. Se il servizio usa Entity Framework Core, ad esempio, userà un contesto di Entity Framework per creare una transazione di database, salvare l'oggetto business e l'evento ed eseguire il commit della transazione oppure eseguire un rollback. Inoltre, il servizio di lavoro che elabora gli eventi è facile da implementare: esegue periodicamente una query sulla tabella Outbox per le nuove voci, pubblica gli eventi appena inseriti nel bus di messaggi e infine contrassegna queste voci come elaborate.
In pratica, le cose non sono così facili come potrebbero guardare per primi. Soprattutto, è necessario assicurarsi che l'ordine degli eventi venga mantenuto in modo che un OrderUpdated
evento non venga pubblicato prima di un OrderCreated
evento.
Implementazione in Azure Cosmos DB
Questa sezione illustra come implementare il modello Outbox transazionale in Azure Cosmos DB per ottenere una messaggistica affidabile e ordinata tra diversi servizi con l'aiuto del feed di modifiche di Azure Cosmos DB e del bus di servizio. Illustra un servizio di esempio che gestisce Contact
gli oggetti (FirstName
, LastName
, Email
, Company
informazioni e così via). Usa il modello Command and Query Responsibility Segregation (CQRS) e segue i concetti di base relativi alla progettazione basata su dominio. È possibile trovare il codice di esempio per l'implementazione in GitHub.
Un Contact
oggetto nel servizio di esempio ha la struttura seguente:
{
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "[email protected]",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false
}
Non appena un oggetto Contact
viene creato o aggiornato, genera eventi che contengono informazioni sulla modifica corrente. Tra gli altri, gli eventi di dominio possono essere:
-
ContactCreated
. Generato quando viene aggiunto un contatto. -
ContactNameUpdated
. Generato quandoFirstName
oLastName
viene modificato. -
ContactEmailUpdated
. Generato quando l'indirizzo di posta elettronica viene aggiornato. -
ContactCompanyUpdated
. Generato quando una delle proprietà dell'azienda viene modificata.
Batch transazionali
Per implementare questo modello, è necessario assicurarsi che l'oggetto Contact
business e gli eventi corrispondenti vengano salvati nella stessa transazione di database. In Azure Cosmos DB le transazioni funzionano in modo diverso rispetto a quelle eseguite nei sistemi di database relazionali. Le transazioni di Azure Cosmos DB, denominate batch transazionali, operano su una singola partizione logica, quindi garantiscono le proprietà Atomicity, Consistency, Isolation e Durability (ACID). Non è possibile salvare due documenti in un'operazione batch transazionale in contenitori o partizioni logiche diverse. Per il servizio di esempio, significa che sia l'oggetto business che l'evento o gli eventi verranno inseriti nello stesso contenitore e nella stessa partizione logica.
Contesto, repository e UnitOfWork
Il nucleo dell'implementazione di esempio è un contesto contenitore che tiene traccia degli oggetti salvati nello stesso batch transazionale. Gestisce un elenco di oggetti creati e modificati e opera su un singolo contenitore di Azure Cosmos DB. L'interfaccia è simile alla seguente:
public interface IContainerContext
{
public Container Container { get; }
public List<IDataObject<Entity>> DataObjects { get; }
public void Add(IDataObject<Entity> entity);
public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
public void Reset();
}
L'elenco nel componente del contesto del contenitore tiene traccia Contact
e DomainEvent
oggetti. Entrambi verranno inseriti nello stesso contenitore. Ciò significa che più tipi di oggetti vengono archiviati nello stesso contenitore di Azure Cosmos DB e usano una Type
proprietà per distinguere tra un oggetto business e un evento.
Per ogni tipo, è disponibile un repository dedicato che definisce e implementa l'accesso ai dati. L'interfaccia del Contact
repository fornisce questi metodi:
public interface IContactsRepository
{
public void Create(Contact contact);
public Task<(Contact, string)> ReadAsync(Guid id, string etag);
public Task DeleteAsync(Guid id, string etag);
public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
public void Update(Contact contact, string etag);
}
Il Event
repository ha un aspetto simile, ad eccezione di un solo metodo, che crea nuovi eventi nell'archivio:
public interface IEventRepository
{
public void Create(ContactDomainEvent e);
}
Le implementazioni di entrambe le interfacce del repository ottengono un riferimento tramite l'inserimento delle dipendenze in una singola IContainerContext
istanza per garantire che entrambe funzionino nello stesso contesto di Azure Cosmos DB.
L'ultimo componente è UnitOfWork
, che esegue il commit delle modifiche contenute nell'istanza IContainerContext
in Azure Cosmos DB:
public class UnitOfWork : IUnitOfWork
{
private readonly IContainerContext _context;
public IContactRepository ContactsRepo { get; }
public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
{
_context = ctx;
ContactsRepo = cRepo;
}
public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
{
return _context.SaveChangesAsync(cancellationToken);
}
}
Gestione degli eventi: creazione e pubblicazione
Ogni volta che un Contact
oggetto viene creato, modificato o eliminato (temporaneamente), il servizio genera un evento corrispondente. Il nucleo della soluzione fornita è una combinazione di progettazione basata su dominio (DDD) e il modello mediator proposto da Jimmy Bogard. Suggerisce di mantenere un elenco di eventi che si sono verificati a causa di modifiche dell'oggetto dominio e di pubblicazione di questi eventi prima di salvare l'oggetto effettivo nel database.
L'elenco delle modifiche viene mantenuto nell'oggetto dominio stesso in modo che nessun altro componente possa modificare la catena di eventi. Il comportamento di gestione degli eventi (IEvent
istanze) nell'oggetto di dominio viene definito tramite un'interfaccia IEventEmitter<IEvent>
e implementato in una classe astratta DomainEntity
:
public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
private readonly List<IEvent> _events = new();
[JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();
public virtual void AddEvent(IEvent domainEvent)
{
var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
if (i < 0)
{
_events.Add(domainEvent);
}
else
{
_events.RemoveAt(i);
_events.Insert(i, domainEvent);
}
}
[...]
[...]
}
L'oggetto Contact
genera eventi di dominio. L'entità Contact
segue i concetti DDD di base, configurando i setter delle proprietà di dominio come privati. Nella classe non esistono setter pubblici. Offre invece metodi per modificare lo stato interno. In questi metodi è possibile generare eventi appropriati per una determinata modifica (ad esempio ContactNameUpdated
o ContactEmailUpdated
).
Ecco un esempio per gli aggiornamenti al nome di un contatto. L'evento viene generato alla fine del metodo.
public void SetName(string firstName, string lastName)
{
if (string.IsNullOrWhiteSpace(firstName) ||
string.IsNullOrWhiteSpace(lastName))
{
throw new ArgumentException("FirstName or LastName cannot be empty");
}
Name = new Name(firstName, lastName);
if (IsNew) return; // if an object is newly created, all modifications will be handled by ContactCreatedEvent
AddEvent(new ContactNameUpdatedEvent(Id, Name));
ModifiedAt = DateTimeOffset.UtcNow;
}
L'oggetto corrispondente ContactNameUpdatedEvent
, che tiene traccia delle modifiche, è simile al seguente:
public class ContactNameUpdatedEvent : ContactDomainEvent
{
public Name Name { get; }
public ContactNameUpdatedEvent(Guid contactId, Name contactName) :
base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
{
Name = contactName;
}
}
Finora, gli eventi vengono semplicemente registrati nell'oggetto di dominio e non viene salvato nulla nel database o anche pubblicato in un broker di messaggi. Seguendo la raccomandazione, l'elenco degli eventi verrà elaborato subito prima che l'oggetto business venga salvato nell'archivio dati. In questo caso, si verifica nel SaveChangesAsync
metodo dell'istanza IContainerContext
, che viene implementato in un metodo privato RaiseDomainEvents
. (dObjs
è l'elenco delle entità rilevate del contesto del contenitore.
private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
var eventEmitters = new List<IEventEmitter<IEvent>>();
// Get all EventEmitters.
foreach (var o in dObjs)
if (o.Data is IEventEmitter<IEvent> ee)
eventEmitters.Add(ee);
// Raise events.
if (eventEmitters.Count <= 0) return;
foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
_mediator.Publish(evt);
}
Nell'ultima riga, il pacchetto MediatR , un'implementazione del modello mediator in C#, viene usata per pubblicare un evento all'interno dell'applicazione. Questa operazione è possibile perché tutti gli eventi come ContactNameUpdatedEvent
implementare l'interfaccia INotification
del pacchetto MediatR.
Questi eventi devono essere elaborati da un gestore corrispondente. In questo caso, l'implementazione IEventsRepository
entra in gioco. Ecco l'esempio del NameUpdated
gestore eventi:
public class ContactNameUpdatedHandler :
INotificationHandler<ContactNameUpdatedEvent>
{
private IEventRepository EventRepository { get; }
public ContactNameUpdatedHandler(IEventRepository eventRepo)
{
EventRepository = eventRepo;
}
public Task Handle(ContactNameUpdatedEvent notification,
CancellationToken cancellationToken)
{
EventRepository.Create(notification);
return Task.CompletedTask;
}
}
Un'istanza IEventRepository
viene inserita nella classe del gestore tramite il costruttore . Non appena un oggetto ContactNameUpdatedEvent
viene pubblicato nel servizio, il Handle
metodo viene richiamato e usa l'istanza del repository eventi per creare un oggetto notifica. Tale oggetto di notifica viene a sua volta inserito nell'elenco di oggetti rilevati nell'oggetto IContainerContext
e unisce gli oggetti salvati nello stesso batch transazionale in Azure Cosmos DB.
Finora, il contesto del contenitore conosce gli oggetti da elaborare. Per rendere persistenti gli oggetti rilevati in Azure Cosmos DB, l'implementazione IContainerContext
crea il batch transazionale, aggiunge tutti gli oggetti pertinenti ed esegue l'operazione sul database. Il processo descritto viene gestito nel SaveInTransactionalBatchAsync
metodo , richiamato dal SaveChangesAsync
metodo .
Ecco le parti importanti dell'implementazione che è necessario creare ed eseguire il batch transazionale:
private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
CancellationToken cancellationToken)
{
if (DataObjects.Count > 0)
{
var pk = new PartitionKey(DataObjects[0].PartitionKey);
var tb = Container.CreateTransactionalBatch(pk);
DataObjects.ForEach(o =>
{
TransactionalBatchItemRequestOptions tro = null;
if (!string.IsNullOrWhiteSpace(o.Etag))
tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };
switch (o.State)
{
case EntityState.Created:
tb.CreateItem(o);
break;
case EntityState.Updated or EntityState.Deleted:
tb.ReplaceItem(o.Id, o, tro);
break;
}
});
var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
}
// Return copy of current list as result.
var result = new List<IDataObject<Entity>>(DataObjects);
// Work has been successfully done. Reset DataObjects list.
DataObjects.Clear();
return result;
}
Ecco una panoramica del funzionamento del processo finora (per aggiornare il nome in un oggetto contatto):
- Un client vuole aggiornare il nome di un contatto. Il
SetName
metodo viene richiamato sull'oggetto contatto e le proprietà vengono aggiornate. - L'evento
ContactNameUpdated
viene aggiunto all'elenco di eventi nell'oggetto dominio. - Viene richiamato il metodo del
Update
repository dei contatti, che aggiunge l'oggetto dominio al contesto del contenitore. L'oggetto viene ora rilevato. -
CommitAsync
viene richiamato sull'istanzaUnitOfWork
di , che a sua volta chiamaSaveChangesAsync
sul contesto del contenitore. - All'interno
SaveChangesAsync
di , tutti gli eventi nell'elenco dell'oggetto dominio vengono pubblicati da un'istanzaMediatR
di e vengono aggiunti tramite il repository eventi allo stesso contesto contenitore. - In
SaveChangesAsync
viene creato un oggettoTransactionalBatch
. Conterrà sia l'oggetto contatto che l'evento. - Le
TransactionalBatch
esecuzioni e i dati vengono sottoposti a commit in Azure Cosmos DB. -
SaveChangesAsync
eCommitAsync
restituito correttamente.
Persistenza
Come si può notare nei frammenti di codice precedenti, tutti gli oggetti salvati in Azure Cosmos DB vengono inclusi in un'istanza DataObject
di . Questo oggetto fornisce proprietà comuni:
-
ID
. -
PartitionKey
. -
Type
. -
State
. ComeCreated
,Updated
non verrà salvato in modo permanente in Azure Cosmos DB. -
Etag
. Per il blocco ottimistico. -
TTL
. Proprietà Time To Live per la pulizia automatica dei documenti precedenti. -
Data
. Oggetto dati generico.
Queste proprietà sono definite in un'interfaccia generica chiamata IDataObject
e usata dai repository e dal contesto del contenitore:
public interface IDataObject<out T> where T : Entity
{
string Id { get; }
string PartitionKey { get; }
string Type { get; }
T Data { get; }
string Etag { get; set; }
int Ttl { get; }
EntityState State { get; set; }
}
Gli oggetti di cui è stato eseguito il wrapping in un'istanza DataObject
e salvati nel database avranno un aspetto simile a questo esempio (Contact
e ContactNameUpdatedEvent
):
// Contact document/object. After creation.
{
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "contact",
"data": {
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "[email protected]",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false,
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
},
"ttl": -1,
"_etag": "\"180014cc-0000-1500-0000-614455330000\"",
"_ts": 1632301657
}
// After setting a new name, this is how an event document looks.
{
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "domainEvent",
"data": {
"name": {
"firstName": "Jane",
"lastName": "Doe"
},
"contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"action": "ContactNameUpdatedEvent",
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"createdAt": "2021-09-22T11:37:37.3022907+02:00"
},
"ttl": 120,
"_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
"_ts": 1632303457
}
È possibile notare che i Contact
documenti e ContactNameUpdatedEvent
(tipo domainEvent
) hanno la stessa chiave di partizione e che entrambi i documenti verranno mantenuti nella stessa partizione logica.
Elaborazione del feed di modifiche
Per leggere il flusso di eventi e inviarli a un broker di messaggi, il servizio userà il feed di modifiche di Azure Cosmos DB.
Il feed di modifiche è un log permanente delle modifiche nel contenitore. Funziona in background e tiene traccia delle modifiche. All'interno di una partizione logica, l'ordine delle modifiche è garantito. Il modo più pratico per leggere il feed di modifiche consiste nell'usare una funzione di Azure con un trigger di Azure Cosmos DB. Un'altra opzione consiste nell'usare la libreria del processore dei feed di modifiche. Consente di integrare l'elaborazione dei feed di modifiche nell'API Web come servizio in background (tramite l'interfaccia IHostedService
). L'esempio seguente usa una semplice applicazione console che implementa la classe astratta BackgroundService per ospitare attività in background a esecuzione prolungata nelle applicazioni .NET Core.
Per ricevere le modifiche dal feed di modifiche di Azure Cosmos DB, è necessario creare un'istanza di un ChangeFeedProcessor
oggetto, registrare un metodo del gestore per l'elaborazione dei messaggi e avviare l'ascolto delle modifiche:
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
var changeFeedProcessor = _container
.GetChangeFeedProcessorBuilder<ExpandoObject>(
_configuration.GetSection("Cosmos")["ProcessorName"],
HandleChangesAsync)
.WithInstanceName(Environment.MachineName)
.WithLeaseContainer(_leaseContainer)
.WithMaxItems(25)
.WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
.WithPollInterval(TimeSpan.FromSeconds(3))
.Build();
_logger.LogInformation("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
_logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
return changeFeedProcessor;
}
Un metodo del gestore (HandleChangesAsync
qui) elabora quindi i messaggi. In questo esempio gli eventi vengono pubblicati in un argomento del bus di servizio partizionato per la scalabilità e la funzionalità di deduplicazione è abilitata. Qualsiasi servizio interessato alle modifiche agli Contact
oggetti può quindi sottoscrivere l'argomento del bus di servizio e ricevere ed elaborare le modifiche per il proprio contesto.
I messaggi del bus di servizio generati hanno una SessionId
proprietà . Quando si usano sessioni nel bus di servizio, si garantisce che l'ordine dei messaggi venga mantenuto (prima in, first out (FIFO)). Per questo caso d'uso è necessario conservare l'ordine.
Ecco il frammento di codice che gestisce i messaggi dal feed di modifiche:
private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
_logger.LogInformation($"Received {changes.Count} document(s).");
var eventsCount = 0;
Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();
foreach (var document in changes as dynamic)
{
if (!((IDictionary<string, object>)document).ContainsKey("type") ||
!((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.
if (document.type == EVENT_TYPE) // domainEvent.
{
string json = JsonConvert.SerializeObject(document.data);
var sbMessage = new ServiceBusMessage(json)
{
ContentType = "application/json",
Subject = document.data.action,
MessageId = document.id,
PartitionKey = document.partitionKey,
SessionId = document.partitionKey
};
// Create message batch per partitionKey.
if (partitionedMessages.ContainsKey(document.partitionKey))
{
partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
}
else
{
partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
}
eventsCount++;
}
}
if (partitionedMessages.Count > 0)
{
_logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");
// Loop over each partition.
foreach (var partition in partitionedMessages)
{
// Create batch for partition.
using var messageBatch =
await _topicSender.CreateMessageBatchAsync(cancellationToken);
foreach (var msg in partition.Value)
if (!messageBatch.TryAddMessage(msg))
throw new Exception();
_logger.LogInformation(
$"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");
try
{
await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e.Message);
throw;
}
}
}
else
{
_logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
}
}
Gestione degli errori
Se si verifica un errore durante l'elaborazione delle modifiche, la libreria del feed di modifiche riavvia la lettura dei messaggi nella posizione in cui è stato elaborato correttamente l'ultimo batch. Ad esempio, se l'applicazione ha elaborato correttamente 10.000 messaggi, ora funziona in batch da 10.001 a 10.025 e si verifica un errore, può riavviare e raccogliere il lavoro nella posizione 10.001. La libreria tiene traccia automaticamente di ciò che è stato elaborato tramite informazioni salvate in un Leases
contenitore in Azure Cosmos DB.
È possibile che il servizio abbia già inviato alcuni dei messaggi che vengono rielaborati al bus di servizio. In genere, questo scenario comporta l'elaborazione di messaggi duplicati. Come indicato in precedenza, il bus di servizio include una funzionalità per il rilevamento di messaggi duplicati che è necessario abilitare per questo scenario. Il servizio controlla se un messaggio è già stato aggiunto a un argomento o a una coda del bus di servizio in base alla proprietà controllata dall'applicazione MessageId
del messaggio. Tale proprietà è impostata sull'oggetto del documento dell'evento ID
. Se lo stesso messaggio viene nuovamente inviato al bus di servizio, il servizio lo ignorerà e lo rilascia.
Gestione
In un'implementazione tipica della posta in uscita transazionale, il servizio aggiorna gli eventi gestiti e imposta una Processed
proprietà su true
, a indicare che un messaggio è stato pubblicato correttamente. Questo comportamento può essere implementato manualmente nel metodo del gestore. Nello scenario corrente non è necessario un processo di questo tipo. Azure Cosmos DB tiene traccia degli eventi elaborati usando il feed di modifiche (in combinazione con il Leases
contenitore).
Come ultimo passaggio, occasionalmente è necessario eliminare gli eventi dal contenitore in modo da mantenere solo i record/documenti più recenti. Per eseguire periodicamente una pulizia, l'implementazione applica un'altra funzionalità di Azure Cosmos DB: Durata (TTL
) nei documenti. Azure Cosmos DB può eliminare automaticamente i documenti in base a una TTL
proprietà che può essere aggiunta a un documento: un intervallo di tempo in secondi. Il servizio verificherà costantemente la presenza di documenti che dispongono di una TTL
proprietà. Non appena scade un documento, Azure Cosmos DB lo rimuoverà dal database.
Quando tutti i componenti funzionano come previsto, gli eventi vengono elaborati e pubblicati rapidamente: entro pochi secondi. Se si verifica un errore in Azure Cosmos DB, gli eventi non verranno inviati al bus di messaggi, perché sia l'oggetto business che gli eventi corrispondenti non possono essere salvati nel database. L'unica cosa da considerare è impostare un valore appropriato TTL
nei DomainEvent
documenti quando il ruolo di lavoro in background (processore feed di modifiche) o il bus di servizio non sono disponibili. In un ambiente di produzione è consigliabile scegliere un intervallo di tempo di più giorni. Ad esempio, 10 giorni. Tutti i componenti coinvolti avranno quindi tempo sufficiente per elaborare/pubblicare le modifiche all'interno dell'applicazione.
Riassunto
Il modello Outbox transazionale risolve il problema della pubblicazione affidabile di eventi di dominio nei sistemi distribuiti. Eseguendo il commit dello stato dell'oggetto business e dei relativi eventi nello stesso batch transazionale e usando un processore in background come inoltro di messaggi, si garantisce che altri servizi, interni o esterni, ricevano le informazioni da cui dipendono. Questo esempio non è un'implementazione tradizionale del modello Outbox transazionale. Usa funzionalità come il feed di modifiche di Azure Cosmos DB e Time To Live che mantengono le cose semplici e pulite.It uses features like the Azure Cosmos DB change feed and Time To Live that keep things simple and clean.
Ecco un riepilogo dei componenti di Azure usati in questo scenario:
Scaricare un file di Visio di questa architettura.
I vantaggi di questa soluzione sono:
- Messaggistica affidabile e recapito garantito di eventi.
- Ordine mantenuto di eventi e deduplicazione dei messaggi tramite il bus di servizio.
- Non è necessario mantenere una proprietà aggiuntiva
Processed
che indica la corretta elaborazione di un documento evento. - Eliminazione di eventi da Azure Cosmos DB tramite durata (TTL). Il processo non utilizza unità richiesta necessarie per la gestione delle richieste utente/applicazione. Usa invece le unità richiesta "leftover" in un'attività in background.
- Elaborazione di messaggi di correzione degli errori tramite
ChangeFeedProcessor
(o una funzione di Azure). - Facoltativo: più processori del feed di modifiche, ognuno dei quali mantiene il proprio puntatore nel feed di modifiche.
Considerazioni
L'applicazione di esempio illustrata in questo articolo illustra come implementare il modello Outbox transazionale in Azure con Azure Cosmos DB e il bus di servizio. Esistono anche altri approcci che usano database NoSQL. Per garantire che l'oggetto business e gli eventi verranno salvati in modo affidabile nel database, è possibile incorporare l'elenco di eventi nel documento dell'oggetto business. Lo svantaggio di questo approccio è che il processo di pulizia dovrà aggiornare ogni documento che contiene eventi. Non è ideale, soprattutto in termini di costo unità richiesta, rispetto all'uso della durata (TTL).
Tenere presente che non è consigliabile considerare il codice di esempio fornito qui codice pronto per la produzione. Presenta alcune limitazioni relative al multithreading, in particolare il modo in cui gli eventi vengono gestiti nella DomainEntity
classe e il modo in cui gli CosmosContainerContext
oggetti vengono rilevati nelle implementazioni. Usarlo come punto di partenza per le proprie implementazioni. In alternativa, prendere in considerazione l'uso di librerie esistenti che dispongono già di questa funzionalità incorporata, ad esempio NServiceBus o MassTransit.
Distribuire questo scenario
È possibile trovare il codice sorgente, i file di distribuzione e le istruzioni per testare questo scenario in GitHub: https://github.com/mspnp/transactional-outbox-pattern.
Contributori
Questo articolo viene gestito da Microsoft. Originariamente è stato scritto dai seguenti contributori.
Autore principale:
- Christian Dennig | Principal Software Engineer
Per visualizzare i profili LinkedIn non pubblici, accedere a LinkedIn.
Passaggi successivi
Per altre informazioni, vedere questi articoli:
- Progettazione basata su dominio
- Bus di servizio di Azure: deduplicazione messaggi
- Libreria del processore dei feed di modifiche
- Jimmy Bogard: modello di eventi di dominio migliore