Настройка источника потока данных в операциях Интернета вещей Azure

Источник заключается в том, где данные входят в поток данных или граф потока данных. Вы настраиваете источник, указав ссылку на конечную точку и список источников данных (разделов) для этой конечной точки.

Подсказка

Один источник потока данных может подписываться на несколько разделов MQTT или Kafka одновременно. Вам не нужно создавать отдельные потоки данных для каждого раздела. dataSources Используйте поле (или Темы)>Добавить строку в интерфейсе операций, чтобы добавить несколько фильтров тем, включая подстановочные знаки. Дополнительные сведения см. в разделе "Подписка на несколько разделов".

Эта страница относится как к потокам данных , так и к графам потока данных. Для потоков данных источник является операцией в ресурсе Dataflow . Для графов потока данных источник является Source узлом в ресурсе DataflowGraph .

Это важно

Потоки данных поддерживают исходные конечные точки MQTT и Kafka. Графы потока данных поддерживают конечные точки источника MQTT, Kafka и OpenTelemetry. Каждый поток данных должен иметь локальную конечную точку брокера MQTT в Azure IoT Operations по умолчанию в качестве источника или назначения. Дополнительные сведения см. в статье "Потоки данных" должны использовать локальную конечную точку брокера MQTT.

В качестве источника можно использовать один из следующих вариантов.

Использование конечной точки по умолчанию

  1. В разделе "Исходные сведения" выберите конечную точку потока данных.

    Скриншот интерфейса для управления операциями, показывающий выбор брокера сообщений в качестве исходной конечной точки для потока данных.

  2. Введите следующие параметры для источника брокера сообщений:

    Setting Описание
    Конечная точка потока данных Выберите по умолчанию, чтобы использовать стандартную конечную точку брокера сообщений MQTT.
    Тема Фильтр раздела для подписки на входящие сообщения. Используйте Тему(ы)>Добавить строку, чтобы добавить несколько тем. Дополнительные сведения о разделах см. в разделе "Подписка на несколько разделов".
    Схема сообщений Схема, используемая для десериализации входящих сообщений. См. раздел "Указание схемы для десериализации данных".
  3. Нажмите кнопку "Применить".

Так как dataSources принимает разделы MQTT или Kafka без изменения конфигурации конечной точки, можно повторно использовать конечную точку для нескольких потоков данных, даже если разделы отличаются. Дополнительные сведения см. в разделе "Подписка на несколько разделов".

Использование ресурса в качестве источника

Вы можете использовать asset в качестве источника для потока данных. Вы можете использовать ресурс в качестве источника только в оперативной среде.

  1. В разделе "Исходные сведения" выберите "Ресурс".

  2. Выберите ресурс, который вы хотите использовать в качестве исходной конечной точки.

  3. Нажмите кнопку "Продолжить".

    Отображается список точек данных для выбранного ресурса.

    Снимок экрана: использование опыта работы с операциями для выбора актива в качестве конечной точки.

  4. Выберите "Применить" , чтобы использовать ресурс в качестве исходной конечной точки.

При использовании ресурса в качестве источника определение ресурса предоставляет схему потока данных. Определение ресурса включает схему для точек данных ресурса. Дополнительные сведения см. в статье "Удаленное управление конфигурациями активов".

Как только вы настроите источник, данные из данного актива поступают в поток данных через локальный брокер MQTT. Поэтому, когда вы используете ресурс в качестве источника, поток данных использует локальную конечную точку брокера MQTT по умолчанию в качестве источника.

Используйте собственную конечную точку MQTT или Kafka

Если вы создали пользовательскую конечную точку потока данных MQTT или Kafka (например, для использования с сеткой событий или Центрами событий), ее можно использовать в качестве источника потока данных. Помните, что конечные точки типа хранилища, такие как Data Lake или Fabric OneLake, нельзя использовать в качестве источника.

  1. В разделе "Исходные сведения" выберите конечную точку потока данных.

    Снимок экрана: использование опыта работы с операциями для выбора пользовательского брокера сообщений в качестве исходной конечной точки.

  2. Введите следующие параметры для источника брокера сообщений:

    Setting Описание
    Конечная точка потока данных Нажмите кнопку Reselect, чтобы выбрать пользовательскую конечную точку MQTT или Kafka data flow. Дополнительные сведения см. в статьях Настройка конечных точек для потоков данных MQTT или Настройка конечных точек для потоков данных Azure Event Hubs и Kafka.
    Тема Фильтр раздела для подписки на входящие сообщения. Используйте Тему(ы)>Добавить строку, чтобы добавить несколько тем. Дополнительные сведения о разделах см. в разделе "Подписка на несколько разделов".
    Схема сообщений Схема, используемая для десериализации входящих сообщений. См. раздел "Указание схемы для десериализации данных".
  3. Нажмите кнопку "Применить".

Подписка на несколько разделов

Можно указать несколько разделов MQTT или Kafka в источнике, не изменив конфигурацию конечной точки потока данных. Эта гибкость означает, что вы можете повторно использовать одну конечную точку в нескольких потоках данных, даже если разделы различаются. Дополнительные сведения см. в разделе Повторное использование конечных точек потока данных.

Подстановочные знаки раздела MQTT

Если источником является конечная точка MQTT (включенная сетка событий), используйте фильтр раздела MQTT для подписки на входящие сообщения. Фильтр разделов может включать подстановочные знаки для подписки на несколько разделов. Например, thermostats/+/sensor/temperature/# подписывается на все сообщения датчика температуры из термостатов. Чтобы настроить фильтры раздела MQTT, выполните следующие действия.

В сведениях о источнике потока данных для операций выберите конечную точку потока данных, а затем используйте поле "Разделы" для указания фильтров разделов MQTT для подписки на входящие сообщения. Чтобы добавить несколько разделов MQTT, выберите "Добавить строку " и введите новый раздел.

Снимок экрана: интерфейс интерфейса операций с несколькими фильтрами раздела MQTT, настроенными в исходных сведениях для потока данных.

Общие подписки

Чтобы использовать общие подписки с источниками сообщений брокера, укажите тему общей подписки в виде $shared/<GROUP_NAME>/<TOPIC_FILTER>.

В данных о потоке операций сведения об источнике, выберите конечную точку потока данных и используйте поле Тема, чтобы указать общую группу подписок и тему.

Если число экземпляров в профиле потока данных больше одного, общая подписка автоматически включается для всех потоков данных, использующих источник брокера сообщений. В этом случае добавляется префикс $shared, и автоматически создаётся имя общей группы подписок. Например, если у вас есть профиль потока данных с количеством экземпляров 3, и ваш поток данных использует конечную точку брокера сообщений в качестве источника, настроенную с топиками topic1 и topic2, они автоматически преобразуются в общие подписки как $shared/<GENERATED_GROUP_NAME>/topic1 и $shared/<GENERATED_GROUP_NAME>/topic2.

Вы можете явно создать раздел с именем $shared/mygroup/topic в конфигурации. Однако явное добавление $shared раздела не рекомендуется, так как $shared префикс автоматически добавляется при необходимости. Потоки данных могут выполнять оптимизацию с именем группы, если она не задана. Например, если $shared не задано, потоки данных должны работать только над именем раздела.

Это важно

Общие подписки важны для потоков данных, если число экземпляров больше одного и вы используете MQTT брокер Event Grid в качестве источника, так как он не поддерживает общие подписки. Чтобы избежать пропущенных сообщений, задайте для потока данных число экземпляров профиля равным единице при использовании брокера сетки событий MQTT в качестве источника. Это происходит, когда поток данных выступает в роли подписчика и получает сообщения из облака.

Темы Kafka

Если источником является конечная точка Kafka (включая Event Hubs), укажите отдельные топики Kafka, на которые нужно подписаться для получения входящих сообщений. Подстановочные знаки не поддерживаются, поэтому необходимо указать каждый раздел статически.

Замечание

При работе с узлами событий через конечную точку Kafka каждый отдельный узел событий в пространстве имен является темой Kafka. Например, если у вас есть пространство имен Центров событий с двумя концентраторами событий, thermostats и humidifiers, то можно указать каждый концентратор событий в качестве темы Kafka.

Чтобы настроить разделы Kafka, выполните следующие действия.

В сведениях об источнике потока данных для операций выберите конечную точку потока данных, а затем используйте поле Тема, чтобы указать фильтр темы Kafka для подписки на входящие сообщения.

Замечание

В интерфейсе операций можно указать только один фильтр тем. Чтобы использовать несколько фильтров разделов, используйте Bicep или Kubernetes.

Используйте исходную тему в пути назначения

При подписке на несколько разделов с подстановочными знаками можно использовать исходный раздел в качестве переменной в пути назначения. Эта функция работает как с потоками данных, так и с графами потока данных.

Используйте ${inputTopic} для полного исходного раздела или ${inputTopic.N} для извлечения определенного сегмента (1 индексированного). Например, если вы подписаны на factory/+/telemetry/#, сообщение, поступающее на factory/line1/telemetry/temp, можно перенаправить в целевой раздел, например processed/${inputTopic.2}/data, который разворачивается в processed/line1/data.

Для получения полной информации и примеров см. динамические темы назначения.

Указание исходной схемы

При использовании MQTT или Kafka в качестве источника можно указать схему для отображения списка точек данных в пользовательском веб-интерфейсе операций. Использование схемы для десериализации и проверки входящих сообщений в настоящее время не поддерживается.

Если источник является ресурсом, портал автоматически выводит схему из определения ресурса.

Подсказка

Чтобы создать схему на основе примера файла данных, используйте помощник генерации схемы Schema Gen Helper.

Чтобы настроить схему, используемую для десериализации входящих сообщений из источника:

В разделе "Сведения о источнике потока данных операций" выберите конечную точку потока данных и используйте поле схемы сообщения для указания схемы. Выберите "Отправить ", чтобы отправить файл схемы. Дополнительные сведения см. в разделе "Общие сведения о схемах сообщений".

Дополнительные сведения см. в разделе "Общие сведения о схемах сообщений".

Дальнейшие действия