События
Присоединение к вызову ИИ Навыков
8 апр., 15 - 28 мая, 07
Отточите свои навыки ИИ и введите подметки, чтобы выиграть бесплатный экзамен сертификации
Зарегистрируйтесь!Этот браузер больше не поддерживается.
Выполните обновление до Microsoft Edge, чтобы воспользоваться новейшими функциями, обновлениями для системы безопасности и технической поддержкой.
Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Stream Analytics имеет первоклассную интеграцию с потоками данных Azure в качестве входных данных из четырех видов ресурсов:
Эти ресурсы входных данных могут существовать в той же подписке Azure, что и задание Stream Analytics, или другой подписке.
Stream Analytics поддерживает сжатие для всех источников входных данных. Поддерживаемые типы сжатия: None, Gzip и Deflate. Поддержка сжатия недоступна для эталонных данных. Если входные данные в сжатом формате Avro, Stream Analytics обрабатывает их прозрачно. Вам не нужно указывать тип сжатия с сериализацией Avro.
С помощью портала Azure, Visual Studio и Visual Studio Code можно добавлять входные данные и просматривать или изменять их в задании потоковой передачи. Вы также можете протестировать входные подключения и тестовые запросы, используя примеры данных из портала Azure, Visual Studio и Visual Studio Code. При написании запроса входные данные указываются в предложении FROM. Список доступных входных данных можно получить на странице Запрос на портале. Если вы хотите использовать несколько входных данных, JOIN
их или составьте несколько SELECT
запросов.
Примечание
Настоятельно рекомендуется использовать средства Stream Analytics для Visual Studio Code для лучшей локальной разработки. У средств Stream Analytics для Visual Studio 2019 (версия 2.6.3000.0) имеются известные функциональные пробелы, и они не будут улучшаться в дальнейшем.
Центры событий Azure — это высокомасштабируемая система публикации и подписки для обработки событий. Концентратор событий может обрабатывать миллионы событий в секунду, позволяя вам обрабатывать и анализировать огромное количество данных, создаваемых подключенными устройствами и приложениями. Вместе центры событий и Stream Analytics могут предоставлять комплексное решение для аналитики в режиме реального времени. Центры событий позволяют передавать события в Azure в режиме реального времени, где задания Stream Analytics также обрабатывают эти события в режиме реального времени. Например, в Центры событий можно отправлять сведения о щелчках, показания датчиков или журналы сетевых событий. Затем можно создать задания Stream Analytics для использования центров событий для входных данных для фильтрации, агрегирования и корреляции в режиме реального времени.
EventEnqueuedUtcTime
— это метка времени поступления события в концентратор, которая также является меткой времени по умолчанию для событий, поступающих из Центров событий в Stream Analytics. Для обработки данных в виде потока с помощью метки времени в полезных данных события необходимо использовать ключевое слово TIMESTAMP BY.
Необходимо настроить входные данные каждого концентратора событий, чтобы иметь собственную группу потребителей. Если задание содержит самосоединение или несколько входных данных, некоторые из них могут быть прочитаны несколькими модулями в более поздних стадиях обработки. Эта ситуация влияет на количество читателей в одной потребительской группе. Чтобы не превысить лимит Центров событий в пять читателей на каждую группу потребителей для каждого раздела, рекомендуется назначить отдельную группу потребителей для каждой задачи Stream Analytics. Существует также ограничение в 20 групп потребителей для концентратора событий уровня "Стандартный". Дополнительные сведения см. в разделе об устранении неполадок входных данных Azure Stream Analytics.
В следующей таблице описываются все параметры на странице Новые входные данные на портале Azure для передачи потока входных данных из концентратора событий.
Свойство | Описание |
---|---|
Входной псевдоним | Понятное имя, используемое в запросах задания для ссылки на эти входные данные. |
Подписка | Выберите подписку Azure, в которой существует ресурс концентратора событий. |
Пространство имен концентратора событий | Пространство имен Центров событий — это контейнер для центров событий. При создании концентратора событий также создается пространство имен. |
Имя концентратора событий | Имя концентратора событий для использования в качестве источника входных данных. |
Группа потребителей Event Hub (рекомендуется) | Для каждого задания Stream Analytics рекомендуется использовать отдельную группу получателей. Эта строка идентифицирует группу потребителей, используемую для поглощения данных из концентратора событий. Если группа потребителей не указана, задание Stream Analytics использует группу потребителей $Default . |
Режим проверки подлинности | Укажите тип проверки подлинности, которую вы хотите использовать для подключения к концентратору событий. Для проверки подлинности в концентраторе событий можно использовать строки подключения или управляемое удостоверение. Для параметра управляемого удостоверения можно создать управляемое удостоверение, назначаемое системой, для задания Stream Analytics или управляемого удостоверения, назначаемого пользователем, для проверки подлинности в концентраторе событий. При использовании управляемого удостоверения это удостоверение должно быть членом ролей Azure Event Hubs Data Receiver или Azure Event Hubs Data Owner. |
Имя политики концентратора событий | Политика общего доступа, которая предоставляет доступ к Центрам событий. Каждой политике общего доступа присваивается имя, а также для нее задаются разрешения и ключи доступа. Этот параметр заполняется автоматически, если не выбран вариант указания параметров Центров событий вручную. |
Ключ раздела | Это необязательное поле, доступное только в том случае, если задание настроено на использование уровня совместимости 1.2 или более поздней версии. Если входные данные секционируются по свойству, вы можете добавить сюда имя этого свойства. Он используется для повышения производительности вашего запроса, если он включает PARTITION BY или предложение GROUP BY для этого свойства. Если это задание использует уровень совместимости 1.2 или выше, это поле по умолчанию используется PartitionId. |
Формат сериализации событий | Формат сериализации (JSON, CSV, Avro, Parquet или прочие (Protobuf, XML, проприетарный...)) входящего потока данных. Убедитесь, что формат JSON соответствует спецификации и не содержит начальный "0" в десятичных числах. |
Кодирование | Сейчас UTF-8 — единственный поддерживаемый формат кодировки. |
Тип сжатия событий | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
Реестр схем (предварительная версия) | Вы можете выбрать реестр схем с схемами для данных событий, полученных из концентратора событий. |
Когда данные поступают из входного потока Центров событий, у вас есть доступ к следующим полям метаданных в запросе Stream Analytics:
Имущество | Описание |
---|---|
EventProcessedUtcTime | Дата и время, когда Stream Analytics обрабатывает событие. |
EventEnqueuedUtcTime | Дата и время получения событий Центрами событий. |
PartitionId | Идентификатор раздела для входного адаптера (нумерация идет от нуля). |
Например, используя эти поля, можно писать запросы, как в следующем примере:
SELECT
EventProcessedUtcTime,
EventEnqueuedUtcTime,
PartitionId
FROM Input
Примечание
При использовании Центров событий в качестве конечной точки для маршрутов центра Интернета вещей, вы можете получить доступ к метаданным этого центра с помощью функции GetMetadataPropertyValue.
Центр Интернета вещей — это высокомасштабируемая служба приема данных о событиях публикации и подписки, оптимизированная под сценарии "Интернет вещей".
По умолчанию метка времени событий, поступающих из Центра Интернета вещей в Stream Analytics, — это метка времени поступления события в концентратор Центра Интернета вещей, то есть EventEnqueuedUtcTime
. Для обработки данных в потоке с помощью метки времени в составе полезной нагрузки события необходимо использовать ключевое слово TIMESTAMP BY.
Каждый вход в IoT Hub для Stream Analytics нужно настроить таким образом, чтобы у него была собственная группа потребителей. Если задание включает самосоединение или несколько источников входных данных, некоторые входные данные могут считываться несколькими модулями чтения, находящимися дальше по потоку. Эта ситуация влияет на количество читателей в одной группе потребителей. Чтобы избежать превышения лимита Azure IoT Hub на количество читателей (5 на каждую группу потребителей в разделе), рекомендуется назначить группу потребителей для каждого задания Stream Analytics.
В следующей таблице описываются все параметры на странице Новые входные данные на портале Azure при настройке Центра Интернета вещей в качестве потокового входа.
Свойство | Описание |
---|---|
Входной псевдоним | Удобное имя, используемое в запросах задания для обозначения этих входных данных. |
Подписка | Выберите подписку, в которой существуют ресурсы Центра Интернета вещей. |
Центр IoT | Имя Центра Интернета вещей для использования в качестве источника входных данных. |
Группа потребителей | Для каждого задания Stream Analytics рекомендуется использовать отдельную группу получателей. Группа потребителей используется для приема данных из IoT-хаба. Stream Analytics использует группу получателей "$Default", если не указано иное. |
Имя политики общего доступа | Политика общего доступа, которая предоставляет доступ к Центру Интернета вещей. Каждой политике общего доступа присваивается имя, а также для нее задаются разрешения и ключи доступа. |
Ключ политики общего доступа | Ключ общего доступа, используемый для авторизации доступа к Центру Интернета вещей. Этот параметр автоматически заполняется, если вы не выберете параметр для предоставления параметров Центра Интернета вещей вручную. |
Конечная точка | Конечная точка для Центра Интернета вещей. |
Ключ раздела | Это необязательное поле, доступное только в том случае, если задание настроено на использование уровня совместимости 1.2 или более поздней версии. Если входные данные секционируются по свойству, вы можете добавить сюда имя этого свойства. Он используется для повышения производительности запроса, если он включает оператор PARTITION BY или GROUP BY для этого атрибута. Если это задание использует уровень совместимости 1.2 или более поздней версии, это поле по умолчанию имеет значение PartitionId. |
Формат сериализации событий | Формат сериализации (JSON, CSV, Avro, Parquet или Другое (Protobuf, XML, собственный...)) входящего потока данных. Убедитесь, что формат JSON соответствует спецификации и не содержит начального "0" в десятичных числах. |
Кодирование | Сейчас UTF-8 — единственный поддерживаемый формат кодировки. |
Тип сжатия событий | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
При использовании данных потока из Центра Интернета вещей вы получаете доступ к следующим полям метаданных в вашем запросе Stream Analytics:
Свойство | Описание |
---|---|
EventProcessedUtcTime | Дата и время обработки события. |
EventEnqueuedUtcTime | Дата и время, когда Центр Интернета вещей (IoT Hub) получает событие. |
PartitionId | Идентификатор раздела с нулевой базой для входного адаптера. |
IoTHub.MessageId | Идентификатор, используемый для корреляции двустороннего обмена данными в Центре Интернета вещей. |
IoTHub.CorrelationId | Идентификатор, используемый в ответах на сообщение и отзывах в Центре Интернета вещей. |
IoTHub.ConnectionDeviceId | Идентификатор проверки подлинности, используемый для отправки этого сообщения. Это значение ставится на сообщения, привязанные к службе, Центром Интернета вещей. |
IoTHub.ConnectionDeviceGenerationId | Идентификатор поколения проверенного устройства, используемого для отправки этого сообщения. Это значение проставляется Центром Интернета вещей в сообщения, связанные с обслуживанием. |
IoTHub.EnqueuedTime | Время, когда Центр Интернета вещей получает сообщение. |
В сценариях с большим количеством неструктурированных данных для хранения в облаке хранилище BLOB-объектов Azure или Azure Data Lake Storage 2-го поколения предлагает экономичное и масштабируемое решение. Данные в хранилище блобов или Azure Data Lake Storage второго поколения считаются данными в состоянии покоя. Однако эти данные можно обрабатывать в виде потока данных Stream Analytics.
Обработка журналов — типичный сценарий для обработки входных данных такого рода с помощью Stream Analytics. В этом сценарии файлы данных телеметрии записываются из системы и должны быть проанализированы и обработаны для извлечения значимых данных.
Метка времени по умолчанию хранилища BLOB-объектов или события Azure Data Lake Storage 2-го поколения в Stream Analytics — это метка времени последнего изменения, т.е. BlobLastModifiedUtcTime
. Если объект blob загружается в учетной записи хранения в 13:00, а задание Azure Stream Analytics запускается с параметром Сейчас в 13:01, то этот объект blob не будет выбран, так как время его изменения не приходится на период выполнения задания.
Если объект blob загружается в контейнер учетной записи хранения в 13:00, а задание Azure Stream Analytics запускается с параметром Пользовательское время в 13:00 или раньше, то объект blob будет обработан, так как время его изменения приходится на период выполнения задания.
Если задание Azure Stream Analytics запущено с помощью Сейчас в 13:00 и большой двоичный объект отправляется в контейнер учетной записи хранения в 13:01, Azure Stream Analytics обнаруживает этот объект. Метка времени, назначенная каждому большому двоичному объекту, основана только на BlobLastModifiedTime
. Папка, в которой находится blob, не имеет отношения к присвоенной метке времени. Например, если BLOB-объект 2019/10-01/00/b1.txt
имеет значение BlobLastModifiedTime
, 2019-11-11
, тогда метка времени, назначенная этому BLOB-объекту, — 2019-11-11
.
Для обработки данных как потока с помощью метки времени в полезных данных события необходимо использовать ключевое слово TIMESTAMP BY. Задание Stream Analytics извлекает данные из хранилища BLOB-объектов Azure или входных данных Azure Data Lake Storage 2-го поколения каждую секунду, если файл BLOB-объекта доступен. Если файл блоба недоступен, применяется экспоненциальная задержка с максимальным интервалом в 90 секунд.
Примечание
Stream Analytics не поддерживает добавление содержимого в существующий файл BLOB. Stream Analytics просматривает каждый файл только один раз, и любые изменения, которые произойдут в нем после того, как задание прочитает данные, не обрабатываются. Мы рекомендуем отправлять все данные для файла большого двоичного объекта за один раз, а затем добавлять более новые события в другой новый файл большого двоичного объекта.
В сценариях, когда многие объекты BLOB постоянно добавляются, и Stream Analytics обрабатывает их по мере добавления, некоторые объекты BLOB могут быть пропущены в редких случаях из-за уровня детализации BlobLastModifiedTime
. Вы можете избежать этой проблемы, отправляя блобы с интервалом как минимум в две секунды. Если этот параметр не подходит, можно использовать центры событий для потоковой передачи больших объемов событий.
В следующей таблице описывается каждое свойство на странице Новые входные данные на портале Azure при настройке Blob-хранилища в качестве потокового ввода.
Свойство | Описание |
---|---|
Псевдоним ввода | Удобочитаемое имя, используемое в запросе задания для ссылки на эти входные данные. |
Подписка | Выберите подписку, в которой есть ресурс хранилища. |
Учетная запись хранения | Имя учетной записи хранения, где находятся blob-файлы. |
Ключ учетной записи хранения | Секретный ключ, связанный с учетной записью хранения. Этот параметр заполняется автоматически, если вы не выберете режим ручной настройки параметров. |
Контейнер | Контейнеры предоставляют логическую группировку для блобов. Чтобы создать новый контейнер, вы можете выбрать параметр Использование имеющихся (контейнеров) или Создать новый. |
Режим проверки подлинности | Укажите тип проверки подлинности, которую вы хотите использовать для подключения к учетной записи хранения. Для аутентификации с помощью учетной записи хранения можно использовать строку подключения или управляемое удостоверение. Для параметра управляемого удостоверения вы можете либо создать управляемое удостоверение, назначаемое системой, для задания Stream Analytics, либо управляемое удостоверение, назначаемое пользователем, для аутентификации с учетной записью хранения. При использовании управляемого удостоверения управляемое удостоверение должно быть членом соответствующей роли в учетной записи хранения. |
Шаблон пути (необязательно) | Путь к файлу, используемый для обнаружения объектов BLOB в указанном контейнере. Если вы хотите считывать объекты BLOB из корня контейнера, не устанавливайте шаблон пути. В пути можно указать один или более экземпляров следующих трех переменных: {date} , {time} или {partition} .Пример 1: cluster1/logs/{date}/{time}/{partition} Пример 2: cluster1/logs/{date} Символ * не является допустимым значением префикса пути. Разрешены только допустимые символы объектов Blob Azure. Не включайте имена контейнеров или имена файлов. |
Формат даты (необязательное свойство) | При использовании переменной даты в пути это формат даты, по которому упорядочены файлы. Пример: YYYY/MM/DD Если входные данные BLOB имеют {date} или {time} в пути, папки рассматриваются в хронологическом порядке. |
Формат времени (необязательное свойство) | Если вы используете временную переменную в пути, это формат времени, в котором организуются файлы. В настоящее время единственным поддерживаемым значением в течение нескольких часов является HH . |
Ключ раздела | Это необязательное поле, доступное только в том случае, если задание настроено на использование уровня совместимости 1.2 или более поздней версии. Если входные данные секционируются по свойству, вы можете добавить сюда имя этого свойства. Он используется для повышения производительности запроса, если он включает предложение PARTITION BY или GROUP BY для этого свойства. Если это задание использует уровень совместимости 1.2 или более поздней версии, это поле по умолчанию имеет значение PartitionId. |
Число входных разделов | Это поле доступно только если в шаблоне пути присутствует раздел {partition}. В качестве значения свойства допускаются целые числа >=1. Все появления {partition} в pathPattern будут заменяться числами от 0 до значения этого поля минус 1. |
Формат сериализации событий | Формат сериализации (JSON, CSV, Avro, Parquet или Другие (Protobuf, XML, собственная...)) входящего потока данных. Убедитесь, что формат JSON соответствует спецификации и не содержит ведущие нули в десятичных числах. |
Кодирование | В настоящее время единственным поддерживаемым форматом кодирования файлов CSV и JSON является UTF-8. |
Сжатие | Тип сжатия, используемый для чтения входящего потока данных, например Нет (по умолчанию), Gzip или Deflate. |
Когда ваши данные поступают из объектного хранилища, в вашем запросе Stream Analytics доступ к следующим полям метаданных:
Свойство | Описание |
---|---|
BlobName | Имя входного BLOB, из которого поступило событие. |
EventProcessedUtcTime | Дата и время, когда Stream Analytics обрабатывает событие. |
Время последнего изменения Blob в UTC | Дата и время последнего изменения объекта BLOB. |
PartitionId | Идентификатор раздела для входного адаптера с нулевой нумерацией. |
Например, используя эти поля, можно писать запросы, как в следующем примере:
SELECT
BlobName,
EventProcessedUtcTime,
BlobLastModifiedUtcTime
FROM Input
Azure Stream Analytics позволяет подключаться непосредственно к кластерам Apache Kafka для приема данных. Решение является низким кодом и полностью управляется командой Azure Stream Analytics в Корпорации Майкрософт, что позволяет ему соответствовать стандартам соответствия бизнес-требованиям. Входные данные Kafka являются обратно совместимыми и поддерживают все версии с последним выпуском клиента, начиная с версии 0.10. Пользователи могут подключаться к кластерам Kafka в виртуальной сети и кластерах Kafka с общедоступной конечной точкой в зависимости от конфигураций. Конфигурация зависит от существующих соглашений о конфигурации Kafka. Поддерживаемые типы сжатия: None, Gzip, Snappy, LZ4 и Zstd.
Дополнительные сведения см. в статье "Потоковая передача данных из Kafka" в Azure Stream Analytics (предварительная версия).
События
Присоединение к вызову ИИ Навыков
8 апр., 15 - 28 мая, 07
Отточите свои навыки ИИ и введите подметки, чтобы выиграть бесплатный экзамен сертификации
Зарегистрируйтесь!