Обработка потоков данных с помощью Azure Stream Analytics

Azure Cosmos DB
Концентраторы событий Azure
Azure Monitor
Azure Stream Analytics

На схеме эталонной архитектуры представлен сквозной конвейер обработки потоков данных. Конвейер принимает данные из двух источников, сопоставляет записи в двух потоках и вычисляет скользящее среднее за определенный промежуток времени. Результаты сохраняются для дальнейшего анализа.

Архитектура

Схема, показывающая эталонную архитектуру для создания конвейера потоковой обработки с помощью Azure Stream Analytics.

Скачайте файл Visio для этой архитектуры.

Рабочий процесс

Она состоит из следующих компонентов:

Источники данных. В этой архитектуре существует два источника данных, которые создают потоки данных в реальном времени. Первый поток содержит сведения о поездке, а второй — о тарифах. В эталонной архитектуре есть имитированный генератор данных, который считывает данные из набора статических файлов и отправляет данные в Центры событий. В реальном приложении источниками данных будут устройства, установленные в такси.

Центры событий Azure. Центры событий — это служба приема событий. Эта архитектура использует два экземпляра события хаба: по одному на каждый источник данных. Каждый источник данных отправляет поток данных в соответствующий центр событий.

Azure Stream Analytics. Stream Analytics — это модуль обработки событий. Задание Stream Analytics считывает потоки данных из двух центров событий и осуществляет потоковую обработку.

Azure Cosmos DB. Выходные данные задания Stream Analytics — это серия записей, которые записываются в виде документов JSON в базу данных документов Azure Cosmos DB.

Microsoft Power BI. Power BI — набор средств бизнес-аналитики для анализа информации о бизнесе. В этой архитектуре данные загружаются из Azure Cosmos DB. Благодаря этому пользователи могут выполнять анализ полного набора собранных исторических данных. Также результаты можно передать в виде потока прямо из Stream Analytics в Power BI, чтобы просмотреть данные в реальном времени. Дополнительные сведения см. в статье Потоковая передача данных в реальном времени в Power BI.

Azure Monitor. Azure Monitor собирает метрики производительности о службах Azure, развернутых в решении. Отобразив эти данные в визуализации на панели мониторинга, можно получить полезные сведения о работоспособности решения.

Подробности сценария

Сценарий. Компания, предоставляющая услуги такси, собирает данные о каждой поездке в такси. В этом сценарии предполагается, что данные отправляются с двух отдельных устройств. У такси есть счётчик, который отправляет информацию о каждой поездке — длительность, расстояние, а также места посадки и высадки. Отдельное устройство принимает платежи от клиентов и отправляет данные о тарифах. Таксомоторная компания хочет вычислить среднюю сумму чаевых на милю в реальном времени, чтобы выявить тенденции.

Потенциальные варианты использования

Это решение оптимизировано для розничного сценария.

Прием данных

Для имитации источника данных в этой эталонной архитектуре используется набор данных[1]New York City Taxi Data (Данные о поездках в такси в Нью-Йорке). Этот набор данных содержит данные о поездках по такси в Нью-Йорке в течение четырехлетнего периода (2010–2013). Он содержит два типа записей: данные о поездках и данные о тарифах. Данные о поездках включают сведения о продолжительности поездки, расстоянии, а также местах посадки и высадки. Данные о тарифах включают сведения о тарифе, налоге и сумме чаевых. В обоих типах записей есть стандартные поля: номер медальона, лицензия на право вождения и код организации. Вместе эти три поля позволяют уникально идентифицировать такси и водителя. Данные хранятся в формате CSV.

[1] Донован, Брайан; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Иллинойсский университет в Урбане-Шампейне. https://doi.org/10.13012/J8PN93H8

Генератор данных — это приложение .NET, которое считывает записи и отправляет их в Центры событий Azure. Генератор отправляет данные о поездке в формате JSON, а данные о тарифах — в формате CSV.

Для сегментации данных Центры событий используют секции. Они позволяют потребителю читать данные из каждого раздела параллельно. При отправке данных в Центры событий можно явно указать ключ секции. В противном случае записи назначаются секциям методом циклического перебора.

В нашем примере данные о поездках и тарифах должны в итоге иметь одинаковый идентификатор секции для определенного такси. Это позволит Stream Analytics применить определенную степень параллелизма при сопоставлении двух потоков. Запись в секции n с данными о поездке будет соответствовать записи в секции n с данными о тарифах.

Схема потоковой обработки с помощью Azure Stream Analytics и Центров событий Azure

В генераторе данных общая модель данных для обоих типов записей имеет свойство PartitionKey, в котором объединены Medallion, HackLicense и VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Это свойство используется для явного предоставления ключа партиции при отправке данных в Центры событий.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Потоковая обработка

Задание обработки потока определяется с помощью SQL-запроса в несколько отдельных шагов. Первые два шага выбирают записи из двух входных потоков.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

На следующем шаге эти два входных потока объединяются для выбора совпадающих записей из каждого потока.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Этот запрос присоединяет записи к набору полей, которые однозначно определяют соответствующие записи (PartitionId и PickupTime).

Примечание.

Мы хотим, чтобы потоки TaxiRide и TaxiFare были объединены при помощи уникального сочетания Medallion, HackLicense, VendorId и PickupTime. В этом случае PartitionId охватывает поля Medallion, HackLicense и VendorId, но это не следует считать правилом.

В Stream Analytics объединения являются темпоральными, то есть записи объединяются в пределах определенного временного окна. В противном случае задание может бесконечно ожидать сопоставления. Функция DATEDIFF указывает, насколько две совпадающих записи могут быть разделены по времени для сопоставления.

Последний шаг задания вычисляет среднее количество чаевых на милю, сгруппированное в скользящее окно в пять минут.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics предоставляет несколько функций обработки методом окна. Окно с шагом перемещается вперед на фиксированный период, в данном случае на одну минуту за каждый шаг. Результатом является вычисление скользящего среднего за последние пять минут.

В приведенной здесь архитектуре сохраняются только результаты задания Stream Analytics в Azure Cosmos DB. Для сценария с большими данными также следует использовать функцию Event Hubs Capture для сохранения необработанных данных событий в хранилище BLOB-объектов Azure. Сохранение необработанных данных позволит вам в дальнейшем выполнять пакетные запросы к данным журнала, чтобы получить новые полезные сведения из этих данных.

Рекомендации

Эти рекомендации реализуют основные принципы платформы Azure Well-Architected Framework, которая является набором руководящих принципов, которые можно использовать для улучшения качества рабочей нагрузки. Дополнительные сведения см. в статье Microsoft Azure Well-Architected Framework.

Оптимизация затрат

Оптимизация затрат заключается в том, чтобы подумать о способах сокращения ненужных расходов и повышения эффективности работы. Дополнительные сведения см. в контрольном списке для проверки конструкции на предмет оптимизации затрат.

Для оценки затрат используйте калькулятор цен Azure. Ниже приведены некоторые рекомендации по службам, используемым в этой эталонной архитектуре.

Azure Stream Analytics

Azure Stream Analytics рассчитывается на основе количества потоковых единиц ($0,11/час), необходимых для обработки данных в рамках службы.

Stream Analytics может быть дорогостоящим, если вы не обрабатываете данные в режиме реального времени или небольших объемах данных. Для этих вариантов использования рекомендуется использовать Функции Azure или Logic Apps для перемещения данных из Центры событий Azure в хранилище данных.

Центры событий Azure и Azure Cosmos DB

Для вопросов стоимости в Центрах событий Azure и Azure Cosmos DB см. справочную архитектуру Stream processing with Azure Databricks.

Операционное превосходство

Операционное превосходство охватывает процессы, которые развертывают приложение и поддерживают его работу в производственной среде. Дополнительные сведения см. в контрольном списке проверки проектирования для операционного превосходства.

Наблюдение

Для любого решения обработки потоков данных очень важно отслеживать производительность и работоспособность системы. Azure Monitor собирает журналы метрик и диагностики для служб Azure, используемых в этой архитектуре. Azure Monitor встроен в платформу Azure и не требует дополнительного кода в приложении.

Любой из перечисленных предупреждающих сигналов указывает на то, что следует масштабировать применимый ресурс Azure.

  • Служба "Центры событий" ограничивает запросы или количество сообщений близко к дневной квоте.
  • Задание Stream Analytics постоянно использует более 80 % выделенных единиц потоковой передачи.
  • Azure Cosmos DB начинает ограничивать частоту запросов.

Эталонная архитектура включает пользовательскую панель мониторинга, которая развертывается на портале Azure. После развертывания архитектуры можно просмотреть панель мониторинга, открыв портал Azure и выбрав TaxiRidesDashboard из списка панелей мониторинга. Дополнительные сведения о создании и развертывании пользовательских панелей мониторинга см. в статье Создание панелей мониторинга Azure программными средствами.

На следующем изображении показана панель мониторинга приблизительно через час после запуска задания Stream Analytics.

Снимок экрана панели мониторинга поездок в такси

На панели в нижнем левом углу показано, что потребление потоковых единиц для задания Stream Analytics повышается в течение первых 15 минут и затем выравнивается. Это типичное развитие событий, когда задача достигает устойчивого состояния.

Обратите внимание на то, что Event Hubs ограничивает запросы, как показано в правом верхнем углу. Случайный регулируемый запрос не является проблемой, так как SDK клиентских библиотек Event Hubs автоматически повторяет при получении ошибки ограничения. Однако если ошибки регулирования повторяются, это означает, что концентратору событий требуется больше единиц пропускной способности. На следующей диаграмме показан тестовый запуск с использованием функции автоинфляции Центров событий, она автоматически масштабирует единицы пропускной способности при необходимости.

Снимок экрана: автомасштабирование Центров событий.

Функция автоматического накачивания включена приблизительно в 06:35. Вы можете заметить снижение производительности в регулируемых запросах, поскольку "Центры событий" автоматически увеличили масштаб до трех единиц пропускной способности.

Интересно, что побочным эффектом стало увеличение использования SU в задании Stream Analytics. При регулировании в службе "Центры событий" искусственно снизилась скорость приема данных для задания Stream Analytics. Довольно часто бывает так, что при устранении одной проблемы с производительностью возникает другая. В таком случае проблему можно решить, выделив дополнительные единицы потоковой передачи для задания Stream Analytics.

DevOps

  • Создайте отдельные группы ресурсов для рабочей среды, сред разработки и тестирования. Отдельные группы ресурсов позволяют проще управлять развертываниями, удалять тестовые развертывания и назначать права доступа.

  • Используйте шаблон Azure Resource Manager для развертывания ресурсов Azure, следуя процессу "инфраструктура как код" (IaC). Благодаря шаблонам автоматизация развертываний с помощью Azure DevOps Services или других решений CI/CD упрощается.

  • Поместите каждую рабочую нагрузку в отдельный шаблон развертывания и сохраняйте ресурсы в системах контроля версий. Вы можете развернуть шаблоны вместе или отдельно в рамках процесса CI/CD, что упрощает процесс автоматизации.

    В этой архитектуре Центры событий Azure, Log Analytics и Azure Cosmos DB определяются как одна рабочая нагрузка. Эти ресурсы включены в один шаблон ARM.

  • Следует рассмотреть возможность стадирования рабочих нагрузок. Развертывайте на различных этапах и выполняйте проверки на каждом этапе перед продолжением к следующему этапу. Таким образом вы можете отправлять обновления в рабочие среды с высокой степенью контроля и свести к минимуму непредвиденные проблемы с развертыванием.

  • Рассмотрите возможность использования Azure Monitor для анализа производительности конвейера потоковой обработки.

Дополнительные сведения см. в разделе о принципах эффективности работы в Microsoft Azure Well-Architected Framework.

Эффективность производительности

Эффективность производительности — это возможность масштабирования рабочей нагрузки в соответствии с требованиями, заданными пользователями. Дополнительные сведения см. в контрольном списке проверки проекта на эффективность производительности.

Центры событий

Пропускная способность Центров событий вычисляется в единицах пропускной способности. Вы можете автоматически масштабировать концентратор событий, включив автоматическое расширение. Это позволит автоматически масштабировать единицы пропускной способности в зависимости от трафика вплоть до заданного максимума.

Потоковая аналитика

В Stream Analytics вычислительные ресурсы, выделенные для задания, измеряются в единицах потоковой передачи. Задания Stream Analytics лучше всего масштабировать, если оно может выполняться параллельно. Таким образом, Stream Analytics сможет распределять задания между несколькими вычислительными узлами.

Для входных данных Центров событий используйте ключевое слово PARTITION BY, чтобы секционировать задание Stream Analytics. Данные будут разделены на подмножества на основе секций Центров событий.

Оконные функции и временные объединения требуют дополнительные SU. По возможности используйте PARTITION BY так, чтобы каждая секция обрабатывалась отдельно. Дополнительные сведения см. в статье Обзор и настройка единиц потоковой передачи.

Если невозможно параллелизовать все задание Stream Analytics, попробуйте разбить его на несколько шагов, начиная с одного или нескольких параллельных шагов. Так первые шаги можно будет выполнить параллельно. Например, как в этой эталонной архитектуре:

  • Шаги 1 и 2 — это SELECT инструкции, которые выбирают записи в одной секции.
  • На шаге 3 выполняется секционированное объединение двух входных потоков. На этом шаге используется преимущество, заключающееся в том, что совпадающие записи имеют один и тот же ключ секции. Поэтому у них всегда будет одинаковый идентификатор секции в каждом входном потоке.
  • На шаге 4 данные агрегируются по всем разделам. Этот шаг нельзя выполнить параллельно.

Используйте схему заданий в Stream Analytics, чтобы просмотреть, сколько секций назначено каждому шагу в задании. Ниже показана схема заданий для этой эталонной архитектуры:

Схема заданий Stream Analytics.

Azure Cosmos DB (облачная база данных)

Емкость пропускной способности для Azure Cosmos DB измеряется в единицах запросов (ЕЗ). Чтобы масштабировать контейнер Azure Cosmos DB свыше 10 000 RU, необходимо указать ключ секции при создании контейнера и включить ключ секции в каждый документ.

В этой эталонной архитектуре новые документы создаются только раз в минуту (интервал "прыгающего" окна), поэтому требования к пропускной способности довольно низкие. По этой причине нет необходимости назначать ключ раздела в этом сценарии.