Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Прием потоковой передачи используется для загрузки данных, если требуется низкая задержка между приемом и запросом. Вы можете использовать прием потоковой передачи в следующих сценариях:
- Требуется задержка меньше секунды.
- Для оптимизации операционной обработки многих таблиц, когда поток данных в каждой таблице относительно мал (несколько записей в секунду), а общий объем приема данных большой (тысячи записей в секунду).
Если поток данных в каждую таблицу высок (более 4 ГБ в час), рассмотрите возможность приема в очереди.
Дополнительные сведения о методах приема данных см. в статье Общие сведения о приеме данных.
Примеры кода на основе предыдущих версий пакета SDK см. в архивной статье.
Выбор соответствующего типа приема потоковой передачи
Поддерживаются два типа приема потоковой передачи:
| Тип приема | Описание |
|---|---|
| Подключение к данным | Центры событий, Центр Интернета вещей и подключения к данным сетки событий могут использовать прием потоковой передачи, если он включен на уровне кластера. Решение об использовании приема потоковой передачи выполняется в соответствии с политикой приема потоковой передачи, настроенной в целевой таблице. Сведения об управлении подключениями к данным см. в разделе "Концентратор событий", "Центр Интернета вещей" и "Сетка событий". |
| Настраиваемый прием | Настраиваемая загрузка требует написания приложения, использующего одну из клиентских библиотек Azure Data Explorer. Используйте сведения, приведенные в этой статье, чтобы настроить пользовательскую загрузку. Вы также можете найти полезным пример приложения для приема потоковых данных C?view=azure-data-explorer&preserve-view=true#. |
Используйте следующую таблицу, чтобы выбрать тип приема, подходящий для вашей среды.
| Критерий | Подключение к данным | Настраиваемое поглощение данных |
|---|---|---|
| Задержка данных между инициированием приема и данными, доступными для запроса | Более длительная задержка | Более короткая задержка |
| Затраты на разработку | Быстрая и простая установка, без дополнительных затрат на разработку | Высокая нагрузка при разработке приложения, принимающего данные, обработки ошибок и обеспечения согласованности данных. |
Примечание.
Вы можете управлять процессом включения и отключения приема потоковой передачи в кластере с помощью портала Azure или программных средств C#. Если вы используете C# для пользовательского приложения, вы можете найти его более удобным с помощью программного подхода.
Предварительные условия
- Подписка Azure. Создайте бесплатную учетную запись Azure.
Рекомендации по обеспечению производительности и эксплуатации
Основные факторы, которые могут повлиять на прием потоковой передачи:
- Размер виртуальной машины и кластера. Производительность и емкость приема потоковой передачи повышаются с увеличением размера виртуальной машины и кластера. Число параллельных запросов приема ограничено шестью на ядро. Например, для номеров SKU с 16 ядрами, таких как D14 и L16, максимальная поддерживаемая загрузка составляет 96 одновременных запросов на прием. Для SKU с двумя ядрами, такими как D11, максимальная поддерживаемая нагрузка составляет 12 параллельных запросов на прием.
- Ограничение на объем данных. Предельный объем данных для запроса на прием потоковой передачи составляет 4 МБ. Это включает любые данные, созданные для политик обновлений во время загрузки.
- Обновления схемы. Обновления схемы, такие как создание и изменение таблиц и сопоставление данных, для службы приема потоковой передачи могут занимать до 5 минут. Дополнительные сведения см. в статье Прием потоковой передачи данных и изменения схемы.
- Емкость SSD. При включении приема потоковой передачи в кластере, даже если данные не принимаются через потоковую передачу, для данных приема потоковой передачи будет использоваться часть локального диска SSD компьютеров кластера. Это уменьшает объем хранилища, доступный для горячего кэша.
- Курсоры базы данных: при использовании приема потоковой передачи обновления курсора базы данных могут отстать от доступности данных до 60 секунд. Эта задержка возникает из процессов асинхронного фонового запечатывания, которые передают данные из буфера потоковой передачи в постоянные экстенты хранилища столбцов, в течение которых курсор (используется для добавочной обработки, непрерывного экспорта или материализованных представлений) обновляется. Если ваша рабочая нагрузка требует немедленной согласованности курсора для семантики "точно один раз", рассмотрите возможность использования очередной загрузки или учтите эту потенциальную задержку в логике приложения.
Включите прием потоковой передачи в своем кластере
Прежде чем можно будет использовать прием потоковой передачи, необходимо включить эту возможность в кластере и определить политику приема потоковой передачи. Вы можете включить эту возможность при создании кластера или добавить ее в существующий кластер.
Предупреждение
Перед включением приема потоковой передачи ознакомьтесь с ограничениями.
Включение приема потоковой передачи при создании нового кластера
Вы можете включить прием потоковой передачи при создании нового кластера с помощью портала Azure или программных средств C#.
При создании кластера с помощью действий, описанных в статье Создание кластера и базы данных Azure Data Explorer, на вкладке Конфигурации выберите Прием потоковой передачи>Включить.
Включение приема потоковой передачи в существующем кластере
Если у вас есть существующий кластер, вы можете включить прием потоковой передачи с помощью портала Azure или программных средств C#.
На портале Azure перейдите к кластеру Azure Data Explorer.
В разделе Параметры выберите пункт Конфигурации.
В области Конфигурации выберите Вкл., чтобы включить Прием потоковой передачи.
Выберите Сохранить.
Создание целевой таблицы и определение политики
Создайте таблицу для получения данных приема потоковой передачи и определите связанную с ней политику с помощью портала Azure или программных средств C#.
На портале Azure перейдите к своему кластеру.
Выберите Запрос.
Чтобы создать таблицу, которая будет получать данные посредством потоковой передачи, скопируйте следующую команду в Панель запросов и выберите Выполнить.
.create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
Скопируйте одну из следующих команд в Панель запросов и выберите Выполнить. Будет определена политика приема потоковой передачи для созданной вами таблицы или базы данных, содержащей эту таблицу.
Совет
Политика, которая определяется на уровне базы данных, применяется ко всем существующим и будущим таблицам в базе данных. Если вы включите политику на уровне базы данных, ее не нужно включить для каждой таблицы.
Чтобы определить политику для созданной таблицы, выполните следующий код:
.alter table TestTable policy streamingingestion enableЧтобы определить политику для базы данных, содержащей созданную таблицу, выполните следующий код:
.alter database StreamingTestDb policy streamingingestion enable
Создайте приложение для потоковой загрузки данных в ваш кластер
Создайте приложение для приема данных в кластере с помощью выбранного вами языка.
Примечание.
Сведения о приеме в очереди см. в инструкциях по созданию приложения для получения данных с помощью приема в очереди
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.gzip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
Отключение приема потоковой передачи в кластере
Предупреждение
Отключение приема потоковой передачи может занять несколько часов.
Перед отключением приема потоковой передачи в кластере Azure Data Explorer удалите политику приема потоковой передачи из всех соответствующих таблиц и баз данных. Удаление политики приема потоковой передачи активирует реорганизацию данных в кластере Azure Data Explorer. Данные приема потоковой передачи перемещаются из первоначального хранилища в постоянное хранилище в хранилище столбцов (экстентов или сегментов). Этот процесс может занять от нескольких секунд до нескольких часов в зависимости от объема данных в первоначальном хранилище.
Удаление политики приема потоковой передачи
Политику приема потоковой передачи можно удалить с помощью портала Azure или программных средств C#.
На портале Azure перейдите в кластер Azure Data Explorer и выберите Запрос.
Чтобы удалить политику приема потоковой передачи из таблицы, скопируйте следующую команду в Панель запросов и выберите Выполнить.
.delete table TestTable policy streamingingestion
В разделе Параметры выберите пункт Конфигурации.
В области Конфигурации выберите Выкл., чтобы отключить Потоковую инжекцию.
Выберите Сохранить.
Ограничения
- Сопоставления данных должны быть предварительно созданы, чтобы их можно было использовать для приема потоковой передачи. Отдельные запросы на прием потоковой передачи не поддерживают встроенные сопоставления данных.
- Невозможно установить теги экстентов для данных приема потоковой передачи.
-
Обновление политики
- Политика обновления может ссылаться только на недавно загруженные данные в исходной таблице, а не на другие данные или таблицы в базе данных.
- Плагин Python не поддерживается
- Если политика обновления с политикой транзакций завершается сбоем, повторные попытки переходят на пакетную обработку.
- Для каскадных политик обновления, включающих
joinоператор, необходимо отключить потоковое поглощение во всех вышестоящих таблицах. Например, рассмотрим каскадные политики обновления, в которых Table1 обновляет Table2, Table2 обновляет Table3 и Table3 обновляет Table4. Если политика обновления Table4 включает соединение, необходимо отключить прием потоковой передачи в Table1, Table2 и Table3.
- Если потоковый прием данных включен для кластера-лидера, кластеры-фолловеры также должны использовать потоковый прием данных, чтобы отслеживать данные потокового приема. Это же условие применимо при общем доступе к данным кластера через Data Share.