Создание приложения для получения данных с помощью приема в очереди

Переключение служб с помощью раскрывающегося списка версий . Дополнительные сведения о навигации.
Область применения: ✅ Microsoft Fabric ✅ Azure Data Explorer

Kusto может обрабатывать массовое потребление данных путем оптимизации и пакетного приема данных с помощью диспетчера пакетной обработки. Диспетчер пакетной обработки агрегирует принятые данные до того, как они достигнут целевой таблицы, что позволяет повысить эффективность обработки и производительность. Пакетная обработка обычно выполняется партиями по 1 ГБ необработанных данных, 1000 отдельных файлов или с тайм-аутом по умолчанию в 5 минут. Политики пакетной обработки можно обновлять на уровнях базы данных и таблиц, обычно уменьшая время пакетной обработки и уменьшая задержку. Дополнительные сведения о пакетной обработке данных см. в политике IngestionBatching и о программном изменении политики пакетной обработки данных на уровне таблицы .

Заметка

Пакетная обработка также учитывает различные факторы, такие как целевая база данных и таблица, пользователь, выполняющий прием, и различные свойства, связанные с приемом, например специальные теги.

В этой статье описано, как:

Это важно

API приема теперь имеет две версии: V1 и V2. API версии 1 — это исходный API, а API версии 2 — это переосмысленая версия, которая упрощает прием API, предлагая дополнительные настройки.

Прием версии 2 доступен в предварительной версии и доступен на следующих языках: C#

Необходимые условия

Перед началом работы

  • Используйте один из следующих методов, чтобы создать таблицу MyStormEvents и, так как выполняется прием только небольшого объема данных, установите время ожидания политики пакетной обработки приема на 10 секунд.

    1. Создайте целевую таблицу с именем MyStormEvents в базе данных, выполнив первое приложение в командах управления .
    2. Задайте время ожидания политики пакетного приема до 10 секунд, запустив второе приложение в командах управления . Перед запуском приложения измените значение времени ожидания на 00:00:10.
  • Скачайте пример файла данных stormevent.csv. Файл содержит 1 000 записей штормовых событий.

    Заметка

    В следующих примерах предполагается простое соответствие между столбцами загружаемых данных и схемой таблицы назначения.

    Если поступающие данные не совпадают с таблицей по схеме, необходимо использовать сопоставление приёма данных для соотнесения столбцов данных со схемой таблицы.

Поставить файл в очередь для загрузки и запросить результаты

В предпочтительном интегрированной среде разработки или текстовом редакторе создайте проект или файл с именем базовый прием с помощью соглашения, подходящего для предпочитаемого языка. Поместите файл stormevent.csv в то же расположение, что и приложение.

Заметка

В следующих примерах используется два клиента, один для запроса кластера и другого для приема данных в кластер. Для языков, в которых клиентская библиотека поддерживает ее, оба клиента совместно используют один и тот же средство проверки подлинности пользовательского запроса, что приводит к одному запросу пользователя вместо одного для каждого клиента.

Добавьте следующий код:

  1. Создайте клиентское приложение, которое подключается к кластеру и выводит количество строк в таблице MyStormEvents. Это число будет использоваться в качестве базового показателя для сравнения с числом строк после каждого метода приема. Замените заполнители <your_cluster_uri> и <your_database> URI кластера и именем базы данных соответственно.

    using System.Data;
    
    using Kusto.Data;
    using Kusto.Data.Common;
    using Kusto.Data.Net.Client;
    
    using Azure.Identity;
    
    namespace BatchIngest;
    
    class BatchIngest
    {
      static async Task Main()
      {
        var tokenCredential = new InteractiveBrowserCredential();
        var clusterUri = "<your_cluster_uri>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
    
        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
    
        var database = "<your_database>";
        var table = "MyStormEvents";
    
        var query = table + " | count";
    
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }
      }
    
    static void PrintResultsAsValueList(IDataReader response)
    {
      while (response.Read())
      {
          for (var i = 0; i < response.FieldCount; i++)
          {
              object val = response.GetValue(i);
              string value = val.ToString() ?? "None";
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
          }
      }
      }
    }
    
  2. Создайте объект построителя строк подключения, определяющий универсальный код ресурса (URI) приема данных, где это возможно, с помощью общего доступа к тем же учетным данным проверки подлинности, что и URI кластера. Замените заполнитель <your_ingestion_uri> на URI загрузки данных.

    using Kusto.Ingest; // Add this import
    
    // No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI
    
    using Kusto.Ingest.V2; // Add this import
    
    // No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI
    
  3. Внесите файл stormevent.csv в очередь пакетной обработки.

    Вы используете следующие объекты и свойства:

    • QueuedIngestClient для создания клиента приема.

    • IngestionProperties Значение , чтобы задать свойства приема.

    • DataFormat Значение , чтобы указать формат файла в формате CSV.

    • ignore_first_record Чтобы указать, игнорируется ли первая строка в CSV и аналогичных типах файлов, используя следующую логику:

      • True: первая строка игнорируется. Используйте этот параметр, чтобы удалить строку заголовка из табличных текстовых данных.
      • False: первая строка приемируется как обычная строка.

      Заметка

      Прием поддерживает максимальный размер файла размером 6 ГБ. Рекомендация заключается в приеме файлов в диапазоне от 100 МБ до 1 ГБ.

    using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb);
    
    string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
    Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
    var ingestProps = new KustoIngestionProperties(database, table) {
      Format = DataSourceFormat.csv,
      AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
    };
    await ingestClient.IngestFromStorageAsync(filePath, ingestProps);
    

    Вы используете следующие объекты и свойства:

    • QueuedIngestClientBuilder для создания клиента приема.
    • IngestProperties является необязательным в большинстве случаев, но здесь используется для задания IgnoreFirstRecord.
    • DataFormat , чтобы указать формат файла как DataSourceFormat.csv.
    • IgnoreFirstRecord Чтобы указать, игнорируется ли первая строка в CSV и аналогичных типах файлов, используя следующую логику:
      • True: первая строка игнорируется. Используйте этот параметр, чтобы удалить строку заголовка из табличных текстовых данных.
      • False: первая строка приемируется как обычная строка.

    Заметка

    Прием поддерживает максимальный размер файла размером 6 ГБ. Рекомендация заключается в приеме файлов в диапазоне от 100 МБ до 1 ГБ.

    using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
    
    string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
    var fileSource = new FileSource(filePath, DataSourceFormat.csv);
    var props = new IngestProperties() { IgnoreFirstRecord = true };
    
    Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
    
    await ingestClient.IngestAsync(fileSource, database, table, props);
    
  4. Узнайте количество строк в таблице после загрузки файла и покажите последнюю строку, которая была добавлена.

    Заметка

    Чтобы дать времени на завершение процесса приема данных, подождите 30 секунд, прежде чем делать запрос к таблице. Для C# подождите 60 секунд, чтобы дать время для асинхронного добавления файла в очередь обработки.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    await Task.Delay(TimeSpan.FromSeconds(60));
    
    using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

Полный код должен выглядеть следующим образом:

using System.Data;

using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadUserPromptAuthentication();

        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);

        var database = "<your database>";
        var table = "MyStormEvents";

        var query = table + " | count";

        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }

        using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb);

        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
            Format = DataSourceFormat.csv,
            AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        await ingestClient.IngestFromStorageAsync(filePath, ingestProps);

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        await Task.Delay(TimeSpan.FromSeconds(60));

        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
            Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
            PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
            Console.WriteLine("\nLast ingested row:");
            PrintResultsAsValueList(response);
        }
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        while (response.Read())
        {
            for (var i = 0; i < response.FieldCount; i++)
            {
                object val = response.GetValue(i);
                string value = val.ToString() ?? "None";
                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
            }
        }
    }
}
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;

namespace BatchIngest;

class BatchIngest
{
   static async Task Main()
   {
       var tokenCredential = new InteractiveBrowserCredential();
       var clusterUri = "<your_cluster_uri>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
       var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);

       using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);

       var database = "<your_database>";
       var table = "MyStormEvents";

       var query = table + " | count";

       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
       {
           Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
           PrintResultsAsValueList(response);
       }

       using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();

       string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

       var fileSource = new FileSource(filePath, DataSourceFormat.csv);
       var props = new IngestProperties() { IgnoreFirstRecord = true };

       Console.WriteLine("\nIngesting data from file: \n\t " + filePath);

       await ingestClient.IngestAsync(fileSource, database, table, props);

       Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
       await Task.Delay(TimeSpan.FromSeconds(60));

       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
           Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
           PrintResultsAsValueList(response);
       }

       query = table + " | top 1 by ingestion_time()";
       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
           Console.WriteLine("\nLast ingested row:");
           PrintResultsAsValueList(response);
       }
   }

   static void PrintResultsAsValueList(IDataReader response)
   {
       while (response.Read())
       {
           for (var i = 0; i < response.FieldCount; i++)
           {
               object val = response.GetValue(i);
               string value = val.ToString() ?? "None";
               Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
           }
       }
   }
}

Запуск приложения

В командной оболочке используйте следующую команду для запуска приложения:

# Change directory to the folder that contains the management commands project
dotnet run .

Результат должен выглядеть следующим образом:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Очереди данных в памяти для приема и запроса результатов

Вы можете получать данные из памяти, создавая поток, содержащий данные, а затем добавляя его в очередь для приема.

Например, можно изменить приложение, заменив загрузку из файла на код следующим образом.

  1. Добавьте пакет дескриптора потока в импорт в верхней части файла.

    Дополнительные пакеты не требуются.

  2. Добавьте строку в оперативной памяти с данными для обработки.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. Задайте свойства приема, чтобы не игнорировать первую запись, так как строка в памяти не имеет строки заголовка.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
    // Remove the IngestionProperties object `props`
    
  4. Добавляйте данные из памяти в пакетную очередь. По возможности укажите размер необработанных данных.

  5. _= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length});
    
        var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv);
    
        await ingestClient.IngestAsync(streamSource, database, table);
    

Структура обновленного кода должна выглядеть следующим образом:

using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
        var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
        ...

        _= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length});
        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
        var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
        ...

        var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv);

        await ingestClient.IngestAsync(streamSource, database, table);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}

При запуске приложения вы увидите результат, аналогичный приведенному ниже. Обратите внимание, что после приема число строк в таблице увеличилось на один.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Поставьте большой двоичный объект в очередь для обработки и запроса результатов.

Вы можете загружать данные из объектов Blob в хранилище Azure, файлов Azure Data Lake и файлов Amazon S3.

Например, можно изменить приложение, заменив ввод данных из памяти в код следующим образом:

  1. Сначала отправьте файл stormevent.csv в учетную запись хранения и создайте универсальный код ресурса (URI) с разрешениями на чтение, например с помощью маркера SAS для БОЛЬШИХ двоичных объектов Azure.

  2. Добавьте пакет с дескриптором BLOB в раздел импортов в верхней части файла.

    Дополнительные пакеты не требуются.

  3. Создайте дескриптор большого двоичного объекта с помощью URI большого двоичного объекта, задайте свойства приема и приема данных из большого двоичного объекта. Замените заполнитель <your_blob_uri> на URI blob.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    
    var blobSource = new BlobSource("<your_blob_uri", DataSourceFormat.csv);
    
    await ingestClient.IngestAsync(blobSource, database, table);
    

Структура обновленного кода должна выглядеть следующим образом:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string blobUri = "<your_blob_uri>";
        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        await ingestClient.IngestFromStorageAsync(blobUri, ingestProps);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string blobUri = "<your_blob_uri>";
        ...

        Console.WriteLine("\nIngesting data from memory:");
        var blobSource = new BlobSource("<your_blob_uri", DataSourceFormat.csv);
        var props = new IngestProperties() { IgnoreFirstRecord = true };

        await ingestClient.IngestAsync(blobSource, database, table, props);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}

При запуске приложения вы увидите результат, аналогичный приведенному ниже. Обратите внимание, что после приема число строк в таблице увеличилось на 1000.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Следующий шаг