Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье показано, как воспользоваться преимуществами параллелизма в Azure Stream Analytics. Узнайте, как масштабировать задания Stream Analytics, настроив входные разделы и осуществив оптимизацию определения аналитического запроса.
В качестве предварительных требований вам может потребоваться ознакомиться с понятием единицы потоковой передачи, описанной в разделе "Общие сведения и настройка единиц потоковой передачи".
Из каких частей состоит задание службы Stream Analytics?
Определение задания Stream Analytics включает как минимум один потоковый вход, запрос и выход. Входные данные — это интерфейсы, откуда задача считывает поток данных. Запрос используется для преобразования потока входных данных, а выходные данные являются точками, куда направляются результаты задания.
Секции во входах и выходах
Секционирование позволяет разделить данные на подмножества на основе ключа секции. Если входные данные (например, Центры событий) секционируются по ключу, рекомендуется указать ключ секции при добавлении входных данных в задание Stream Analytics. Масштабирование задания Stream Analytics использует преимущества секций во входных и выходных данных. Задание Stream Analytics может потреблять и записывать различные разделы параллельно, тем самым повышая пропускную способность.
Входные данные
Все входные данные потоковой передачи Azure Stream Analytics могут воспользоваться преимуществами секционирования: Центры событий, Центр Интернета вещей, хранилище BLOB-объектов, Data Lake Storage 2-го поколения.
Примечание.
Для уровня совместимости 1.2 и более поздних версий задайте ключ секции в качестве входного свойства, не требуя ключевого слова PARTITION BY в запросе. Для уровня совместимости 1.1 и ниже определите ключ секции с ключевым словом PARTITION BY в запросе.
Выходные данные
При работе с Stream Analytics воспользуйтесь преимуществами секционирования в следующих выходных данных:
- Azure Data Lake Storage
- Функции Azure
- Таблица Azure
- Хранилище BLOB (явно задайте ключ партиции)
- Azure Cosmos DB (явно задайте ключ партиции)
- Event Hubs (явно задайте ключ партиции)
- Центр Интернета вещей (явно задайте ключ раздела)
- Cлужебная шина
- SQL и Azure Synapse Analytics с необязательным секционированием: дополнительные сведения см. на странице Вывод в базу данных SQL Azure.
Power BI не поддерживает секционирование. Однако можно по-прежнему разделить входные данные, как описано в этом разделе.
Дополнительные сведения о разделах см. в следующих статьях.
Запрос
Чтобы задание было параллельным, ключи секций должны быть согласованы между всеми входными данными, всеми шагами логики запроса и всеми выходными данными. В логике запроса секционирование определяется ключами, которые используются для соединений и агрегатов (GROUP BY). Последнее требование может быть проигнорировано, если логика запроса не имеет ключа (проекция, фильтры, ссылки...).
- Если входные данные и выходные данные разделяются с помощью
WarehouseId, а запрос группируется поProductIdбезWarehouseId, задание не выполняется параллельно. - Если два входных данных, которые необходимо объединить, секционируются различными ключами секций (
WarehouseIdиProductId), задание не является параллельным. - Если одно задание содержит два или более независимых потоков данных, каждый из которых имеет собственный ключ секции, задание не является параллельным.
Задание выполняется параллельно, только если все входные данные, выходные данные и шаги запроса используют один и тот же ключ.
Задания с легким параллелизмом
Задание с усложненным параллелизмом — это самый масштабируемый сценарий в Azure Stream Analytics. Он соединяет один раздел входных данных с одним экземпляром запроса и одним разделом выходных данных. Такой параллелизм имеет следующие требования:
Если в логике запроса применяется ключ, который обрабатывается тем же экземпляром запроса, необходимо только проследить за тем, чтобы события попадали в ту же секцию входных данных. Для Центров событий или Центра Интернета вещей это означает, что данные события должны иметь значение PartitionKey. Кроме того, можно использовать секционированные отправители. Для хранилища BLOB-объектов это означает, что события отправляются в ту же папку раздела. Примером может служить экземпляр запроса, который агрегирует данные по userID, где входной Центр событий секционирован с использованием userID в качестве ключа секции. Но если логика запроса не требует обрабатывать ключ в том же экземпляре запроса, это требование можно игнорировать. В качестве примера такой логики может быть простой запрос select-project-filter.
Затем сделайте, чтобы запрос был секционирован. Для заданий с уровнем совместимости 1.2 или выше (рекомендуется) укажите настраиваемый столбец в качестве ключа партиционирования в параметрах входных данных, и задание автоматически исполняется в параллельном режиме. Для заданий с уровнем совместимости 1.0 или 1.1 используйте PARTITION BY PartitionId во всех шагах запроса. Можно выполнить несколько шагов, но все они должны быть секционированы одним ключом.
Большинство выходов, поддерживаемых в Stream Analytics, могут использовать преимущества секционирования. Если вы используете выходной тип, который не поддерживает секционирование, задание не тривиально параллельное. Для выходов Event Hubs убедитесь, что в столбце ключа раздела задан тот же ключ раздела, который использовался в запросе. Дополнительные сведения см. в разделе выходных данных.
Число секций входных данных должно совпадать с числом секций выходных данных. Выходные данные хранилища Blob-объектов могут поддерживать разделы и наследуют схему секционирования исходного запроса. При указании ключа секции для хранилища BLOB-объектов данные секционируются по заданному входному ключу секции, поэтому результат по-прежнему полностью параллелен. Примеры значений разделов, которые позволяют выполнять задачу с полностью параллельной обработкой:
- восемь секций входных данных в концентраторах событий и восемь секций выходных данных в концентраторах событий;
- восемь секций входных данных в концентраторах событий и выход в хранилище BLOB-объектов;
- восемь секций входных данных в концентраторах событий и выход в хранилище BLOB-объектов, с секционированием по пользовательскому полю с произвольной кратностью;
- Восемь разделов входных данных хранилища BLOB-объектов и выходные данные хранилища BLOB-объектов.
- восемь секций входных данных в хранилище BLOB-объектов и восемь секций выходных данных в концентраторах событий.
Далее рассмотрим примеры сценариев с усложненным параллелизмом.
Простой запрос
- Входные данные: концентратор событий с восемью секциями
- Выходные данные: концентратор событий с восемью разделами (для использования
PartitionIdнеобходимо задать столбец ключа раздела)
Запрос:
--Using compatibility level 1.2 or above
SELECT TollBoothId
FROM Input1
WHERE TollBoothId > 100
--Using compatibility level 1.0 or 1.1
SELECT TollBoothId
FROM Input1 PARTITION BY PartitionId
WHERE TollBoothId > 100
Этот запрос является простым фильтром. Поэтому вам не нужно беспокоиться о секционирование входных данных, отправляемых в концентратор событий. Обратите внимание, что для удовлетворения требованию 2 задания с уровнем совместимости, предшествующим 1.2, должны содержать предложение PARTITION BY PartitionId. Для выходных данных необходимо настроить выходные данные концентратора событий в задании, чтобы ключ секции был задан как PartitionId. Последняя проверка: число секций входных данных должно быть равно числу секций выходных данных.
Запрос с ключом группирования
- Входные данные: концентратор событий с восемью разделами.
- Выходные данные — хранилище объектов Blob.
Запрос:
--Using compatibility level 1.2 or above
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId
--Using compatibility level 1.0 or 1.1
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
Этот запрос содержит ключ группирования. Поэтому сгруппированные события нужно отправлять в одну секцию концентратора событий. Так как в этом примере вы группируете TollBoothID, убедитесь, что TollBoothID используется в качестве ключа раздела при отправке событий в Центры событий. Затем в Azure Stream Analytics можно использовать PARTITION BY PartitionId , чтобы наследовать от этой схемы секционирования и включить полную параллелизацию. Так как выходные данные являются хранилищем блобов, вам не нужно беспокоиться о настройке значения разделительного ключа, как по требованию #4.
Пример сценариев, которые неудобно параллелизуемы
В предыдущем разделе в статье рассматриваются некоторые неловко параллельные сценарии. В этом разделе вы узнаете о сценариях, которые не соответствуют всем требованиям, чтобы быть неловко параллельными.
Несоответствие числа разделов
- Входные данные: концентратор событий с восемью секциями
- Выходные данные: концентратор событий с 32 разделами
Если число разделов входа не совпадает с числом разделов выхода, топология не является топологией с естественным параллелизмом независимо от запроса. Однако вы по-прежнему можете получить некоторый уровень параллелизации.
Запрос с использованием несекционированных выходных данных
- Входные данные: концентратор событий с восемью секциями
- Выходные данные: Power BI
В настоящее время выходные данные Power BI не поддерживают секционирование. Таким образом этот сценарий не будет чрезвычайно параллельным.
Многоэтапный запрос с разными значениями параметра PARTITION BY
- Входные данные: концентратор событий с восемью разделами.
- Выходные данные: концентратор событий с восемью разделами.
- Уровень совместимости: 1.0 или 1.1
Запрос:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId, PartitionId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1 Partition By TollBoothId
GROUP BY TumblingWindow(minute, 3), TollBoothId
Как видите, на втором этапе в качестве ключа секционирования используется TollBoothId . Этот шаг отличается от первого шага и поэтому требует перестановки.
Многоэтапный запрос с разными значениями параметра PARTITION BY
- Входные данные: концентратор событий с восемью разделами (параметр "Столбец ключа раздела" не задан, используется значение по умолчанию "PartitionId")
- Выходные данные: восемь секций в концентраторе событий (параметр "Столбец ключа секции" должен иметь значение "TollBoothId").
- Уровень совместимости — 1.2 или выше
Запрос:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
Уровень совместимости 1.2 и выше обеспечивает параллельное выполнение запросов по умолчанию. Например, запрос из предыдущего раздела разделяется на секции, когда столбец TollBoothId указан в качестве входного ключа разбиения. Предложение PARTITION BY PartitionId не является обязательным.
Расчет максимального количества единиц потоковой передачи для задания
Общее количество единиц потоковой передачи, которое может использовать задание Stream Analytics, зависит от количества шагов в запросе, определенного для задания, и количества секций для каждого шага.
Шаги в запросе
Запрос может иметь один или несколько шагов. Каждый шаг — это вложенный запрос, определенный с помощью ключевого слова WITH. Запрос за рамками ключевого слова WITH (только один запрос) также учитывается в качестве шага (например, инструкция SELECT в следующем запросе).
Запрос:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute,3), TollBoothId
Этот запрос включает 2 шага.
Примечание.
Этот запрос будет описан далее в этой статье.
Разделение шага
Разделение шага требует наличия следующих условий.
- Источник входных данных должен быть секционирован.
- Инструкция SELECT запроса должна читаться из разделенного источника входных данных.
- Запрос внутри шага должен включать ключевое слово PARTITION BY.
Если запрос разделяется на секции, входные события обрабатываются и агрегируются в отдельные разделенные группы, а выходные события генерируются для каждой из групп. Если требуется объединённая агрегация, необходимо создать второй неразделённый шаг для агрегации.
Рассчитайте максимальное количество единиц потоковой передачи для задания
Все неразделенные шаги объединяются в одну единицу потоковой обработки (SU V2s) для задачи Stream Analytics. Кроме того, можно добавить один SU V2 для каждого раздела в секционированном шаге. В следующей таблице приведены некоторые примеры .
| Запрос | Максимальное количество SU для задания |
|---|---|
|
1 SU V2 |
|
16 SU V2 (1 * 16 секций) |
|
1 SU V2 |
|
4 SU V2s (3 для секционированных шагов + 1 для неразделённых шагов) |
Примеры масштабирования
Следующий запрос вычисляет количество машин, проходящих через пункт взимания платы с тремя пунктами за три минуты. Этот запрос можно масштабировать до одного SU V2.
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
Чтобы использовать дополнительные SUS для запроса, разделите как входной поток данных, так и запрос. Так как для секции потока данных задано значение 3, следующий измененный запрос можно масштабировать до 3 SU V2s:
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
При секционирования запроса входные события обрабатываются и агрегируются в отдельных группах секций. Запрос генерирует выходные события для каждой из групп. Секционирование может привести к непредвиденным результатам, если во входном потоке данных поле GROUP BY не является ключом секции. Для примера предположим, что поле TollBoothId в предыдущем запросе не является ключом секции в Input1. Результатом является то, что данные из TollBooth #1 могут распространяться по нескольким секциям.
Stream Analytics обрабатывает каждую секцию Input1 отдельно. В результате запрос создает несколько записей счета автомобилей для одного и того же пункта взимания платы в одном и том же скользящем окне. Если вы не можете изменить входной ключ секции, исправьте эту проблему, добавив шаг непартийного элемента для агрегирования значений между секциями, как показано в следующем примере:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
Этот запрос можно масштабировать до 4 единиц SU V2.
Примечание.
При объединении двух потоков убедитесь, что потоки распределяются по ключу разбиения столбца, который используется для создания соединений. Также убедитесь, что количество разделов в обоих потоках одинаковое.
Достижение более высокой пропускной способности в большом масштабе
Задание с усложненным параллелизмом является необходимым, но недостаточным для обеспечения высокой пропускной способности в большом масштабе. В каждой системе хранения и в соответствующем ей выходе Stream Analytics есть разные возможности, позволяющие добиться наилучшей пропускной способности записи. Как и в любом масштабируемом сценарии, некоторые проблемы требуют правильных конфигураций для решения. В этом разделе рассматриваются конфигурации для нескольких распространенных выходов и приведены примеры для обеспечения скорости обработки 1 тыс., 5 тыс. и 10 тыс. событий в секунду.
В следующих наблюдениях используется задание Stream Analytics с запросом без отслеживания состояния (сквозная передача), базовой пользовательской функцией JavaScript (UDF), которая записывает данные в Центры событий, Azure SQL или Azure Cosmos DB.
Event Hubs
| Скорость приема (событий в секунду) | Единицы потоковой передачи | Выходные ресурсы |
|---|---|---|
| 1 К | 1/3 | 2 ТУ |
| 5000 | 1 | 6 ТЕ |
| 10 тыс. | 2 | 10 единиц пропускной способности |
Решение на основе службы Центры событий позволяет линейно масштабировать единицы потоковой передачи (SU) и пропускную способность, что делает его наиболее эффективным и производительным способом для анализа данных и потоковой передачи данных из Stream Analytics. Вы можете масштабировать задания до 66 SU V2s, что соответствует обработке до 400 МБ/с или 38 трлн событий в день.
Azure SQL
| Скорость приема (событий в секунду) | Единицы потоковой передачи | Выходные ресурсы |
|---|---|---|
| 1 К | 2/3 | S3 |
| 5000 | 3 | P4 |
| 10 тыс. | 6 | P6 |
Azure SQL поддерживает параллельную запись, называемую наследованием секционирования, но по умолчанию она отключена. Однако включение наследуемого секционирования вместе с полностью параллельным запросом может быть недостаточно для достижения более высокой производительности. Пропускная способность записи SQL в значительной степени зависит от конфигурации базы данных и схемы таблицы. Более подробную информацию о параметрах, которые могут максимально увеличить пропускную способность записи, см. в статье о производительности вывода SQL. Как отмечается в статье о выходных данных Azure Stream Analytics в базу данных Azure SQL, это решение не масштабируется линейно как полностью параллельный конвейер за пределами 8 разделов и может потребоваться повторное разбиение перед выводом в SQL (см. INTO). Для поддержания высоких скоростей ввода-вывода и учета нагрузки от резервного копирования журналов, происходящего каждые несколько минут, необходимы SKU категории "Премиум".
Azure Cosmos DB
| Скорость приема (событий в секунду) | Единицы потоковой передачи | Выходные ресурсы |
|---|---|---|
| 1 К | 2/3 | 20 К RU |
| 5000 | 4 | 60 тыс. руб. |
| 10 тыс. | 8 | 120 K ЕЗ |
Azure Cosmos DB выходные данные Stream Analytics обновляются, чтобы использовать нативную интеграцию в уровень совместимости 1.2. Уровень совместимости 1.2 обеспечивает значительно более высокую пропускную способность и сокращает потребление ЕЗ по сравнению с уровнем 1.1, который является уровнем совместимости по умолчанию для новых заданий. Решение использует контейнеры Azure Cosmos DB, секционированные на /deviceId, и остальные решения настроены одинаково.
Во всех примерах потоковой передачи в большом масштабе для Azure в качестве входного ресурса используется Центр событий, в который поступает нагрузка от эмулированных тестовых клиентов. Каждое входное событие — это документ JSON размером 1 КБ, который легко преобразует настроенные частоты приема в пропускную способность (1 МБ/с, 5 МБ/с и 10 МБ/с). События имитируют устройство Интернета вещей, отправляющее следующие данные JSON (в сокращенной форме) для до 1000 устройств:
{
"eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
"complexData": {
"moreData0": 51.3068118685458,
"moreData22": 45.34076957651598
},
"value": 49.02278128887753,
"deviceId": "contoso://device-id-1554",
"type": "CO2",
"createdAt": "2019-05-16T17:16:40.000003Z"
}
Примечание.
Поскольку в решениях могут использоваться различные компоненты, конфигурации могут быть изменены. Чтобы получить более точную оценку, настройте примеры в соответствии с имеющимся сценарием.
Определение узких мест
Для выявления узких мест в конвейере можно воспользоваться панелью "Метрики" в задании Azure Stream Analytics. Просмотрите параметры События ввода и вывода с данными о пропускной способности и Предельная задержка или Отложенные события, чтобы узнать, выполняется ли задание в соответствии с входной скоростью. Чтобы просмотреть метрики Центров событий, найдите раздел Ограниченные запросы и соответствующим образом измените пороговые единицы. Просмотрите максимальную потребляемую скорость (RU/с) на диапазон ключей секции в разделе "Пропускная способность" для обеспечения равномерного распределения нагрузки по всем диапазонам ключей секции. Для Azure SQL DB отслеживайте логический ввод-вывод и ЦП.
Получить помощь
Дополнительные сведения см. на странице вопросов Microsoft Q&A для Azure Stream Analytics.