Поделиться через


Используйте средство оценки изменений в канале.

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

В этой статье описывается, как можно отслеживать прогресс работы ваших экземпляров процессора канала изменений при считывании канала изменений.

Почему процесс отслеживания хода выполнения важен?

Обработчик канала изменений действует как указатель, который перемещается по каналу изменений и передает изменения делегату для реализации.

Ваш обработчик потока изменений может обрабатывать изменения с определенной скоростью в зависимости от доступных ресурсов, таких как ЦП, память, сеть и т. д.

Если эта скорость медленнее, чем скорость, с которой происходят изменения в контейнере Azure Cosmos DB, процессор начинает отставать.

Определение этого сценария помогает понять, нужно ли масштабировать развертывание обработчика потока изменений.

Внедрите оценщик потока изменений

В качестве модели отправки автоматических уведомлений

Как и обработчик канала изменений, средство оценки канала изменений может использоваться как модель отправки уведомлений. Оценщик измеряет разницу между последним обработанным элементом (определённым состоянием контейнера аренд) и последним произошедшим изменением в контейнере и передаёт это значение делегату. Интервал, с которым производится измерение, также можно настроить по умолчанию на 5 секунд.

Например, если обработчик ленты изменений использует режим последней версии и определяется следующим образом:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Правильным способом инициализации средства оценки для измерения этого обработчика будет использование GetChangeFeedEstimatorBuilder следующим образом:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Здесь и обработчик, и средство оценки имеют одно значение leaseContainer и одно и то же имя.

Остальные два параметра — делегат, который получает число, представляющее количество ожидающих изменений обработчиком, и интервал времени, с которым требуется выполнить это измерение.

Пример делегата, который получает оценку:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Вы можете отправить эту оценку в решение для мониторинга и использовать ее, чтобы понять, как ход выполнения меняется с течением времени.

В качестве подробной оценки по запросу

В отличие от модели push, существует альтернативный вариант, позволяющий получить оценку по запросу. Эта модель также предоставляет более подробные сведения:

  • Предполагаемая задержка по каждой аренде.
  • Экземпляр владеет арендой и обрабатывает ее, что позволяет выяснить, есть ли в экземпляре проблемы.

Если обработчик канала изменений определён таким образом:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Вы можете создать оценочную модель с той же настройкой аренды:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

И при необходимости можно получить подробную оценку с требуемой частотой:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Каждый ChangeFeedProcessorState содержит сведения об аренде и задержке, а также того, кто является текущим владельцем экземпляра.

Развертывание средства оценки

Измеритель канала изменений не обязательно должен быть развернут в составе вашего обработчика канала изменений и не обязан быть частью того же проекта. Рекомендуется развернуть модуль оценки на независимом экземпляре, независимо от используемых процессоров. Один оценочный экземпляр может отслеживать ход выполнения всех арендных договоров и экземпляров в вашем развертывании обработчика канала изменений.

Каждая оценка использует единицы запросов из отслеживаемых и арендных контейнеров. Частота в 1 минуту между хорошей отправной точкой, чем ниже частота, тем выше единиц запроса, потребляемых.

Поддерживаемые режимы потока изменений

Оценка канала изменений может использоваться как для режима последней версии , так и для всех версий и режима удаления. В обоих режимах указанная оценка не гарантирует точное количество невыполненных изменений в процессе.

Дополнительные ресурсы

Следующие шаги

Теперь вы можете узнать больше о обработчике канала изменений в следующей статье: