Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
Обработчик канала изменений входит в SDK Azure Cosmos DB .NET версии 3 и Java версии 4. Он упрощает процесс чтения потока изменений и эффективно распределяет обработку событий между несколькими потребителями.
Основное преимущество использования обработчика канала изменений — это его отказоустойчивый дизайн, который обеспечивает доставку всех событий в канале изменений по крайней мере один раз.
Поддерживаемые SDK
.Net V3 | Java | Node.JS | Python |
---|---|---|---|
✓ | ✓ | ✕ | ✕ |
Компоненты обработчика канала изменений
Обработчик канала изменений имеет четыре основных компонента:
Отслеживаемый контейнер — это контейнер с данными, из которых формируется канал изменений. Все операции вставки и обновлений в отслеживаемом контейнере отражаются в канале изменений контейнера.
Контейнер аренды: контейнер аренды выступает в качестве хранилища состояний и координирует обработку канала изменений между несколькими рабочими рабочая роль. Контейнер аренды может храниться в той же учетной записи, что и отслеживаемый контейнер, или в отдельной учетной записи.
Вычислительный экземпляр: В вычислительном экземпляре размещается обработчик потока изменений для отслеживания изменений. В зависимости от платформы она может представляться виртуальной машиной, pod Kubernetes, экземпляром службы приложение Azure или фактическим физическим компьютером. В вычислительном экземпляре есть уникальный идентификатор, который называется именем экземпляра в этой статье.
Делегат — это код, в котором разработчик описывает требуемые действия для каждого пакета изменений, считанного обработчиком канала изменений.
Дополнительные сведения о том, как эти четыре элемента обработчика канала изменений работают вместе, давайте рассмотрим пример на следующей схеме. Отслеживаемый контейнер хранит элементы и использует "City" в качестве ключа раздела. Значения ключа секции распределяются в диапазонах (каждый диапазон представляет физический раздел), содержащий элементы.
На схеме показаны два вычислительных экземпляра, а обработчик канала изменений назначает разные диапазоны каждому экземпляру для максимального распределения вычислений. Каждый экземпляр имеет другое уникальное имя.
Каждый диапазон считывается параллельно. Прогресс диапазона сохраняется отдельно от других диапазонов в арендном контейнере с помощью документа аренды. Сочетание аренды представляет текущее состояние обработчика канала изменений.
Реализация обработчика потока изменений
Обработчик канала изменений в .NET доступен для режима последней версии и режима всех версий и удалений. Все версии и режим удаления доступны в режиме предварительного просмотра и поддерживаются для обработчика канала изменений, начиная с версии 3.40.0-preview.0
. Точка входа для обоих режимов всегда является отслеживаемой контейнерой.
Чтобы читать в режиме последней версии в экземпляре Container
, вызовите GetChangeFeedProcessorBuilder
:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
Чтобы читать, используя все версии и режим удалений, вызовите GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes
из экземпляра Container
:
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Для обоих режимов первый параметр — это отдельное имя, описывающее цель этого процессора. Вторая часть имени — это реализация делегата, которая обрабатывает изменения данных.
Вот пример делегата для режима последней версии:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Вот пример делегата для всех версий и режима удаления:
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
После этого вы определите имя вычислительного экземпляра или уникальный идентификатор с помощью WithInstanceName
. Имя вычислительного экземпляра должно быть уникальным и отличаться для каждого экземпляра, который вы разворачиваете. Контейнер задается для поддержания состояния аренды с помощью WithLeaseContainer
.
Вызов Build
предоставляет экземпляр процессора, который можно запустить с помощью вызова StartAsync
.
Примечание.
Приведенные выше фрагменты кода взяты из примеров в GitHub. Вы можете получить образец для режима последней версии или режима всех версий и удалений.
Жизненный цикл обработки
Обычный жизненный цикл хост-инстанса:
- Прочитайте канал изменений.
- Если изменений нет, приостановите выполнение на предопределенное время (настраиваемое с помощью
WithPollInterval
в Builder) и вернитесь к пункту №1. - При наличии изменений отправьте их ответственному лицу.
- Когда делегат успешно завершит обработку изменений, обновите хранилище данных аренды с последней обработанной точкой во времени и перейдите к пункту № 1.
Обработка ошибок
Обработчик канала изменений устойчив к ошибкам пользовательского кода. Если реализация делегата имеет необработанное исключение (шаг 4), поток, обрабатывающий конкретный пакет изменений, останавливается, а новый поток в конечном итоге создается. Новый поток проверяет последнее сохраненное время в хранилище орендных записей для данного диапазона значений ключа раздела. Новый поток перезапускается оттуда, эффективно отправляя тот же пакет изменений делегату. Это поведение продолжается до тех пор, пока делегат не обрабатывает изменения правильно, и это причина, по которой обработчик канала изменений имеет гарантию "по крайней мере один раз".
Примечание.
Только в одном сценарии пакет изменений не повторяется. Если сбой происходит при первом выполнении делегата, хранилище аренды не имеет предыдущего сохраненного состояния, которое может быть использовано при повторной попытке. В таких случаях повтор использует начальную конфигурацию, которая может включать или не включать последний пакет.
Чтобы предотвратить непрерывное повторение одного и того же пакета изменений обработчиком канала изменений, необходимо добавить логику в код делегата для записи документов, за исключением исключения, в очередь сообщений с ошибками. Такая схема гарантирует, что вы сможете отследить необработанные изменения, сохранив возможность продолжать обработку будущих изменений. Очередь сообщений об ошибках может быть другим контейнером Azure Cosmos DB. Точное хранилище данных не имеет значения. Вы просто хотите сохранить необработанные изменения.
Вы также можете использовать оценщик потока изменений для отслеживания хода выполнения экземпляров обработчика потока изменений при чтении потока изменений, или использовать уведомления о жизненном цикле для обнаружения скрытых сбоев.
Уведомления жизненного цикла
Обработчик канала изменений можно подключить к любому соответствующему событию в жизненном цикле. Вы можете получать уведомления одному или всем из них. Мы рекомендуем зарегистрировать по меньшей мере получение уведомлений об ошибках.
- Зарегистрируйте обработчик для
WithLeaseAcquireNotification
, чтобы получать уведомления о том, что текущий узел зарегистрировал аренду и начал ее обрабатывать. - Зарегистрируйте обработчик для
WithLeaseReleaseNotification
, чтобы получать уведомления, когда текущий сервер освобождает аренду и прекращает её обработку. - Зарегистрируйте обработчик для
WithErrorNotification
, чтобы получать уведомления, когда текущий хост обнаруживает исключение во время обработки. Необходимо иметь возможность различать, является ли источник делегатом пользователя (необработанным исключением) или ошибкой, с которой обработчик сталкивается при попытке получить доступ к отслеживаемой контейнере (например, сетевым проблемам).
Уведомления жизненного цикла доступны в обоих режимах канала изменений. Ниже приведен пример уведомлений жизненного цикла в режиме последней версии:
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Единица развертывания
Одна единица развертывания обработчика канала изменений состоит из одного или нескольких вычислительных экземпляров, имеющих одинаковое значение processorName
и одну конфигурацию контейнера аренды, но разные имена экземпляров. Вы можете иметь множество единиц развертывания, в которых каждый блок имеет другой бизнес-поток для изменений, и каждая единица развертывания состоит из одного или нескольких экземпляров.
Например, у вас может быть одна единица развертывания, которая активирует внешний API при каждом изменении контейнера. Другая единица развертывания может перемещать данные в режиме реального времени при каждом изменении. Когда изменение происходит в отслеживаемом контейнере, все единицы развертывания уведомляются.
Динамическое масштабирование
Как упоминалось ранее, в единице развертывания можно использовать один или несколько вычислительных экземпляров. Чтобы воспользоваться преимуществами распределения вычислений в единице развертывания, единственными ключевыми требованиями являются следующие:
- Все экземпляры должны иметь одинаковую конфигурацию арендуемых контейнеров.
- У всех экземпляров должно быть одинаковое значение
processorName
. - Каждый экземпляр должен иметь уникальное имя (
WithInstanceName
).
Если применяются эти три условия, обработчик канала изменений распределяет все аренды, которые находятся в контейнере аренды во всех запущенных экземплярах этой единицы развертывания, и он параллелизирует вычисления с помощью алгоритма равного распределения. Аренда принадлежит одному экземпляру в любое время, поэтому количество экземпляров не должно превышать количество аренд.
Число экземпляров может увеличиваться и уменьшаться. Обработчик потока изменений динамически настраивает нагрузку, соответственно перераспределяя её.
Кроме того, обработчик канала изменений может динамически корректировать масштаб контейнера при увеличении его пропускной способности или объёма хранилища. Когда контейнер растет, процессор изменений прозрачно обрабатывает сценарий путем автоматического увеличения количества аренд и распределения новых аренды между существующими экземплярами.
Время запуска
По умолчанию, при первом запуске обработчика канала изменений, он инициализирует контейнер аренды и запускает его жизненный цикл обработки. Изменения, которые произошли в отслеживаемом контейнере перед первой инициализацией обработчика канала изменений, не будут обнаружены.
Чтение с предыдущей даты и времени
Обработчик изменений можно инициализировать для чтения изменений, начиная с определенной даты и времени, передав экземпляр DateTime
в расширение построителя WithStartTime
:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
Обработчик потока изменений инициализируется для указанной даты и времени и начинает считывать изменения, которые произошли после этого.
Чтение с начала
В других сценариях, например при миграции данных или при анализе всей истории контейнера, необходимо считывать канал изменений с начала существования этого контейнера. Вы можете использовать WithStartTime
расширение сборщика, но передать DateTime.MinValue.ToUniversalTime()
, которое создаёт представление минимального значения DateTime
в формате UTC; как в следующем примере:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
Обработчик канала изменений инициализирован и начинает считывать изменения с начала времени существования контейнера.
Примечание.
Эти параметры настройки используются только для установки начальной точки во времени процессора потока изменений. После инициализации контейнера аренды в первый раз изменение этих параметров не влияет.
Настройка начальной точки доступна только для режима канала изменений последней версии. При использовании режимов всех версий и удаления данных необходимо начать чтение с момента запуска процессора или возобновить работу с предыдущего состояния аренды, которое находится в пределах периода непрерывного хранения резервных копий вашей учетной записи.
Канал изменений и выделенная пропускная способность
Операции чтения канала изменений в отслеживаемом контейнере используют единицы запросов. Убедитесь, что отслеживаемый контейнер не подвергается ограничению. Регулирование добавляет задержки в получении событий канала изменений на процессорах.
Операции с контейнером аренды (обновление и поддержание состояния) потребляют единицы запроса. Чем выше число экземпляров, использующих один и тот же контейнер аренды, тем выше потенциальное потребление единиц запросов. Убедитесь, что контейнер аренды не испытывает ограничения. Регулирование добавляет задержки при получении событий канала изменений. Регулирование может даже полностью завершить обработку.
Поделиться контейнером для аренды
Контейнер аренды можно совместно использовать в нескольких единицах развертывания. В контейнере общей аренды каждая единица развертывания прослушивает другой отслеживаемый контейнер или имеет другое значение.processorName
В этой конфигурации каждая единица развертывания поддерживает независимое состояние контейнера аренды.
Проверьте потребление единиц запроса в арендованном контейнере, чтобы убедиться, что подготовленная пропускная способность достаточна для всех единиц развертывания.
Расширенная конфигурация аренды
Три ключевых конфигурации могут повлиять на работу обработчика канала изменений. Каждая конфигурация влияет на потребление единиц запроса в контейнере аренды. Вы можете задать одну из этих конфигураций при создании обработчика канала изменений, но используйте их тщательно.
- Получение аренды: по умолчанию это происходит каждые 17 секунд. Хост периодически проверяет состояние магазина аренд и рассматривает возможность получения прав в рамках процесса динамического масштабирования. Этот процесс выполняется путем выполнения запроса к контейнеру аренды. Сокращение этого значения делает перебалансировку и получение аренды быстрее, но увеличивает потребление единиц запросов в контейнере аренды.
- Срок действия аренды: по умолчанию 60 секунд. Определяет максимальный срок, в течение которого аренда может существовать без продления, прежде чем она будет передана другому узлу. Когда узел выходит из строя, аренды, которые ему принадлежали, принимаются другими узлами после истечения этого периода времени плюс настроенный интервал продления. Уменьшение этого значения делает восстановление после сбоя узла быстрее, но срок действия никогда не должен быть ниже интервала продления.
- Продление аренды: по умолчанию каждые 13 секунд. Хост, который владеет арендной записью, периодически продлевает её, даже если нет новых изменений. Этот процесс выполняется путем выполнения операции по обновлению аренды. Уменьшение этого значения сокращает время, необходимое для обнаружения аренды, потерянной из-за сбоя узла, но увеличивает потребление ресурсов запросов в арендном контейнере.
Место размещения обработчика канала изменений
Обработчик канала изменений можно разместить на любой платформе, поддерживающей длительные процессы или задачи. Далее приводятся некоторые примеры.
- Непрерывный запуск экземпляра WebJobs в службе приложений Azure
- Процесс в экземпляре Azure Virtual Machines
- Фоновое задание в Azure Kubernetes Service
- Бессерверная функция в Функции Azure
- Хостинговая служба ASP.NET
Хотя обработчик канала изменений может работать в коротких средах, так как контейнер аренды сохраняет состояние, цикл запуска этих сред добавляет задержки в время получения уведомлений (из-за затрат на запуск процессора при каждом запуске среды).
Требования к доступу на основе ролей
При использовании идентификатора Microsoft Entra в качестве механизма проверки подлинности убедитесь, что удостоверение имеет соответствующие разрешения:
- В отслеживаемом контейнере:
Microsoft.DocumentDB/databaseAccounts/readMetadata
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- В контейнере аренды:
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
Дополнительные ресурсы
- Azure Cosmos DB SDK
- Полный пример приложения на GitHub
- Дополнительные примеры использования на GitHub
- Семинарские лаборатории Azure Cosmos DB для обработчика ленты изменений
Следующие шаги
Дополнительные сведения о обработчике ленты изменений читайте в следующих статьях: