Прием потоковой передачи полезен для загрузки данных, если требуется низкая задержка между приемом и запросом. Рекомендуется использовать прием потоковой передачи в следующих сценариях:
- Требуется задержка меньше секунды.
- Для оптимизации операционной обработки многих таблиц, когда поток данных в каждой таблице относительно мал (несколько записей в секунду), а общий объем приема данных большой (тысячи записей в секунду).
Если поток данных в каждую таблицу высок (более 4 ГБ в час), рассмотрите возможность пакетного приема.
Дополнительные сведения о различных методах приема см. в обзоре приема данных.
Выбор соответствующего типа приема потоковой передачи
Поддерживаются два типа приема потоковой передачи:
| Тип приема |
Описание |
| Подключение к данным |
Центр событий, Центр Интернета вещей и соединения данных Event Grid могут использовать прием потоковых данных, если он включен на уровне кластера. Решение об использовании приема потоковой передачи выполняется в соответствии с политикой приема потоковой передачи, настроенной в целевой таблице. Сведения об управлении подключениями к данным см. в разделе "Центр событий", "Центр Интернета вещей " и " Сетка событий". |
|
Настраиваемый прием данных |
Настраиваемый ввод требует написания приложения, использующего одну из клиентских библиотек Azure Data Explorer. Используйте информацию в этой теме для настройки индивидуального процесса ввода данных. Вы также можете найти полезным пример приложения для потоковой обработки данных на C#. |
Используйте следующую таблицу, чтобы выбрать тип приема, подходящий для вашей среды:
| Критерий |
Подключение к данным |
Настраиваемая загрузка |
| Задержка данных между началом приема и данными, доступными для запроса |
Более длинная задержка |
Более короткая задержка |
| Затраты на разработку |
Быстрая и простая настройка, без затрат на разработку |
Высокая нагрузка на разработку для создания приложения приема данных, обработки ошибок и обеспечения согласованности данных |
Замечание
Вы можете управлять процессом, чтобы включить и отключить прием потоковой передачи в кластере с помощью портала Azure или программно в C#. Если вы используете C# для пользовательского приложения, вы можете найти его более удобным с помощью программного подхода.
Предпосылки
Основными участниками, которые могут повлиять на прием потоковой передачи, являются:
-
Размер виртуальной машины и кластера: производительность приема потоковой передачи и масштабирование емкости с увеличением размера виртуальной машины и кластера. Число одновременных запросов приема ограничено шестью на одно ядро. Например, для SKU с 16 ядрами, такими как D14 и L16, максимальная допустимая нагрузка составляет 96 одновременных запросов на прием. Для двух основных SKU, таких как D11, максимальная поддерживаемая нагрузка составляет 12 одновременных запросов на ввод данных.
-
Ограничение размера данных: ограничение размера данных для запроса приема потоковой передачи составляет 4 МБ. Сюда входят все данные, созданные для стратегий обновления во время загрузки.
-
Обновления схемы: такие обновления схемы, как создание и изменение таблиц и мэппингов приема данных, могут занять до пяти минут в службе потокового приема данных. Дополнительные сведения см. в разделе "Прием потоковой передачи" и изменения схемы.
-
Емкость SSD: Включение потоковой загрузки в кластере, даже если данные не загружаются потоком, использует часть локального SSD диска на машинах кластера для данных потоковой загрузки и сокращает доступное хранилище для горячего кэша.
Включение приема потоковой передачи в кластере
Прежде чем использовать прием потоковой передачи, необходимо включить возможности в кластере и определить политику приема потоковой передачи. Вы можете включить эту возможность при создании кластера или добавить его в существующий кластер.
Предупреждение
Просмотрите ограничения перед включением приема потоковой передачи.
Включение приема потоковой передачи при создании нового кластера
Вы можете включить прием потоковой передачи при создании нового кластера с помощью портала Azure или программно в C#.
Чтобы включить прием потоковой передачи при создании нового кластера Azure Data Explorer, выполните следующий код:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
string location = "<location>";
string skuName = "<skuName>";
string tier = "<tier>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var cluster = new Cluster(location, new AzureSku(skuName, tier), enableStreamingIngest:true);
await kustoManagementClient.Clusters.CreateOrUpdateAsync(resourceGroupName, clusterName, cluster);
}
}
}
Включение приема потоковой передачи в существующем кластере
Если у вас есть существующий кластер, можно включить прием потоковой передачи с помощью портала Azure или программно в C#.
На портале Azure перейдите в кластер Azure Data Explorer.
В разделе "Параметры" выберите "Конфигурации".
В области "Конфигурации" выберите "Включить ", чтобы включить прием потоковой передачи.
Нажмите кнопку "Сохранить".
Вы можете включить прием потоковой передачи при создании нового кластера Azure Data Explorer.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Создание целевой таблицы и определение политики
Создайте таблицу для получения данных приема потоковой передачи и определите ее связанную политику с помощью портала Azure или программно в C#.
На портале Azure перейдите к кластеру.
Выберите "Запрос".
Чтобы создать таблицу, которая будет получать данные через прием потоковой передачи, скопируйте следующую команду в область запроса и нажмите кнопку "Выполнить".
.create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
Скопируйте одну из следующих команд в область запроса и выберите "Выполнить". Это определяет политику приема потоковой передачи в созданной таблице или базе данных, содержащей таблицу.
Подсказка
Политика, определенная на уровне базы данных, применяется ко всем существующим и будущим таблицам в базе данных. Если вы включите политику на уровне базы данных, ее не нужно включить для каждой таблицы.
Чтобы определить политику в созданной таблице, используйте следующую команду:
.alter table TestTable policy streamingingestion enable
Чтобы определить политику в базе данных, содержащей созданную таблицу, используйте следующую команду:
.alter database StreamingTestDb policy streamingingestion enable
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Создайте приложение потоковой загрузки для загрузки данных в кластер
Создайте приложение для приема данных в кластер с помощью предпочтительного языка.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
from azure.kusto.data import KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import IngestionProperties, KustoStreamingIngestClient
clusterPath = "https://<clusterName>.<region>.kusto.windows.net"
appId = "<appId>"
appKey = "<appKey>"
appTenant = "<appTenant>"
dbName = "<dbName>"
tableName = "<tableName>"
csb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
clusterPath,
appId,
appKey,
appTenant
)
client = KustoStreamingIngestClient(csb)
ingestionProperties = IngestionProperties(
database=dbName,
table=tableName,
data_format=DataFormat.CSV
)
# Ingest from file
# Automatically detects gz format
client.ingest_from_file("MyFile.gz", ingestion_properties=ingestionProperties)
// Load modules using ES6 import statements:
import { DataFormat, IngestionProperties, StreamingIngestClient } from "azure-kusto-ingest";
import { KustoConnectionStringBuilder } from "azure-kusto-data";
// For earlier version, load modules using require statements:
// const IngestionProperties = require("azure-kusto-ingest").IngestionProperties;
// const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
// const {DataFormat} = require("azure-kusto-ingest").IngestionPropertiesEnums;
// const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
const appId = "<appId>";
const appKey = "<appKey>";
const appTenant = "<appTenant>";
const dbName = "<dbName>";
const tableName = "<tableName>";
const mappingName = "<mappingName>"; // Required for JSON formatted files
const ingestionProperties = new IngestionProperties({
database: dbName, // Your database
table: tableName, // Your table
format: DataFormat.JSON,
ingestionMappingReference: mappingName
});
// Initialize client with engine endpoint
const client = new StreamingIngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
clusterPath,
appId,
appKey,
appTenant
),
ingestionProperties
);
// Automatically detects gz format
await client.ingestFromFile("MyFile.gz", ingestionProperties);
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Отключение приема потоковой передачи в кластере
Предупреждение
Отключение приема потоковой передачи может занять несколько часов.
Перед отключением приема потоковой передачи в кластере Azure Data Explorer удалите политику приема потоковой передачи из всех соответствующих таблиц и баз данных. Удаление политики приема потоковой передачи активирует изменение порядка данных в кластере Azure Data Explorer. Данные приема потоковой передачи перемещаются из исходного хранилища в постоянное хранилище в хранилище столбцов (экстенты или сегменты). Этот процесс может занять от нескольких секунд до нескольких часов в зависимости от объема данных в начальном хранилище.
Удаление политики приема потоковой передачи
Политику приема потоковой передачи можно удалить с помощью портала Azure или программно в C#.
На портале Azure перейдите в кластер Azure Data Explorer и выберите "Запрос".
Чтобы удалить политику приема потоковой передачи из таблицы, скопируйте следующую команду в область запроса и выберите "Выполнить".
.delete table TestTable policy streamingingestion
В разделе "Параметры" выберите "Конфигурации".
В области "Конфигурации" нажмите кнопку "Отключить ", чтобы отключить прием потоковой передачи.
Нажмите кнопку "Сохранить".
Чтобы удалить политику приема потоковой передачи из таблицы, выполните следующий код:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Чтобы отключить прием потоковой передачи в кластере, выполните следующий код:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Ограничения
-
Сопоставления данных должны быть предварительно созданы для использования в приеме потоковой передачи. Отдельные запросы приема потоковой передачи не соответствуют встроенным сопоставлениям данных.
-
Теги экстентов нельзя задать для данных потоковой загрузки.
-
Обновление политики. Политика обновления может ссылаться только на недавно загруженные данные в исходной таблице, а не на другие данные или таблицы в базе данных.
- Если прием потоковой передачи включен в кластере, используемом в качестве лидера для последовательных баз данных, приём потоковой передачи необходимо включить на следующих кластерах, чтобы следовать за данными потоковой передачи. То же самое относится к тому, используются ли данные кластера через общую папку данных.
Дальнейшие шаги