Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Внимание
На этой странице содержатся инструкции по управлению компонентами Операций Интернета вещей Azure с помощью манифестов развертывания Kubernetes, которые доступны в предварительной версии. Эта функция предоставляется с несколькими ограничениями и не должна использоваться для рабочих нагрузок.
Юридические условия, применимые к функциям Azure, которые находятся в состоянии бета-версии, предварительной версии или иным образом еще не выпущены в общедоступной версии, см. на странице Дополнительные условия использования предварительных версий в Microsoft Azure.
Чтобы настроить двунаправленное взаимодействие между операциями Интернета вещей Azure и брокерами Apache Kafka, можно настроить конечную точку потока данных. Эта конфигурация позволяет указать конечную точку соединения, TLS, проверку подлинности и другие параметры.
Предварительные условия
- Экземпляр операций Интернета вещей Azure
Центры событий Azure
Центры событий Azure совместим с протоколом Kafka и может использоваться с потоками данных с некоторыми ограничениями.
Создание пространства имен Центры событий Azure и концентратора событий
Сначала создайте пространство имен с поддержкой Kafka Центры событий Azure
Затем создайте концентратор событий в пространстве имен. Каждый отдельный концентратор событий соответствует теме Kafka. Вы можете создать несколько центров событий в одном пространстве имен, чтобы представить несколько разделов Kafka.
Назначьте разрешение управляемому удостоверению
Чтобы настроить конечную точку потока данных для Центра событий Azure, мы рекомендуем использовать управляемое удостоверение, назначенное пользователем или системой. Этот подход является безопасным и устраняет необходимость управления учетными данными вручную.
После создания пространства имен и концентратора событий в Azure Event Hubs необходимо назначить роль управляемому удостоверению Azure IoT Operations, которая предоставляет разрешение на отправку и получение сообщений в концентратор событий.
При использовании управляемого удостоверения, назначаемого системой, в портал Azure перейдите к экземпляру Операций Интернета вещей Azure и выберите "Обзор". Скопируйте имя расширения, указанного после расширения Azure IoT Operations Arc. Например, azure-iot-operations-xxxx7. Управляемое удостоверение, назначаемое системой, можно найти с помощью того же имени расширения Azure Arc для операций IoT.
Затем перейдите к пространству имен Центров событий >управление доступом (IAM)>добавить назначение ролей.
- На вкладке "Роль" выберите соответствующую роль, например
Azure Event Hubs Data Sender
илиAzure Event Hubs Data Receiver
. Это предоставляет управляемой учетной записи необходимые разрешения для отправки или получения сообщений во всех центрах событий в данном пространстве имен. Дополнительные сведения см. в статье Аутентификация приложения с помощью идентификатора Microsoft Entra для доступа к ресурсам Центров событий. - На вкладке "Члены" :
- При использовании управляемого удостоверения, назначаемого системой, для назначения доступа выберите "Пользователь", "Группа" или "Субъект-служба ", а затем выберите +Выберите участников и найдите имя расширения Azure IoT Operations Arc.
- При использовании управляемой идентичности, назначаемой пользователем, для назначения доступа выберите параметр "Управляемая идентичность", а затем выберите "Выбрать участников" и выполните поиск вашей управляемой идентичности, назначаемой пользователем, настроенной для облачных подключений.
Создание конечной точки для потока данных Центра событий Azure
После настройки пространства имен Центры событий Azure и концентратора событий можно создать конечную точку потока данных для пространства имен с поддержкой Kaf Центры событий Azure ka.
В интерфейсе операций перейдите на вкладку конечных точек потока данных.
В разделе "Создание новой конечной точки потока данных" выберите Центры событий Azure> New.
Введите следующие параметры для конечной точки:
Настройка Описание Имя. Имя конечной точки потока данных. Хост Имя узла Центров событий. Вы можете выполнить поиск существующего узла Центров событий или ввести имя узла вручную с помощью формата <NAMESPACE>.servicebus.windows.net
.Порт Порт узла Event Hubs. Для Центров событий используется 9093
порт.Метод аутентификации Метод, используемый для проверки подлинности. Рекомендуем выбрать управляемое удостоверение, назначаемое системой или управляемое удостоверение, назначаемое пользователем. Нажмите "Применить", чтобы настроить конечную точку.
Примечание.
Раздел Kafka или отдельный концентратор событий настраивается позже при создании потока данных. Раздел Kafka предназначен для сообщений потока данных.
Используйте строку подключения для аутентификации в Центрах событий
Внимание
Чтобы использовать веб-интерфейс для управления операциями для управления секретами, необходимо сначала включить Azure IoT Operations, настроив безопасные параметры с помощью Azure Key Vault и активировав удостоверения рабочей нагрузки. Дополнительные сведения см. в статье "Включение безопасных параметров в развертывании Операций Интернета вещей Azure".
На странице параметров конечной точки потока данных для операций выберите вкладку Базовый, затем выберите метод проверки подлинности>SASL.
Введите следующие параметры для конечной точки:
Настройка | Описание |
---|---|
Тип SASL | Выберите Plain . |
Синхронизированное имя секрета | Введите имя секрета Kubernetes, содержащего строку подключения. |
Ссылка на имя пользователя или секрет токена | Ссылка на секрет, используемый в комбинации с именем пользователя или токеном для аутентификации по протоколу SASL. Выберите его из списка Key Vault или создайте новый. Значение должно быть равно $ConnectionString . |
Ссылка на пароль секрета токена | Ссылка на пароль или секрет маркера, используемый для аутентификации SASL. Выберите его из списка Key Vault или создайте новый. Значение должно быть в формате Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> . |
После нажатия кнопки "Добавить ссылку" при нажатии кнопки "Создать" введите следующие параметры:
Настройка | Описание |
---|---|
Имя секрета | Имя секрета в Azure Key Vault. Выберите имя, которое легко помнить, чтобы выбрать секрет позже из списка. |
Значение секрета | Введите $ConnectionString имя пользователя. Для пароля введите строка подключения в форматеEndpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> . |
Задать дату активации | Если этот параметр включен, дата, когда секрет становится активным. |
Задать дату окончания срока действия | Если этот параметр включен, дата истечения срока действия секрета. |
Дополнительные сведения о секретах см. в статье "Создание секретов и управление ими в операциях Интернета вещей Azure".
Ограничения
Центры событий Azure не поддерживает все типы сжатия, поддерживаемые Kafka. В настоящее время в Центре событий Azure на уровнях «Премиум» и «Дедицированных» поддерживается только сжатие GZIP. Использование других типов сжатия может привести к ошибкам.
Пользовательские брокеры Kafka
Чтобы настроить конечную точку потока данных для Kafka-брокеров, не использующих Концентратор событий, установите сервер, TLS, проверку подлинности и другие параметры по мере необходимости.
В интерфейсе операций перейдите на вкладку конечных точек потока данных.
В разделе "Создание новой конечной точки потока данных" выберите "Настраиваемый брокер Kafka">Новый.
Введите следующие параметры для конечной точки:
Настройки Описание Имя. Имя конечной точки потока данных. Хост Имя узла брокера Kafka в формате <Kafka-broker-host>:xxxx
. Включите номер порта в параметр узла.Метод аутентификации Метод, используемый для проверки подлинности. Выберите SASL. Тип SASL Тип проверки подлинности SASL. Выберите "Обычный", "ScramSha256" или "ScramSha512". Обязательно, если используется SASL. Название синхронизированного секрета Название секрета. Обязательно, если используется SASL. Указание имени пользователя для секретного ключа токена Ссылка на имя пользователя в секрете токена SASL. Обязательно, если используется SASL. Нажмите "Применить", чтобы настроить конечную точку.
Примечание.
В настоящее время функционал системы не поддерживает использование конечной точки потока данных Kafka в качестве источника. Поток данных можно создать с исходной конечной точкой потока данных Kafka с помощью Kubernetes или Bicep.
Чтобы настроить параметры конечной точки, используйте следующие разделы для получения дополнительных сведений.
Доступные методы проверки подлинности
Для конечных точек потока данных брокера Kafka доступны следующие методы проверки подлинности.
Управляемое удостоверение, назначаемое системой
Перед настройкой конечной точки потока данных назначьте роль управляемому удостоверению Операций Интернета вещей Azure, который предоставляет разрешение на подключение к брокеру Kafka:
- В портале Azure перейдите к экземпляру Azure IoT и выберите "Обзор".
- Скопируйте имя расширения, указанного после расширения Azure IoT Operations Arc. Например, azure-iot-operations-xxxx7.
- Перейдите к облачному ресурсу, которому необходимо предоставить разрешения. Например, перейдите в пространство имен Центров событий >Управление доступом (IAM)>, затем Добавить назначение роли.
- На вкладке "Роль" выберите соответствующую роль.
- На вкладке "Участники" для назначения доступа, выберите опцию "Пользователь, группа или служебный принципал", а затем нажмите опцию "+ Выбрать участников" и выполните поиск управляемого удостоверения Azure IoT Operations. Например, azure-iot-operations-xxxx7.
Затем настройте конечную точку потока данных с параметрами управляемой идентификации, назначенной системой.
На странице параметров конечной точки потока данных для операций выберите вкладку Базовый, затем выберите Способ аутентификации>Управляемое удостоверение, назначенное системой.
Эта конфигурация создает управляемую идентификацию с аудиторией по умолчанию, совпадающей со значением хоста пространства имен Event Hubs в формате https://<NAMESPACE>.servicebus.windows.net
. Однако если вам нужно переопределить аудиторию по умолчанию, можно задать audience
для поля требуемое значение.
Не поддерживается в операционной среде.
Управляемое удостоверение, назначаемое пользователем
Чтобы использовать назначаемое пользователем управляемое удостоверение для проверки подлинности, необходимо сначала развернуть операции Интернета вещей Azure с включенными безопасными параметрами. Затем необходимо настроить управляемое удостоверение, назначаемое пользователем, для облачных подключений. Дополнительные сведения см. в статье "Включение безопасных параметров в развертывании Операций Интернета вещей Azure".
Перед тем как настроить конечную точку потока данных, назначьте роль назначенной пользователем управляемой идентичности, которая предоставляет разрешение на подключение к брокеру Kafka:
- В портал Azure перейдите к облачному ресурсу, которому необходимо предоставить разрешения. Например, перейдите к элементу >управления доступом (IAM) в пространстве имен Event Grid>Добавить назначение роли.
- На вкладке "Роль" выберите соответствующую роль.
- На вкладке Участники, для назначения доступа выберите параметр Управляемое удостоверение, затем нажмите + Выбрать участников и найдите ваше управляемое удостоверение, назначаемое пользователем.
Затем настройте конечную точку потока данных, используя параметры управляемого удостоверения, назначенного пользователем.
На странице параметров конечной точки потока данных для операций выберите вкладку "Базовый", затем выберите метод проверки подлинности>Назначаемая пользователем управляемая идентичность.
Здесь область применения — это аудитория управляемого удостоверения. Значение по умолчанию совпадает со значением узла пространства имен Центров событий в форме https://<NAMESPACE>.servicebus.windows.net
. Однако если необходимо переопределить аудиторию по умолчанию, можно задать для поля области требуемое значение с помощью Bicep или Kubernetes.
SASL
Чтобы использовать SASL для проверки подлинности, укажите метод проверки подлинности SASL и настройте тип SASL и ссылку на секрет с именем секрета, содержащего маркер SASL.
На странице параметров конечной точки потока данных для операций выберите вкладку Основное, а затем выберите метод проверки подлинности>, SASL.
Введите следующие параметры для конечной точки:
Настройка | Описание |
---|---|
Тип SASL | Тип проверки подлинности SASL, который следует использовать. Поддерживаются типы Plain , ScramSha256 и ScramSha512 . |
Синхронизированное секретное имя | Имя секрета Kubernetes, содержащего маркер SASL. |
Ссылка на имя пользователя или секрет токена | Ссылка на пароль или токен, используемый для аутентификации SASL. |
Ссылка на пароль секрета токена | Ссылка на пароль или секрет токена, используемый для проверки подлинности SASL. |
Поддерживаемые типы SASL:
Plain
ScramSha256
ScramSha512
Секрет должен находиться в том же пространстве имен, что и конечная точка потока данных Kafka. Секрет должен иметь маркер SASL в качестве пары "ключ-значение".
Анонимные
Чтобы использовать анонимную проверку подлинности, обновите раздел проверки подлинности параметров Kafka, чтобы использовать анонимный метод.
На странице параметров конечной точки потока данных для операций выберите вкладку "Базовый" и выберите метод>проверки подлинности None.
Расширенные настройки
Дополнительные параметры можно задать для конечной точки потока данных Kafka, таких как TLS, доверенный сертификат ЦС, параметры обмена сообщениями Kafka, пакетная обработка и CloudEvents. Эти параметры можно задать на вкладке "Расширенный портал потока данных" или в ресурсе конечной точки потока данных.
В интерфейсе операций выберите вкладку "Дополнительно " для конечной точки потока данных.
Параметры протокола TLS
Режим TLS
Чтобы включить или отключить TLS для конечной точки Kafka, обновите mode
параметр в параметрах TLS.
На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем установите флажок рядом с включенным режимом TLS.
Для режима TLS можно задать Enabled
или Disabled
. Если задан Enabled
режим, поток данных использует безопасное подключение к брокеру Kafka. Если задан Disabled
режим, поток данных использует небезопасное подключение к брокеру Kafka.
Сертификат доверенного удостоверяющего центра
Настройте доверенный сертификат ЦС для конечной точки Kafka, чтобы установить безопасное подключение к брокеру Kafka. Этот параметр важен, если брокер Kafka использует самоподписанный сертификат или сертификат, подписанный пользовательским ЦС, которому по умолчанию не доверяют.
На странице настроек конечной точки потока данных в операционной среде выберите вкладку "Дополнительно", а затем используйте поле конфигурационной карты с сертификатом доверенного центра сертификации (ЦС), чтобы указать ConfigMap, содержащий сертификат доверенного ЦС.
Этот "ConfigMap" должен содержать сертификат центра сертификации в формате PEM. ConfigMap должен находиться в том же пространстве имен, что и ресурс потока данных Kafka. Например:
kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations
Совет
При подключении к Центру событий Azure сертификат удостоверяющего центра (ЦС) не требуется, так как служба Центра событий использует сертификат, подписанный общедоступным ЦС, доверенного по умолчанию.
Идентификатор группы потребителей
Идентификатор группы потребителей используется для идентификации группы потребителей, используемой потоком данных для чтения сообщений из раздела Kafka. Идентификатор группы потребителей должен быть уникальным в брокере Kafka.
Внимание
Если конечная точка Kafka используется в качестве источника, требуется идентификатор группы потребителей. В противном случае поток данных не может считывать сообщения из раздела Kafka, и вы получите сообщение об ошибке "Конечные точки источника типа Kafka должны иметь определение consumerGroupId".
На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле идентификатора группы потребителей, чтобы указать идентификатор группы потребителей.
Этот параметр действует только в том случае, если конечная точка используется в качестве источника (то есть поток данных является потребителем).
Сжатие
Поле сжатия позволяет сжимать сообщения, отправленные в разделы Kafka. Сжатие помогает сократить пропускную способность сети и пространство хранилища, необходимые для передачи данных. Однако сжатие также добавляет некоторые издержки и задержку в процесс. Поддерживаемые типы сжатия перечислены в следующей таблице.
значение | Описание |
---|---|
None |
Сжатие или пакетная обработка не применяется. "Значение по умолчанию — "Нет", если сжатие не указано." |
Gzip |
Применяются сжатие и пакетная обработка GZIP. GZIP — это алгоритм сжатия общего назначения, который обеспечивает хороший баланс между коэффициентом сжатия и скоростью. В настоящее время в уровнях "Премиум" и выделенных уровнях Центров событий Azure поддерживается только сжатие GZIP. |
Snappy |
Применяются сжатие Snappy и пакетирование. Snappy — это алгоритм быстрого сжатия, который предлагает умеренное соотношение сжатия и скорость. Этот режим сжатия не поддерживается Событийным концентратором Azure. |
Lz4 |
Применяются сжатие LZ4 и пакетная обработка. LZ4 — это алгоритм быстрого сжатия, который обеспечивает низкое соотношение сжатия и высокую скорость. Этот режим сжатия не поддерживается Центры событий Azure. |
Чтобы настроить сжатие, выполните приведенные действия.
На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле сжатия, чтобы указать тип сжатия .
Этот параметр действует только в том случае, если конечная точка используется в качестве назначения, где поток данных является производителем.
Пакетная обработка
Помимо сжатия, можно также настроить пакетную обработку сообщений перед отправкой в разделы Kafka. Пакетная обработка позволяет группировать несколько сообщений вместе и сжимать их в виде одной единицы, что может повысить эффективность сжатия и снизить нагрузку на сеть.
Поле | Описание | Обязательное поле |
---|---|---|
mode |
Может иметь значение Enabled или Disabled . Значением по умолчанию является Enabled то, что Kafka не имеет понятия о незабавленном обмене сообщениями. Если задано значение Disabled , пакетная обработка минимизируется для создания пакета с одним сообщением каждый раз. |
Нет |
latencyMs |
Максимальный интервал времени в миллисекундах, который можно буферизировать перед отправкой сообщений. Если этот интервал достигнут, все буферные сообщения отправляются в виде пакета независимо от того, сколько или сколько их размеров. Если значение не задано, значение по умолчанию равно 5. | Нет |
maxMessages |
Максимальное количество сообщений, которые можно буферизать перед отправкой. Если это число достигнуто, все буферные сообщения отправляются в виде пакета независимо от того, насколько большой или какой срок их буферизации. Если значение не задано, значение по умолчанию равно 100000. | Нет |
maxBytes |
Максимальный размер в байтах, который можно буферизать перед отправкой. Если этот размер достигнут, все буферные сообщения отправляются в виде пакета независимо от того, сколько или сколько времени они буферизированы. Значение по умолчанию — 1000000 (1 МБ). | Нет |
Например, если задержка равна 1000, maxMessages равно 100 и maxBytes равно 1,024, сообщения отправляются либо при наличии 100 сообщений в буфере, либо при наличии 1,024 байтов в буфере, либо как только истечет 1,000 миллисекунд с момента последней отправки.
Чтобы настроить пакетную обработку, выполните приведенные действия.
На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле "Пакетная обработка ", чтобы включить пакетную обработку. Используйте поля "Задержка пакетной обработки", "Максимальное число байтов" и "Число сообщений", чтобы указать параметры пакетной обработки.
Этот параметр действует только в том случае, если конечная точка используется в качестве назначения, где поток данных является производителем.
Стратегия обработки разделов
Стратегия обработки секций определяет, как сообщения назначаются секциям Kafka при отправке их в разделы Kafka. Секции Kafka — это логические сегменты раздела Kafka, обеспечивающие параллельную обработку и отказоустойчивость. Каждое сообщение в теме Kafka имеет раздел и смещение, которые используются для идентификации и упорядочивания сообщений.
Этот параметр действует только в том случае, если конечная точка используется в качестве назначения, где поток данных является производителем.
По умолчанию поток данных назначает сообщения случайным секциям с помощью алгоритма циклического перебора. Однако можно использовать различные стратегии для назначения сообщений секциям на основе некоторых критериев, таких как имя раздела MQTT или свойство сообщения MQTT. Это поможет повысить балансировку нагрузки, локализацию данных или упорядочивание сообщений.
значение | Описание |
---|---|
Default |
Назначает сообщения случайным секциям с помощью алгоритма циклического перебора. Это значение по умолчанию, если стратегия не указана. |
Static |
Назначает сообщения фиксированному номеру раздела, который выводится из идентификатора экземпляра потока данных. Это означает, что каждый экземпляр потока данных отправляет сообщения в другую секцию. Это может помочь добиться лучшей балансировки нагрузки и локальности данных. |
Topic |
Использует имя раздела MQTT из источника потока данных в качестве ключа для секционирования. Это означает, что сообщения с тем же именем раздела MQTT отправляются в ту же секцию. Это может помочь добиться лучшего порядка сообщений и локальности данных. |
Property |
Использует свойство сообщения MQTT из источника потока данных в качестве ключа для секционирования. Укажите имя свойства в partitionKeyProperty поле. Это означает, что сообщения с тем же значением свойства отправляются в одну секцию. Это поможет улучшить порядок сообщений и локализацию данных на основе пользовательского критерия. |
Например, если вы задали стратегию Property
обработки секций и свойству device-id
ключа секции, сообщения с тем же свойством отправляются в ту же device-id
секцию.
Чтобы настроить стратегию обработки секций, выполните следующие действия.
На странице параметров конечной точки потока данных для операций выберите вкладку Дополнительно, а затем используйте поле стратегии обработки разделов, чтобы указать параметры стратегии обработки разделов. Используйте поле свойства ключа секции, чтобы указать свойство, используемое для секционирования, если стратегия заданаProperty
.
Благодарности Kafka
Подтверждения Kafka (acks) используются для управления устойчивостью и согласованностью сообщений, отправленных в разделы Kafka. Когда производитель отправляет сообщение в раздел Kafka, он может запрашивать различные уровни подтверждения от брокера Kafka, чтобы убедиться, что сообщение успешно записано в раздел и реплицируется в кластере Kafka.
Этот параметр действует только в том случае, если конечная точка используется в качестве назначения (т. е. поток данных является производителем).
значение | Описание |
---|---|
None |
Поток данных не ожидает каких-либо подтверждений от брокера Kafka. Этот параметр является самым быстрым, но наименее устойчивым вариантом. |
All |
Поток данных ожидает, пока сообщение будет записано в раздел лидера и во все разделы последователей. Этот параметр является самым медленным, но наиболее устойчивым вариантом. Этот параметр также является параметром по умолчанию |
One |
Поток данных ожидает записи сообщения в раздел лидера и по крайней мере одну секцию подписчика. |
Zero |
Поток данных ожидает записи сообщения в раздел лидера, но не ожидает каких-либо подтверждений от подписчиков. Это быстрее, чем One , но менее прочное. |
Например, если задать подтверждение All
Kafka, поток данных ожидает записи сообщения в раздел лидера и все секции подписчиков перед отправкой следующего сообщения.
Чтобы настроить подтверждения Kafka, выполните следующие действия.
На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле подтверждения Kafka, чтобы указать уровень подтверждения Kafka.
Этот параметр действует только в том случае, если конечная точка используется в качестве назначения, где поток данных является производителем.
Копирование свойств MQTT
По умолчанию настройка копирования свойств MQTT включена. Эти свойства пользователя включают такие значения, как subject
, которое хранит имя ресурса, отправляющего сообщение.
На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем установите флажок рядом с полем "Копировать свойства MQTT", чтобы включить или отключить копирование свойств MQTT.
В следующих разделах описывается преобразование свойств MQTT в заголовки пользователей Kafka и наоборот, когда параметр включен.
Конечная точка Kafka — это точка назначения
Когда конечная точка Kafka является назначением потока данных, все определенные свойства спецификации MQTT версии 5 преобразуются в заголовки пользователей Kafka. Например, сообщение MQTT версии 5 с "Content Type", перенаправляемым в Kafka, преобразуется в пользовательский заголовок Kafka. Аналогичные правила применяются к другим встроенным свойствам MQTT, определенным в следующей таблице.
Свойство MQTT | Переведенное поведение |
---|---|
Индикатор формата полезных данных | Ключ: "Индикатор формата полезных данных" Значение: "0" (полезные данные в байтах) или "1" (полезные данные в формате UTF-8) |
Тема ответа | Ключ: "Тема ответа" Значение: копия раздела ответа из исходного сообщения. |
Интервал истечения срока действия сообщения | Ключ: "Интервал истечения срока действия сообщения" Значение: представление UTF-8 числа секунд до истечения срока действия сообщения. См. свойство "Интервал истечения срока действия сообщения" для получения дополнительных сведений. |
Данные корреляции: | Ключ: "Данные корреляции" Значение: копирование данных корреляции из исходного сообщения. В отличие от многих свойств MQTT версии 5, которые кодируются в кодировке UTF-8, данные корреляции могут быть произвольными данными. |
Тип содержимого | Ключ: "Тип контента" Значение: копия типа контента из исходного сообщения. |
Пары значений ключей ключа свойства пользователя MQTT версии 5 напрямую преобразуются в заголовки пользователей Kafka. Если заголовок пользователя в сообщении имеет то же имя, что и встроенное свойство MQTT (например, заголовок с именем "Корреляционные данные"), то выбор между перенаправлением значения свойства спецификации MQTT версии 5 или пользовательского свойства не определен.
Потоки данных никогда не получают эти свойства от брокера MQTT. Таким образом, поток данных никогда не пересылает их:
- Псевдоним темы
- Идентификаторы подписки
Свойство "Интервал истечения срока действия сообщения"
Интервал истечения срока действия сообщения указывает, сколько времени сообщение может оставаться в брокере MQTT до его удаления.
Когда поток данных получает сообщение MQTT с указанным интервалом истечения срока действия сообщения, он:
- Записывает время получения сообщения.
- Перед отправкой сообщения в пункт назначения из начального времени истечения срока действия вычитается время, в течение которого сообщение находилось в очереди.
- Если сообщение не истекло (операция выше равно > 0), сообщение отправляется в место назначения и содержит обновленное время истечения срока действия сообщения.
- Если сообщение истекло (если результат операции выше это < = 0), сообщение не излучается целевым объектом.
Примеры:
- Поток данных получает сообщение MQTT с интервалом истечения срока действия сообщения = 3600 секунд. Соответствующий пункт назначения временно отключен, но может повторно подключиться. 1 000 секунд проходит, прежде чем это сообщение MQTT отправляется на целевой узел. В этом случае для сообщения назначения установлен интервал истечения срока действия 2600 (3600 - 1000) секунд.
- Поток данных получает сообщение MQTT с интервалом истечения срока действия сообщения = 3600 секунд. Соответствующий адрес временно отключён, но может подключиться снова. Однако в этом случае для повторного подключения требуется 4000 секунд. Срок действия сообщения истек, и поток данных не перенаправит это сообщение в место назначения.
Конечная точка Kafka — это источник потока данных
Примечание.
Существует известная проблема при использовании конечной точки Центров событий в качестве источника для потока данных, когда заголовок Kafka повреждается при его преобразовании в MQTT. Это происходит только в том случае, если используются центры событий через клиент Центров событий, который использует AMQP под крышкой. Например, для случая "foo"="bar", переводится "foo", но значение становится "\xa1\x03bar".
Когда конечная точка Kafka является источником потока данных, заголовки пользователей Kafka преобразуются в свойства MQTT версии 5. В следующей таблице описывается преобразование заголовков пользователей Kafka в свойства MQTT версии 5.
Заголовок Kafka | Переведенное поведение |
---|---|
Ключ | Ключ: "Ключ" Значение: копия ключа из исходного сообщения. |
Метка времени | Ключ: "Метка времени" Значение: кодировка UTF-8 метки времени Kafka, представляющая собой число миллисекунд с эпохи Unix. |
Пары ключей и значений пользовательских заголовков Kafka, при условии, что они все закодированы в UTF-8, напрямую преобразуются в пользовательские свойства ключей и значений MQTT.
UTF-8 / двоичные несоответствия
MQTT версии 5 поддерживает только свойства на основе UTF-8. Если поток данных получает сообщение Kafka, содержащее один или несколько заголовков, отличных от UTF-8, поток данных:
- Удалите проблемное свойство или свойства.
- Перенаправите остальное сообщение в соответствии с предыдущими правилами.
Приложения, для которых требуется двоичная передача в заголовках источника Kafka => свойства MQTT Target должны сначала кодировать их в кодировке UTF-8, например с помощью Base64.
>=64 КБ несоответствия свойств
Свойства MQTT версии 5 должны быть меньше 64 КБ. Если поток данных получает сообщение Kafka, содержащее один или несколько заголовков размером >= 64 КБ, поток данных будет:
- Удалите проблемное свойство или свойства.
- Перенаправите остальное сообщение в соответствии с предыдущими правилами.
Преобразование свойств при использовании Центров событий и поставщиков, использующих AMQP
Если ваш клиент пересылает сообщения на конечную точку источника потока данных Kafka, выполняющую одно из следующих действий:
- Отправка сообщений в Центры событий с помощью клиентских библиотек, таких как Azure.Messaging.EventHubs
- Использование AMQP напрямую
Существуют нюансы перевода свойств, которые следует учитывать.
Необходимо выполнить одно из следующих действий:
- Избегайте отправки свойств
- Если необходимо отправить свойства, отправьте значения, закодированные как UTF-8.
Когда Центр событий преобразовывает свойства из AMQP в Kafka, он включает базовые типы, закодированные AMQP в своем сообщении. Дополнительные сведения о поведении см. в разделе "Обмен событиями между потребителями и производителями" с помощью различных протоколов.
В следующем примере кода, когда конечная точка потока данных получает значение "foo":"bar"
, оно получает свойство как <0xA1 0x03 "bar">
.
using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;
var propertyEventBody = new BinaryData("payload");
var propertyEventData = new EventData(propertyEventBody)
{
Properties =
{
{"foo", "bar"},
}
};
var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);
Конечная точка потока данных не может перенаправить свойство <0xA1 0x03 "bar">
нагрузки в сообщение MQTT, так как данные не являются в формате UTF-8. Однако если указать строку UTF-8, конечная точка потока данных преобразует строку перед отправкой в MQTT. Если используется строка UTF-8, сообщение MQTT будет иметь "foo":"bar"
в качестве свойств пользователя.
Переводятся только заголовки UTF-8. Например, учитывая следующий сценарий, в котором свойство задано как float:
Properties =
{
{"float-value", 11.9 },
}
Конечная точка потока данных удаляет пакеты, содержащие "float-value"
поле.
Не все свойства данных событий, например, propertyEventData.correlationId, перенаправляются. Дополнительные сведения см. в разделе "Свойства пользователя события",
CloudEvents
CloudEvents — это способ описания данных событий распространенным способом. Параметры CloudEvents используются для отправки или получения сообщений в формате CloudEvents. Вы можете использовать CloudEvents для архитектур на основе событий, где разные службы должны взаимодействовать друг с другом в одном или разных поставщиках облачных служб.
Параметры: CloudEventAttributes
— это Propagate
или CreateOrRemap
.
На странице параметров конечной точки потока данных для операций выберите вкладку "Дополнительно ", а затем используйте поле атрибутов событий CloudEvents, чтобы указать параметр CloudEvents.
В следующих разделах описывается, как свойства CloudEvent распространяются, создаются или изменяются и переназначаются.
Параметр распространения
Свойства CloudEvent передаются для сообщений, содержащих необходимые свойства. Если сообщение не содержит необходимых свойств, сообщение передается как есть. Если необходимые свойства присутствуют, ce_
префикс добавляется в имя свойства CloudEvent.
Имя. | Обязательное поле | Пример значения | Название вывода | Выходное значение |
---|---|---|---|---|
specversion |
Да | 1.0 |
ce-specversion |
Передается как есть |
type |
Да | ms.aio.telemetry |
ce-type |
Передается как есть |
source |
Да | aio://mycluster/myoven |
ce-source |
Передается в неизменном виде |
id |
Да | A234-1234-1234 |
ce-id |
Передается как есть |
subject |
Нет | aio/myoven/telemetry/temperature |
ce-subject |
Передается как есть |
time |
Нет | 2018-04-05T17:31:00Z |
ce-time |
Передается как есть. Это не перепечатано. |
datacontenttype |
Нет | application/json |
ce-datacontenttype |
Изменен на тип содержимого выходных данных после необязательного этапа преобразования. |
dataschema |
Нет | sr://fabrikam-schemas/123123123234234234234234#1.0.0 |
ce-dataschema |
Если в конфигурации преобразования выходных данных задана схема преобразования выходных данных, dataschema изменится на выходную схему. |
Параметр CreateOrRemap
Свойства CloudEvent передаются для сообщений, содержащих необходимые свойства. Если сообщение не содержит обязательных свойств, создаются свойства.
Имя. | Обязательное поле | Название вывода | Сгенерированное значение, если его нет |
---|---|---|---|
specversion |
Да | ce-specversion |
1.0 |
type |
Да | ce-type |
ms.aio-dataflow.telemetry |
source |
Да | ce-source |
aio://<target-name> |
id |
Да | ce-id |
Созданный идентификатор UUID в целевом клиенте |
subject |
Нет | ce-subject |
Выходной раздел, в котором отправляется сообщение |
time |
Нет | ce-time |
Создано как RFC 3339 в целевом клиенте |
datacontenttype |
Нет | ce-datacontenttype |
Изменён тип содержимого выходных данных после необязательного этапа преобразования. |
dataschema |
Нет | ce-dataschema |
Схема, определенная в реестре схем |
Следующие шаги
Дополнительные сведения о потоках данных см. в статье "Создание потока данных".