Руководство. Маршрутизация данных с помощью политик обновления таблиц

Переключайте службы с помощью раскрывающегося списка Версия. Дополнительные сведения о навигации.
Область применения: ✅ Microsoft Fabric ✅ Azure Data Explorer

Когда исходные данные включают простые и быстрые преобразования, выполните их вверх по конвейеру с помощью потока событий. Однако этот подход может не работать хорошо для других преобразований, которые являются сложными или требуют специализированных функциональных возможностей для работы.

В этом руководстве вы узнаете, как:

В этом руководстве показано, как использовать политики обновления для маршрутизации данных с целью выполнения сложных преобразований для обогащения, очистки и трансформации данных на этапе их поступления. Список других распространенных вариантов использования см. в разделе "Распространенные варианты использования" для политик обновления таблиц.

Предпосылки

  • Учетная запись Майкрософт или удостоверение пользователя Microsoft Entra. Подписка Azure не требуется.

Шаг 1. Создание таблиц и обновление политики

Ниже приведены инструкции по созданию исходной таблицы, функций преобразования, целевых таблиц и политик обновления. В этом руководстве показано, как использовать политики обновления таблиц для выполнения сложных преобразований и сохранения результатов в одной или нескольких целевых таблицах. В примере используется одна исходная таблица с именем Raw_Table и три целевые таблицы с именем Device_Telemetry, Device_Alarms и Error_Log.

  1. Выполните следующую команду, чтобы создать таблицу с именем Raw_Table.

    .create table Raw_Table (RawData: dynamic)
    

    Исходная таблица — это место, где сохраняются поглощаемые данные. В таблице есть один столбец с именем RawData типа dynamic. Динамический тип сохраняет необработанные данные без какой-либо схемы. Дополнительные сведения см. в команде create table.

  2. Выполните следующую команду, чтобы создать функции с именем Get_Telemetry, Get_Alarms и Log_Error.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    При создании политики обновления можно указать встроенный скрипт для выполнения. Инкапсулируйте логику преобразования в функцию. Использование функции улучшает обслуживание кода. При поступлении новых данных функция выполняется для преобразования данных. Функцию можно повторно использовать в нескольких политиках обновления. Дополнительные сведения см. в команде .create function.

  3. Выполните следующую команду, чтобы создать таблицы назначения.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    Целевая таблица должна иметь ту же схему, что и выходные данные функции преобразования. Целевые таблицы можно создавать следующим образом:

    • .create table Используйте команду и вручную укажите схему, как показано при создании таблицы Device_Telemetry. Однако этот подход может быть подвержен ошибкам и занимает много времени.
    • Используйте команду, .set-or-append если вы уже создали функцию для преобразования данных. Этот метод создает новую таблицу с той же схемой, что и у выходных данных функции, используя take 0, чтобы убедиться, что функция возвращает только схему. Дополнительные сведения см. в команде .set-or-append.
  4. Выполните следующую команду, чтобы создать политики обновления для целевых таблиц.

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    .alter table policy update Используйте команду, чтобы связать исходную таблицу, функцию преобразования и целевую таблицу. Создайте политику обновления в целевой таблице и укажите исходную таблицу и функцию преобразования. Дополнительные сведения см. в команде .alter table policy update.

Шаг 2 - Загрузка образцов данных

Чтобы проверить политики обновления, введите примеры данных в исходную таблицу с помощью .set-or-append команды. Дополнительные сведения см. в разделе приема данных из запроса.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

Шаг 3. Проверка результатов

Чтобы проверить результаты, выполните запрос, чтобы убедиться, что данные были преобразованы и перенаправлены в целевые таблицы. В следующем примере union оператор объединяет источник и результаты из целевых таблиц в один результирующий набор.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Выходные данные

Вы увидите следующие выходные данные, в которых Raw_Table имеет три строки, а конечные таблицы имеют одну строку.

TableName Rows
Raw_Table 3
Журнал ошибок 1
Сигналы_Устройства 1
Телееметрия устройства 1

Шаг 4. Очистка ресурсов

Выполните следующую команду в базе данных, чтобы очистить таблицы и функции, созданные в этом руководстве.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error