Поделиться через


Создание подключения к данным Центров событий для Azure Synapse Data Explorer с помощью C# (предварительная версия)

Это важно

Обозреватель данных Azure Synapse Analytics (предварительная версия) будет прекращен 7 октября 2025 г. После этой даты рабочие нагрузки, работающие в Synapse Data Explorer, будут удалены, а связанные данные приложения будут потеряны. Мы настоятельно рекомендуем мигрировать в Eventhouse на платформе Microsoft Fabric.

Программа Microsoft Cloud Migration Factory (CMF) предназначена для поддержки клиентов при миграции в Fabric. Программа предлагает практические ресурсы клавиатуры без затрат для клиента. Эти ресурсы назначаются в течение 6–8 недель с предопределенной и согласованной областью. Номинации клиентов принимаются от команды учетных записей Microsoft или непосредственно путем отправки запроса на помощь команде CMF.

Azure Synapse Data Explorer — это быстрая и высокомасштабируемая служба для изучения данных журналов и телеметрии. Azure Synapse Data Explorer позволяет принимать (загружать) данные из Центров событий, Центра Интернета вещей и больших двоичных объектов, записанных в контейнеры больших двоичных объектов.

В этой статье описано, как создать подключение к данным Центров событий для Azure Synapse Data Explorer с помощью C#.

Необходимые компоненты

  • Подписка Azure. Создайте бесплатную учетную запись Azure.

  • Создать пул Data Explorer с помощью Synapse Studio или портала Azure

  • Создать базу данных Data Explorer.

    1. В Synapse Studio в области слева выберите элемент Данные.

    2. Выберите + (Добавить новый ресурс) >Пул Data Explorer, а затем используйте следующие сведения:

      Параметр Предлагаемое значение Описание
      Имя пула contosodataexplorer Имя пула Data Explorer, который вы будете использовать.
      Имя. TestDatabase Имя базы данных должно быть уникальным в пределах кластера.
      Срок хранения по умолчанию 365. Интервал времени (в днях), в течение которого данные будут гарантированно доступны для запросов. Интервал времени измеряется с момента приема данных.
      Период кэширования по умолчанию 31 Интервал времени (в днях), в течение которого необходимо хранить часто запрашиваемые данные в хранилище SSD или ОЗУ, а не в долговременном хранилище.
    3. Чтобы создать базу данных, выберите Создать. Создание обычно занимает меньше минуты.

Примечание.

Прием данных из концентратора событий в пулы Data Explorer не будет работать, если ваша рабочая область Synapse использует управляемую виртуальную сеть с включенной защитой от утечки данных.

  • Visual Studio 2019, загрузите и используйте бесплатную версию Visual Studio 2019 Community Edition. Включите разработку в Azure во время установки Visual Studio.

Создание таблицы в тестовом кластере

Создайте таблицу с именем StormEvents, которая соответствует схеме данных в файле StormEvents.csv.

Совет

Следующие фрагменты кода создают экземпляр клиента почти для каждого вызова. Это делается для того, чтобы каждый фрагмент можно было запускать отдельно. В рабочей среде экземпляры клиента можно использовать повторно, они могут храниться столько, сколько необходимо. Одного экземпляра клиента для каждого URI достаточно, даже при работе с несколькими базами данных (базу данных можно указать на уровне команды).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Определение сопоставления приема

Выполните сопоставление входящих данных CSV с именами столбцов, использованными при создании таблицы. Настройте объект сопоставления столбца CSV на той таблице.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Установка пакета NuGet для C#

Проверка подлинности

Чтобы выполнить следующий пример, вам потребуется приложение Microsoft Entra и субъект-служба, которые могут получить доступ к ресурсам. Чтобы создать бесплатное приложение Microsoft Entra и добавить назначение ролей на уровне подписки, см. статью "Создание приложения Microsoft Entra". Вам также потребуется идентификатор каталога (клиента), идентификатор приложения и секрет клиента.

Добавление подключения к данным Центров событий

В следующем примере показано, как программно добавить подключение к данным Центров событий. См. раздел подключение к Центрам событий для получения информации о добавлении соединения с данными Центров событий через портал Azure.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Параметр Рекомендуемое значение Описание поля
идентификатор арендатора xxxx-xxxxx-xxxx-xxxx-xxxx-xxxx Идентификатор клиента. Также известен как идентификатор каталога.
идентификатор подписки xxxx-xxxxx-xxxx-xxxx-xxxx-xxxx Идентификатор подписки, используемой для создания ресурсов.
clientId xxxx-xxxxx-xxxx-xxxx-xxxx-xxxx Идентификатор клиента приложения, которое имеет доступ к ресурсам в вашей организации.
клиентский секрет xxxx Секретный ключ клиента приложения, которое может получить доступ к ресурсам в вашем клиенте.
ИмяГруппыРесурсов testrg Имя группы ресурсов, содержащей ваш кластер.
имя кластера mykustocluster Имя вашего кластера.
Имя базы данных mykustodatabase Имя целевой базы данных в вашем кластере.
имяСоединенияДанных myeventhubconnect Необходимое имя вашего подключения для передачи данных.
имя_таблицы StormEvents Имя целевой таблицы в целевой базе данных.
mappingRuleName Карта_Событий_Штормов_CSV Имя сопоставления столбцов, связанного с целевой таблицей.
формат данных csv Формат данных сообщения.
идентификатор ресурса EventHub Идентификатор ресурса Идентификатор ресурса вашего концентратора событий, в котором хранятся данные для обработки.
группа потребителей $По умолчанию Группа потребителей вашего концентратора событий.
расположение Центральная часть США Расположение ресурса подключения к данным.
сжатие Gzip или None Тип сжатия данных.

Сформировать данные

См. пример приложения, которое генерирует данные и отправляет их в концентратор событий.

Событие может содержать одну или несколько записей, вплоть до предельного размера. В следующем примере мы отправляем два события, к каждому из которых добавлено пять записей.

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Очистка ресурсов

Чтобы удалить подключение к данным, выполните следующую команду:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Следующие шаги