Eventstream REST API

Интерфейсы REST API Microsoft Fabric позволяют автоматизировать процессы и процедуры, помогая вашей организации выполнять задачи более эффективно и точно. Автоматив эти рабочие процессы, вы можете сократить количество ошибок, повысить производительность и сократить затраты на операции.

В Fabric элемент представляет набор возможностей в рамках определенного интерфейса. Например, Eventstream — это элемент в среде интеллектуального анализа в реальном времени. Каждый элемент в Fabric определяется определением элемента — объектом, который описывает структуру, формат и ключевые компоненты, составляющие элемент.

В этой статье содержится комплексное руководство по использованию REST API Microsoft Fabric для создания элементов Eventstream и управления ими в рабочей области Fabric. Подробные спецификации для каждой операции API Eventstream, а также инструкции по созданию и настройке вызовов API.

Полный обзор REST API Microsoft Fabric см. в статье "Использование REST API Microsoft Fabric"

Поддерживаемые API потока событий

В настоящее время Eventstream поддерживает следующие API на основе определения:

Программные интерфейсы Описание
Создание элемента Eventstream с определением Используется для создания элемента Eventstream в рабочей области с подробными сведениями о его топологии, включая источник, назначения, операторы и потоки.
Получить определение элемента Eventstream Используется для получения определения элемента Eventstream с подробными сведениями о его топологии, включая источник, назначения, операторы и потоки.
Обновление определения элемента Eventstream Используется для обновления или изменения определения элемента Eventstream, включая источник, назначения, операторы и потоки.

Чтобы управлять элементами Eventstream с помощью операций CRUD, посетите ИНТЕРФЕЙСЫ REST API Fabric — Eventstream. Эти API поддерживают следующие операции:

  • Создание потока событий
  • Удаление потока событий
  • Получить поток событий
  • Перечисление потоков событий
  • Обновление потока событий

Как вызвать API eventstream?

Шаг 1. Проверка подлинности в Fabric

Чтобы работать с API Fabric, сначала необходимо получить маркер Microsoft Entra для службы Fabric, а затем использовать этот маркер в заголовке авторизации вызова API. Существует два варианта получения токена Microsoft Entra.

Вариант 1: Получение токена с помощью MSAL.NET

Если вашему приложению требуется доступ к API Fabric с использованием сервисного принципала, можно использовать библиотеку MSAL.NET для получения токена доступа. Следуйте краткому руководству по API Fabric, чтобы создать консольное приложение C#, которое получает токен Azure AD с помощью библиотеки MSAL.Net, а затем с помощью C# HttpClient вызвать API для получения списка рабочих областей.

Вариант 2: Получение токена с помощью портала Fabric

Маркер Azure AD можно использовать для проверки подлинности и проверки API Fabric. Войдите на портал Fabric для клиента, где вы хотите проводить тестирование, и нажмите клавишу F12, чтобы открыть режим разработчика браузера. В консоли выполните следующие действия:

powerBIAccessToken

Скопируйте маркер и вставьте его в приложение.

Замечание

Если создаваемый поток событий включает в себя любые источники, использующие облачное подключение, убедитесь, что удостоверение, используемое для получения маркера, имеет разрешение на доступ к этому облачному подключению, будь то субъект-служба или пользователь.

Шаг 2. Подготовка к телу потока событий в формате JSON

Создайте нагрузку JSON, которая будет преобразована в base64 в запросе API. Определение элемента Eventstream следует структуре графа и состоит из следующих компонентов:

Поле Описание
Источники Источники данных, которые могут быть переданы в Eventstream для обработки. Поддерживаемые источники данных включают источники потоковой передачи Azure, сторонние потоковые источники, CDC базы данных (отслеживание изменений), события в хранилище BLOB-объектов Azure и системные события Fabric.
Назначения Конечные точки в Fabric, куда можно направлять обработанные данные, включая Lakehouse, Eventhouse, Activator и другие.
Операторы Обработчики событий, обрабатывающие потоки данных в режиме реального времени, такие как фильтрация, агрегирование, группировка по и присоединение.
Потоки Потоки данных, доступные для подписки и анализа в Центре реального времени. Существует два типа потоков: потоки по умолчанию и производные потоки.

Используйте шаблоны API в GitHub, чтобы помочь определить тело Eventstream.

В этом документе Swagger можно ознакомиться с подробными сведениями о каждом свойстве API, а также с инструкциями по определению нагрузки API Eventstream.

Режим прямого приема в Eventhouse

При использовании режима прямого приема событий как назначения в запросах API Eventstream следуйте приведенным ниже рекомендациям.

1. Указание обязательных свойств

Убедитесь, что следующие свойства правильно заданы в тексте JSON:

  • connectionName — имя соединения Eventhouse.
  • mappingRuleName — правило сопоставления приема для целевой таблицы.

Чтобы найти правильный код connectionName:

  1. Перейдите в базу данных Eventhouse KQL в Fabric.
  2. Выберите потоки данных.
  3. Скопируйте нужный connectionName объект.

Для mappingRuleName вы можете найти подробные инструкции по созданию сопоставлений для приема данных в разделе Mapping with ingestionMappingReference.

2. Настройка разрешений уполномоченного службы

При проверке подлинности с помощью субъекта-службы он должен иметь следующее:

  • Разрешения средства просмотра баз данных.
  • Разрешения для ингестора таблиц.

Эти разрешения можно предоставить двумя способами:

Предоставьте эти разрешения с помощью следующих команд KQL в пользовательском интерфейсе:

.add database ['yourDatabase'] viewers (@'aadapp=<clientid>;<tenantid>')
.add table yourTable ingestors (@'aadapp=<id>;<directoryid>')

Замените clientid и tenantid на значения вашего сервисного принципала.

Эти команды предоставят сервисному принципалу необходимые разрешения уровня данных, позволяя Eventhouse создавать соединение и извлекать данные из Eventstream. Дополнительные сведения см. в разделе "Общие сведения о ролях безопасности"

Снимок экрана: предоставление разрешения базы данных и таблицы с помощью пользовательского интерфейса Kusto.

Дополнительные сведения об определении элемента Eventstream см. в разделе определение элемента Eventstream.

пример определения потока событий в ФОРМАТЕ JSON:

{
  "sources": [
    {
      "name": "SqlServerOnVmDbCdc",
      "type": "SQLServerOnVMDBCDC",
      "properties":
      {
        "dataConnectionId": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
        "tableName": ""
      }
    }
  ],
  "destinations": [
    {
      "name": "Lakehouse",
      "type": "Lakehouse",
      "properties":
      {
        "workspaceId": "bbbb1111-cc22-3333-44dd-555555eeeeee",
        "itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
        "schema": "",
        "deltaTable": "newTable",
        "minimumRows": 100000,
        "maximumDurationInSeconds": 120,
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "derivedStream"}]
    }
  ],
  "streams": [
    {
      "name": "myEventstream-stream",
      "type": "DefaultStream",
      "properties":
      {},
      "inputNodes": [{"name": "SqlServerOnVmDbCdc"}]
    },
    {
      "name": "derivedStream",
      "type": "DerivedStream",
      "properties":
      {
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "GroupBy"}]
    }
  ],
  "operators": [
    {
      "name": "GroupBy",
      "type": "GroupBy",
      "inputNodes": [{"name": "myEventstream-stream"}],
      "properties":
      {
        "aggregations": [
          {
            "aggregateFunction": "Average",
            "column":
            {
              "expressionType": "ColumnReference",
              "node": null,
              "columnName": "payload",
              "columnPathSegments": [{"field": "ts_ms"}]
            },
            "alias": "AVG_ts_ms"
          }
        ],
        "groupBy": [],
        "window":
        {
          "type": "Tumbling",
          "properties":
          {
            "duration":
            {
              "value": 5,
              "unit": "Minute"
            },
            "offset":
            {
              "value": 1,
              "unit": "Minute"
            }
          }
        }
      }
    }
  ],
  "compatibilityLevel": "1.1"
}

Шаг 3: Создание строки base64 из Eventstream JSON

Используйте средство, например Кодировка Base64 и декодирование для преобразования JSON потока событий в строку base64.

Снимок экрана: кодирование JSON eventstream в строку base64.

Шаг 4. Создание текста запроса API

Используйте JSON в кодировке Base64 на предыдущем шаге в качестве содержимого тела запроса API.

Вот пример полезной нагрузки со строкой в кодировке Base64:

{
 "definition": {
  "parts": [
   {
    "path": "eventstream.json",
    "payload": "ewogICJzb3VyY2VzIjogWwogICAgewogICAgICAibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMiLAogICAgICAidHlwZSI6ICJTUUxTZXJ2ZXJPblZNREJDREMiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiZGF0YUNvbm5lY3Rpb25JZCI6ICJhYWFhYWFhYS0wMDAwLTExMTEtMjIyMi1iYmJiYmJiYmJiYmIiLAogICAgICAgICJ0YWJsZU5hbWUiOiAiIgogICAgICB9CiAgICB9CiAgXSwKICAiZGVzdGluYXRpb25zIjogWwogICAgewogICAgICAibmFtZSI6ICJMYWtlaG91c2UiLAogICAgICAidHlwZSI6ICJMYWtlaG91c2UiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAid29ya3NwYWNlSWQiOiAiYmJiYjExMTEtY2MyMi0zMzMzLTQ0ZGQtNTU1NTU1ZWVlZWVlIiwKICAgICAgICAiaXRlbUlkIjogImNjY2MyMjIyLWRkMzMtNDQ0NC01NWVlLTY2NjY2NmZmZmZmZiIsCiAgICAgICAgInNjaGVtYSI6ICIiLAogICAgICAgICJkZWx0YVRhYmxlIjogIm5ld1RhYmxlIiwKICAgICAgICAibWluaW11bVJvd3MiOiAxMDAwMDAsCiAgICAgICAgIm1heGltdW1EdXJhdGlvbkluU2Vjb25kcyI6IDEyMCwKICAgICAgICAiaW5wdXRTZXJpYWxpemF0aW9uIjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJKc29uIiwKICAgICAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAgICAgIHsKICAgICAgICAgICAgImVuY29kaW5nIjogIlVURjgiCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICB9LAogICAgICAiaW5wdXROb2RlcyI6IFt7Im5hbWUiOiAiZGVyaXZlZFN0cmVhbSJ9XQogICAgfQogIF0sCiAgInN0cmVhbXMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIiwKICAgICAgInR5cGUiOiAiRGVmYXVsdFN0cmVhbSIsCiAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAge30sCiAgICAgICJpbnB1dE5vZGVzIjogW3sibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMifV0KICAgIH0sCiAgICB7CiAgICAgICJuYW1lIjogImRlcml2ZWRTdHJlYW0iLAogICAgICAidHlwZSI6ICJEZXJpdmVkU3RyZWFtIiwKICAgICAgInByb3BlcnRpZXMiOgogICAgICB7CiAgICAgICAgImlucHV0U2VyaWFsaXphdGlvbiI6CiAgICAgICAgewogICAgICAgICAgInR5cGUiOiAiSnNvbiIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJlbmNvZGluZyI6ICJVVEY4IgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIkdyb3VwQnkifV0KICAgIH0KICBdLAogICJvcGVyYXRvcnMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIkdyb3VwQnkiLAogICAgICAidHlwZSI6ICJHcm91cEJ5IiwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIn1dLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiYWdncmVnYXRpb25zIjogWwogICAgICAgICAgewogICAgICAgICAgICAiYWdncmVnYXRlRnVuY3Rpb24iOiAiQXZlcmFnZSIsCiAgICAgICAgICAgICJjb2x1bW4iOgogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImV4cHJlc3Npb25UeXBlIjogIkNvbHVtblJlZmVyZW5jZSIsCiAgICAgICAgICAgICAgIm5vZGUiOiBudWxsLAogICAgICAgICAgICAgICJjb2x1bW5OYW1lIjogInBheWxvYWQiLAogICAgICAgICAgICAgICJjb2x1bW5QYXRoU2VnbWVudHMiOiBbeyJmaWVsZCI6ICJ0c19tcyJ9XQogICAgICAgICAgICB9LAogICAgICAgICAgICAiYWxpYXMiOiAiQVZHX3RzX21zIgogICAgICAgICAgfQogICAgICAgIF0sCiAgICAgICAgImdyb3VwQnkiOiBbXSwKICAgICAgICAid2luZG93IjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJUdW1ibGluZyIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJkdXJhdGlvbiI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiA1LAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgIm9mZnNldCI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiAxLAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIF0sCiAgImNvbXBhdGliaWxpdHlMZXZlbCI6ICIxLjEiCn0=",
    "payloadType": "InlineBase64"
   },
   {
    "path": ".platform",
    "payload": "ewogICIkc2NoZW1hIjogImh0dHBzOi8vZGV2ZWxvcGVyLm1pY3Jvc29mdC5jb20vanNvbi1zY2hlbWFzL2ZhYnJpYy9naXRJbnRlZ3JhdGlvbi9wbGF0Zm9ybVByb3BlcnRpZXMvMi4wLjAvc2NoZW1hLmpzb24iLAogICJtZXRhZGF0YSI6IHsKICAgICJ0eXBlIjogIkV2ZW50c3RyZWFtIiwKICAgICJkaXNwbGF5TmFtZSI6ICJhbGV4LWVzMSIKICB9LAogICJjb25maWciOiB7CiAgICAidmVyc2lvbiI6ICIyLjAiLAogICAgImxvZ2ljYWxJZCI6ICIwMDAwMDAwMC0wMDAwLTAwMDAtMDAwMC0wMDAwMDAwMDAwMDAiCiAgfQp9",
    "payloadType": "InlineBase64"
   }
  ]
 }
}

Шаг 5. Создание элемента eventstream с помощью API

В вашем приложении отправьте запрос на создание элемента Eventstream со строкой в кодировке Base64 в теле сообщения.

Пример PowerShell :

$evenstreamAPI = "https://api.fabric.microsoft.com/v1/workspaces/$workspaceId/items" 

## Invoke the API to create the Eventstream
Invoke-RestMethod -Headers $headerParams -Method POST -Uri $evenstreamAPI -Body ($body) -ContentType "application/json"

Определение элемента потока событий

Определение элемента Eventstream имеет графовую структуру, состоящую из четырех компонентов: источников, назначений, операторов и потоков.

Источники

Чтобы определить источник Eventstream в тексте API, убедитесь, что каждое поле и свойство заданы правильно в соответствии с таблицей.

Поле Тип Описание Требование Допустимые значения и формат
id Строка (UUID) Уникальный идентификатор источника, созданного системой. Необязательный параметр в CREATE, обязательный в UPDATE Формат UUID
name Строка Уникальное имя источника, используемое для идентификации в eventstream. Обязательное поле Любая допустимая строка
type Строка (перечисление) Указывает тип источника. Должен соответствовать одному из предопределенных значений. Обязательное поле AmazonKinesis, AmazonMSKKafka, ApacheKafkaAzureCosmosDBCDCAzureBlobStorageEventsAzureEventHubAzureIoTHubAzureSQLDBCDCAzureSQLMIDBCDCConfluentCloudCustomEndpointFabricCapacityUtilizationEventsGooglePubSubMySQLCDCPostgreSQLCDCSampleDataFabricWorkspaceItemEventsFabricJobEventsFabricOneLakeEvents
properties объект Другие параметры, относящиеся к выбранному типу источника. Обязательное поле Пример типа AzureEventHub : dataConnectionId,consumerGroupName,inputSerialization

Пример источника Eventstream в тексте API:

{
  "sources": [
    {
      "id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
      "name": "AzureEventHubSource",
      "type": "AzureEventHub",
      "properties":
      {
        "dataConnectionId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
        "consumerGroupName": "$Default",
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      }
    }
  ]
}

Назначения

Чтобы определить назначение Eventstream в тексте API, убедитесь, что каждое поле и свойство указаны правильно в соответствии с таблицей.

Поле Тип Описание Требование Допустимые значения и формат
id Строка (UUID) Уникальный идентификатор назначения, созданный системой. Необязательный параметр в CREATE, обязательный в UPDATE Формат UUID
name Строка Уникальное имя назначения, используемое для идентификации его в eventstream. Обязательное поле Любая допустимая строка
type Строка (перечисление) Указывает тип назначения. Должен соответствовать одному из предопределенных значений. Обязательное поле "Activator" "CustomEndpoint" "Eventhouse" "Lakehouse"
properties объект Другие параметры, относящиеся к выбранному типу назначения. Обязательное поле Пример типаEventhouse: "dataIngestionMode", "workspaceId", "itemId""databaseName"
inputNodes Массив Ссылка на входные узлы для назначения, например имя потока событий или имя оператора. Обязательное поле Пример: eventstream-1

Опять же, если вы используете пункт назначения режима прямого приема Eventhouse, убедитесь, что свойства connectionName и mappingRuleName указаны правильно. Чтобы найти правильное connectionName, перейдите к базе данных KQL Eventhouse, выберите потоки данных и скопируйте нужные connectionNameданные. Подробные инструкции по созданию сопоставлений приема см. в разделе "Сопоставление с использованием ingestionMappingReference".


Пример источника Eventstream в тексте API:

{
  "destinations": [
    {
      "id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
      "name": "EventhouseDestination",
      "type": "Eventhouse",
      "properties":
      {
        "dataIngestionMode": "ProcessedIngestion",
        "workspaceId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
        "itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
        "databaseName": "myeventhouse",
        "tableName": "mytable",
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "eventstream-1"}]
    }
  ]
}

Операторы

Чтобы определить оператор Eventstream в тексте API, убедитесь, что каждое поле и свойство указаны правильно в соответствии с таблицей.

Поле Тип Описание Требование Допустимые значения и формат
name Строка Уникальное имя оператора. Обязательное поле Любая допустимая строка
type Строка (перечисление) Задает тип оператора. Должен соответствовать одному из предопределенных значений. Обязательное поле "Filter", "Join", "ManageFields""Aggregate""GroupBy""Union""Expand"
properties объект Другие параметры, относящиеся к выбранному типу оператора. Обязательное поле Пример типа Filter : "conditions"
inputNodes Массив Список ссылок на входные узлы оператора. Обязательное поле Пример: eventstream-1
inputSchemas Массив Список ссылок на входные узлы оператора. Необязательно Пример типа Filter : "schema"

Пример оператора Eventstream в тексте API:

{
  "operators": [
    {
      "name": "FilterName",
      "type": "Filter",
      "inputNodes": [{"name": "eventstream-1"}],
      "properties":
      {
        "conditions": [
          {
            "column":
            {
              "node": "nodeName",
              "columnName": "columnName",
              "columnPath": ["path","to","column"]
            },
            "operator": "Equals",
            "value":
            {
              "dataType": "nvarchar(max)",
              "value": "stringValue"
            }
          }
        ]
      }
    }
  ]
}

Потоки

Чтобы определить поток в тексте API, убедитесь, что каждое поле и свойство заданы правильно в соответствии с таблицей.

Поле Тип Описание Требование Допустимые значения и формат
id Строка (UUID) Уникальный идентификатор потока, созданный системой. Необязательно Формат UUID
name Строка Уникальное имя потока. Обязательное поле Любая допустимая строка
type Строка (перечисление) Указывает тип потока. Должен соответствовать одному из предопределенных значений. Обязательное поле "DefaultStream", "DerivedStream"
properties объект Другие параметры, относящиеся к выбранному типу потока. Обязательное поле Пример типа Filter : "conditions"
inputNodes Массив Список ссылок на входные узлы потока. Необязательно Пример: [], "eventstream-1".

Пример потока в тексте API:

{
  "streams": [
    {
      "name": "myEventstream-stream",
      "type": "DefaultStream",
      "properties":
      {},
      "inputNodes": [{"name": "sourceName"}]
    },
    {
      "name": "DerivedStreamName",
      "type": "DerivedStream",
      "properties":
      {
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "FilterName"}]
    }
  ]
}