Data flow — это путь, который данные принимают из источника в место назначения с необязательными преобразованиями. Вы можете настроить поток данных, создав настраиваемый ресурс Data flow или используя веб-интерфейс рабочего процесса. Поток данных состоит из трех частей: источник, трансформация и назначение.
flowchart LR
subgraph Source
A[DataflowEndpoint]
end
subgraph BuiltInTransformation
direction LR
Datasets - -> Filter
Filter - -> Map
end
subgraph Destination
B[DataflowEndpoint]
end
Source - -> BuiltInTransformation
BuiltInTransformation - -> Destination
Чтобы определить источник и назначение, необходимо настроить конечные точки потока данных. Преобразование является необязательным и может включать такие операции, как обогащение данных, фильтрация данных и сопоставление данных с другим полем.
Вы можете использовать опыт работы с операциями в Azure IoT Operations для создания потока данных. Опыт работы с операциями предоставляет визуальный интерфейс для настройки потока данных. Вы также можете использовать Bicep для создания потока данных с помощью Bicep-файла или Kubernetes для создания потока данных с помощью YAML-файла.
Продолжайте чтение, чтобы узнать, как настроить источник, преобразование и назначение.
Предварительные условия
Потоки данных можно развернуть, как только у вас есть экземпляр Azure IoT Operations, используя профиль потока данных и конечную точку по умолчанию. Однако может потребоваться настроить профили потоков данных и конечные точки, чтобы кастомизировать поток данных.
профиль потока данных
Если для потоков данных не нужны разные параметры масштабирования, используйте профиль default data flow предоставленный Azure IoT Operations. Избегайте связывания слишком большого количества потоков данных с одним профилем потока данных. Если у вас большое количество потоков данных, распределите их по нескольким профилям потоков данных, чтобы снизить риск превышения размера конфигурации профиля потока данных, который составляет 70.
Сведения о том, как настроить новый профиль потока данных, см. в разделе Configure data flow profiles.
конечные точки потока данных
Для настройки источника и назначения потока данных необходимы конечные точки потока данных. Чтобы быстро начать, используйте конечную точку передачи данных по умолчанию для локального MQTT брокера. Вы также можете создавать другие типы конечных точек data flow, таких как Kafka, Центры событий, OpenTelemetry или Azure Data Lake Storage. Дополнительные сведения см. в разделе Настройка конечных точек потоков данных.
Начать
Когда у вас есть предварительные условия, можно начать создание потока данных.
Чтобы создать поток данных в интерфейсе операций, выберите Поток данных>Создать поток данных.
Выберите плейсхолдер с именем new-data-flow, чтобы задать свойства потока данных. Введите имя потока данных и выберите профиль потока данных для использования. По умолчанию выбран профиль потока данных. Дополнительные сведения о профилях data flow см. в разделе Configure data flow profile.
Внимание
При создании потока данных можно выбрать только профиль потока данных. После создания потока данных невозможно изменить профиль потока данных.
Если вы хотите изменить профиль существующего потока данных, удалите исходный поток данных и создайте новый с новым профилем потока данных.
Настройте исходную, преобразованную и конечную точки подключения для потока данных, выбрав элементы на схеме потока данных.
Используйте команду az iot ops dataflow apply для создания или изменения потока данных.
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
Параметр --config-file — это путь и имя файла файла конфигурации JSON, содержащего свойства ресурса.
В этом примере предположим, что файл data-flow.json конфигурации с следующим содержимым хранится в домашнем каталоге пользователя:
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
// See source configuration section
}
},
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
// See transformation configuration section
}
},
{
"operationType": "Destination",
"destinationSettings": {
// See destination configuration section
}
}
]
}
Ниже приведен пример команды для создания или обновления потока данных с помощью профиля потока данных по умолчанию.
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
Создайте файл Bicep .bicep, чтобы начать создавать поток данных. В этом примере показана структура потока данных, содержащая конфигурации источника, преобразования и назначения.
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// See source configuration section
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// See transformation configuration section
}
}
{
operationType: 'Destination'
destinationSettings: {
// See destination configuration section
}
}
]
}
}
Внимание
Использование манифестов развертывания Kubernetes не поддерживается в рабочих средах и должно использоваться только для отладки и тестирования.
Создайте манифест Kubernetes .yaml, чтобы начать создание потока данных. В этом примере показана структура потока данных, содержащая конфигурации источника, преобразования и назначения.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: <DATAFLOW_NAME>
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
# This field is required when configuring via Kubernetes YAML
# The syntax is different when using Bicep
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# See source configuration section
# Transformation optional
- operationType: BuiltInTransformation
builtInTransformationSettings:
# See transformation configuration section
- operationType: Destination
destinationSettings:
# See destination configuration section
Изучите следующие разделы, чтобы узнать, как настроить типы операций потоков данных.
Источник
Настройте исходную конечную точку и источники данных (разделы) для потока данных. Вы можете использовать брокер MQTT по умолчанию, ресурс или пользовательскую конечную точку MQTT или Kafka в качестве источника.
Полные сведения о конфигурации, включая подстановочные знаки раздела MQTT, общие подписки, разделы Kafka и исходную схему, см. в разделе "Настройка источника потока данных".
Если в качестве источника не используется конечная точка по умолчанию, ее необходимо использовать в качестве назначения. Дополнительные сведения об использовании локальной конечной точки брокера MQTT см. в разделе "Потоки данных" должны использовать локальную конечную точку брокера MQTT.
Запрос сохраняемости диска
Сохраняемость диска сохраняет состояние обработки потока данных во время перезапуска. Дополнительные сведения о конфигурации см. в разделе "Настройка сохраняемости диска".
Операция преобразования заключается в преобразовании данных из источника перед отправкой его в место назначения. Преобразования являются необязательными. Если вам не нужно вносить изменения в данные, не включите операцию преобразования в конфигурацию data flow. Несколько преобразований связываются друг с другом на этапе выполнения, независимо от порядка их указания в конфигурации. Порядок этапов всегда:
-
Обогащение: добавьте дополнительные данные в исходные данные с заданным набором данных и условием для сопоставления.
-
Фильтр. Фильтрация данных на основе условия.
-
Сопоставление, вычисление, переименование или добавление нового свойства: перемещение данных из одного поля в другое с необязательным преобразованием.
В этом разделе представлено введение в преобразования потоков данных. Дополнительные сведения см. в разделе "Сопоставление данных с помощью потоков данных" и "Обогащение данных" с помощью потоков данных.
В интерфейсе операций выберите Data flow>Add transform (необязательно).
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
// See section on enriching data
],
"filter": [
// See section on filtering data
],
"map": [
// See section on mapping data
]
}
}
builtInTransformationSettings: {
datasets: [
// See section on enriching data
]
filter: [
// See section on filtering data
]
map: [
// See section on mapping data
]
}
Внимание
Использование манифестов развертывания Kubernetes не поддерживается в рабочих средах и должно использоваться только для отладки и тестирования.
builtInTransformationSettings:
datasets:
# See section on enriching data
filter:
# See section on filtering data
map:
# See section on mapping data
Обогащение: добавление ссылочных данных
Чтобы обогатить данные, сначала добавьте ссылочный набор данных в хранилище Azure IoT Operations state. Набор данных добавляет дополнительные данные в исходные данные на основе условия. Условие указывается в качестве поля в исходных данных, которые соответствуют полю в наборе данных.
Примеры данных можно загрузить в хранилище состояний с помощью state store CLI. Имена ключей в хранилище состояний соответствуют набору данных в конфигурации data flow.
В настоящее время этап обогащения не поддерживается в интерфейсе операций.
Чтобы обогатить данные, используйте свойство builtInTransformationSettings в конфигурации data flow.
datasets Используйте свойство, чтобы указать наборы данных для обогащения.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
{
"key": "<DATASET_KEY>",
"inputs": [
"$source.<SOURCE_FIELD>" // ---------------- $1
"$context(<DATASET_KEY>).<DATASET_FIELD>" // - $2
],
"expression": "$1 == $2"
}
]
}
}
В этом примере показано, как использовать deviceId поле в исходных данных для сопоставления asset поля в наборе данных:
builtInTransformationSettings: {
datasets: [
{
key: 'assetDataset'
inputs: [
'$source.deviceId' // ---------------- $1
'$context(assetDataset).asset' // ---- $2
]
expression: '$1 == $2'
}
]
}
Внимание
Использование манифестов развертывания Kubernetes не поддерживается в рабочих средах и должно использоваться только для отладки и тестирования.
Например, можно использовать deviceId поле в исходных данных для сопоставления asset поля в наборе данных:
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
Если набор данных содержит запись с asset полем, аналогично:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
Данные из источника, в котором поле deviceId соответствует thermostat1, имеют поля location и manufacturer, доступные на этапах фильтрации и отображения.
Дополнительные сведения о синтаксисе условия см. в разделе "Обогащение данных с помощью потоков данных"
Фильтр: фильтрация данных на основе условия
Используйте этап фильтра, чтобы удалить сообщения, которые не соответствуют условию. Можно определить несколько правил фильтра с полями ввода и логическими выражениями.
Полные сведения о конфигурации и примеры см. в разделе "Фильтрация данных" в потоке данных.
Карта: перемещение данных из одного поля в другое
Чтобы сопоставить данные с другим полем, при необходимости преобразовать их, используйте операцию map. Укажите преобразование в виде формулы, которая использует поля в исходных данных.
В интерфейсе операций можно сопоставить данные с помощью преобразований Compute, переименования и нового свойства.
Вычислить
Используйте преобразование вычислений для применения формулы к исходным данным. Эта операция применяет формулу к исходным данным и сохраняет результат в поле.
В разделе «Преобразование» (необязательно) выберите «Вычисления»>«Добавить».
Введите необходимые параметры.
| Настройка |
Описание |
| Выбор формулы |
Выберите существующую формулу из раскрывающегося списка или выберите "Пользователь" , чтобы ввести формулу вручную. |
| Выходные данные |
Укажите имя для отображения результата. |
| Формула |
Введите формулу, применяемую к исходным данным. |
| Описание |
Укажите описание преобразования. |
| Последнее известное значение |
При необходимости используйте последнее известное значение, если текущее значение недоступно. |
Введите или измените формулу в поле "Формула ". Формула может использовать поля в исходных данных. Введите @ или нажмите клавиши CTRL + ПРОБЕЛ, чтобы выбрать точки данных из выпадающего списка. Для встроенных формул выберите <dataflow> плейсхолдер, чтобы просмотреть список доступных точек информации.
Введите свойства метаданных MQTT с помощью формата @$metadata.user_properties.<property> или @$metadata.topic. Введите заголовки $metadata с помощью формата @$metadata.<header>. Синтаксис $metadata необходим только для свойств MQTT, входящих в заголовок сообщения. Для получения дополнительной информации см. справочную информацию о полях.
Формула может использовать поля в исходных данных. Например, можно использовать temperature поле в исходных данных для преобразования температуры в Цельсию и хранения его в temperatureCelsius поле вывода.
Выберите Применить.
Переименовать
Используйте преобразование "Переименовать ", чтобы переименовать точку данных. Эта операция переименовывает точку данных в исходных данных в новое имя. Используйте новое имя на последующих этапах потока данных.
В разделе "Преобразование( необязательно)" выберите "Переименовать>добавить".
Введите необходимые параметры.
| Настройка |
Описание |
| Точка данных |
Выберите точку данных из раскрывающегося списка или введите заголовок $metadata. |
| Новое имя точки данных |
Введите новое имя точки данных. |
| Описание |
Укажите описание преобразования. |
Введите свойства метаданных MQTT с помощью формата @$metadata.user_properties.<property> или @$metadata.topic. Введите заголовки $metadata с помощью формата @$metadata.<header>. Синтаксис $metadata необходим только для свойств MQTT, входящих в заголовок сообщения. Для получения дополнительной информации см. справочную информацию о полях.
Выберите Применить.
Новое свойство
Используйте преобразование нового свойства , чтобы добавить новое свойство в исходные данные. Эта операция добавляет новое свойство в исходные данные. Используйте новое свойство на последующих этапах потока данных.
В разделе "Преобразование" (необязательно) выберите "Добавить новое свойство>".
Введите необходимые параметры.
| Настройка |
Описание |
| Ключ свойства |
Введите ключ для нового свойства. |
| Значение свойства |
Введите значение нового свойства. |
| Описание |
Укажите описание нового свойства. |
Выберите Применить.
Например, можно использовать temperature поле в исходных данных для преобразования температуры в Цельсию и хранения его в temperatureCelsius поле. Обогатите исходные данные с полем location из набора данных контекстуализации.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"$source.temperature ? $last" // ---------------- $1
],
"output": "temperatureCelsius",
"expression": "($1 - 32) * 5/9"
},
{
"inputs": [
"$context(assetDataset).location" // - $2
],
"output": "location"
}
]
}
}
Вы можете получить доступ к свойствам метаданных MQTT, используя формат $metadata.user_properties.<property> или $metadata.topic. Вы также можете ввести заголовки $metadata с помощью формата $metadata.<header>. Для получения дополнительной информации см. справочную информацию о полях.
Например, можно использовать temperature поле в исходных данных для преобразования температуры в Цельсию и хранения его в temperatureCelsius поле. Обогатите исходные данные с полем location из набора данных контекстуализации.
builtInTransformationSettings: {
map: [
{
inputs: [
'temperature'
]
output: 'temperatureCelsius'
expression: '($1 - 32) * 5/9'
}
{
inputs: [
'$context(assetDataset).location'
]
output: 'location'
}
]
}
Внимание
Использование манифестов развертывания Kubernetes не поддерживается в рабочих средах и должно использоваться только для отладки и тестирования.
Вы можете получить доступ к свойствам метаданных MQTT, используя формат $metadata.user_properties.<property> или $metadata.topic. Вы также можете ввести заголовки $metadata с помощью формата $metadata.<header>. Для получения дополнительной информации см. справочную информацию о полях.
Например, можно использовать temperature поле в исходных данных для преобразования температуры в Цельсию и хранения его в temperatureCelsius поле. Обогатите исходные данные с полем location из набора данных контекстуализации.
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
expression: "($1 - 32) * 5/9"
output: temperatureCelsius
- inputs:
- $context(assetDataset).location
output: location
Дополнительные сведения см. в разделе "Сопоставление данных" с помощью потоков данных.
Удалить
По умолчанию схема вывода включает все точки данных. Удалите любую точку данных из назначения с помощью преобразования Remove .
В разделе "Преобразование( необязательно)" нажмите кнопку "Удалить".
Выберите точку данных, чтобы удалить из выходной схемы.
Выберите Применить.
Чтобы удалить точку данных из выходной схемы, используйте свойство builtInTransformationSettings в конфигурации data flow.
map Используйте свойство, чтобы указать точки данных для удаления.
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"*"
],
"output": "*"
},
{
"inputs": [
"weight"
],
"output": ""
}
{
"inputs": [
"weight.SourceTimestamp"
],
"output": ""
},
{
"inputs": [
"weight.Value"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Code"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Symbol"
],
"output": ""
}
]
}
}
builtInTransformationSettings: {
map: [
{
inputs: [
'*'
]
output: '*'
}
{
inputs: [
'weight'
]
output: ''
}
{
inputs: [
'weight.SourceTimestamp'
]
output: ''
}
{
inputs: [
'weight.Value'
]
output: ''
}
{
inputs: [
'weight.StatusCode'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Code'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Symbol'
]
output: ''
}
]
}
Внимание
Использование манифестов развертывания Kubernetes не поддерживается в рабочих средах и должно использоваться только для отладки и тестирования.
builtInTransformationSettings:
map:
- type: PassThrough
inputs:
- "*"
output: "*"
- inputs:
- weight
output: ""
- inputs:
- weight.SourceTimestamp
output: ""
- inputs:
- weight.Value
output: ""
- inputs:
- weight.StatusCode
output: ""
- inputs:
- weight.StatusCode.Code
output: ""
- inputs:
- weight.StatusCode.Symbol
output: ""
Дополнительные сведения см. в разделе "Сопоставление данных" с помощью потоков данных.
Сериализация данных в соответствии со схемой
Если необходимо сериализовать данные перед отправкой в место назначения, укажите схему и формат сериализации. Дополнительные сведения см. в разделе Сериализация выходных данных с помощью схемы.
Назначение
Настройте целевую конечную точку и назначение данных (раздел, контейнер или таблицу) для потока данных. В качестве назначения можно использовать любой поддерживаемый тип конечной точки, включая MQTT, Kafka, Azure Data Lake Storage, Microsoft Fabric, Azure Data Explorer и локальное хранилище.
Полные сведения о конфигурации, включая таблицу назначения данных, разделы динамического назначения и сериализацию выходных данных, см. в разделе "Настройка назначения потока данных".
Чтобы отправить данные в место назначения, отличное от локального брокера MQTT, создайте конечную точку потока данных. Дополнительные сведения см. в разделе Настройка конечных точек потока данных.
Внимание
Конечные точки Storage требуют схему для сериализации. Чтобы использовать поток данных с Microsoft Fabric OneLake, Azure Data Lake Storage, Azure Data Explorer или локального хранилища, необходимо указать ссылку на схему.
Пример
В следующем примере показана конфигурация потока данных, которая использует конечную точку MQTT для источника и назначения. Источник фильтрует данные из раздела MQTT azure-iot-operations/data/thermostat. Преобразование переводит температуру в Фаренгейт и фильтрует данные, где произведение температуры на влажность меньше 100000. Назначение отправляет данные в топик MQTT factory.
Используйте команду az iot ops dataflow apply для создания или изменения потока данных.
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
Параметр --config-file — это путь и имя файла файла конфигурации JSON, содержащего свойства ресурса.
В этом примере предположим, что файл data-flow.json конфигурации с следующим содержимым хранится в домашнем каталоге пользователя:
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
],
"endpointRef": "default",
"serializationFormat": "Json"
}
},
{
"builtInTransformationSettings": {
"datasets": [],
"filter": [
{
"expression": "$1 * $2 < 100000",
"inputs": [
"temperature.Value",
"\"Tag 10\".Value"
],
"type": "Filter"
}
],
"map": [
{
"inputs": [
"*"
],
"output": "*",
"type": "PassThrough"
},
{
"expression": "fToC($1)",
"inputs": [
"Temperature.Value"
],
"output": "TemperatureF",
"type": "Compute"
},
{
"inputs": [
"@\"Tag 10\".Value"
],
"output": "Humidity",
"type": "Rename"
}
],
"serializationFormat": "Json"
},
"operationType": "BuiltInTransformation"
},
{
"destinationSettings": {
"dataDestination": "factory",
"endpointRef": "default"
},
"operationType": "Destination"
}
]
}
Ниже приведен пример команды для создания или обновления потока данных с помощью профиля потока данных по умолчанию.
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
Ниже приведен еще один пример использования динамического перевода тем для маршрутизации сообщений из различных термостатов в разделы, относящиеся к устройству:
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"thermostats/+/sensor/temperature"
],
"endpointRef": "default",
"serializationFormat": "Json"
}
},
{
"destinationSettings": {
"dataDestination": "processed/device/${inputTopic.2}/temperature",
"endpointRef": "default"
},
"operationType": "Destination"
}
]
}
Эта конфигурация обрабатывает сообщения из thermostats/device1/sensor/temperature и отправляет их в processed/device/device1/temperature.
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
// Pointer to the default data flow endpoint
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// Use the default MQTT endpoint as the source
endpointRef: defaultDataflowEndpoint.name
// Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources: [
'azure-iot-operations/data/thermostat'
]
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// Filter the data where temperature * "Tag 10" < 100000
filter: [
{
inputs: [
'temperature.Value'
'"Tag 10".Value'
]
expression: '$1 * $2 < 100000'
}
]
map: [
// Passthrough all values by default
{
inputs: [
'*'
]
output: '*'
}
// Convert temperature to Fahrenheit and output it to TemperatureF
{
inputs: [
'temperature.Value'
]
output: 'TemperatureF'
expression: 'cToF($1)'
}
// Extract the "Tag 10" value and output it to Humidity
{
inputs: [
'"Tag 10".Value'
]
output: 'Humidity'
}
]
}
}
{
operationType: 'Destination'
destinationSettings: {
// Use the default MQTT endpoint as the destination
endpointRef: defaultDataflowEndpoint.name
// Send the data to the MQTT topic factory
dataDestination: 'factory'
}
}
]
}
}
Внимание
Использование манифестов развертывания Kubernetes не поддерживается в рабочих средах и должно использоваться только для отладки и тестирования.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# Use the default MQTT endpoint as the source
endpointRef: default
# Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources:
- azure-iot-operations/data/thermostat
# Transformation optional
- operationType: builtInTransformation
builtInTransformationSettings:
# Filter the data where temperature * "Tag 10" < 100000
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: '$1 * $2 < 100000'
map:
# Passthrough all values by default
- inputs:
- '*'
output: '*'
# Convert temperature to Fahrenheit and output it to TemperatureF
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
# Extract the "Tag 10" value and output it to Humidity
- inputs:
- '"Tag 10".Value'
output: 'Humidity'
- operationType: Destination
destinationSettings:
# Use the default MQTT endpoint as the destination
endpointRef: default
# Send the data to the MQTT topic factory
dataDestination: factory
Дополнительные примеры конфигураций data flow см. в статье Azure REST API — Data flow и quickstart Bicep.
Убедитесь, что поток данных работает
Чтобы убедиться, что поток данных работает, ознакомьтесь с учебным пособием Мост двунаправленного MQTT для Azure Event Grid.
Экспорт конфигурации потока данных
Чтобы экспортировать конфигурацию потока данных, используйте консоль управления или экспортируйте настраиваемый ресурс потока данных.
Выберите поток данных, который вы хотите экспортировать, а затем выберите Export на панели инструментов.
Используйте команду az iot ops dataflow show для экспорта data flow.
az iot ops dataflow show --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <DataflowName> --profile <DataflowProfileName> --output json > my-dataflow.json
Ниже приведен пример команды для экспорта data flow с именем data-flow в JSON-файл с именем data-flow.json:
az iot ops dataflow show --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --output json > data-flow.json
Bicep — это инфраструктура в виде кода, и экспорт не требуется. Используйте файл Bicep для создания потока данных для быстрой настройки и конфигурирования потоков данных.
Внимание
Использование манифестов развертывания Kubernetes не поддерживается в рабочих средах и должно использоваться только для отладки и тестирования.
kubectl get dataflow my-dataflow -o yaml > my-dataflow.yaml
Правильная конфигурация потока данных
Чтобы убедиться, что поток данных работает как ожидается, проверьте следующие условия:
- Конечная точка потока данных MQTT по умолчанию должна использоваться как источник или назначение.
- Профиль потока данных существует и упоминается в конфигурации потока данных.
- Источник — это конечная точка MQTT, конечная точка Kafka или ресурс. Конечные точки типа хранилища нельзя использовать как источник.
- При использовании Event Grid в качестве источника, установите количество экземпляров профиля потоков данных равным 1, так как MQTT-брокер Event Grid не поддерживает общие подписки.
- При использовании Центров событий в качестве источника каждый концентратор событий в пространстве имен является отдельным разделом Kafka, и необходимо указать каждый из них в качестве источника данных.
- Если преобразование используется, оно настроено с правильным синтаксисом, включая надлежащее экранирование специальных символов.
- При использовании конечных точек типа storage в качестве назначения указывается schema.
- При использовании динамических топиков для конечных точек MQTT убедитесь, что переменные топика ссылаются на действительные сегменты.
Следующие шаги