Applies to: ✅ Microsoft Fabric ✅ Azure Data Explorer
Kusto can handle bulk data ingestion by optimizing and batching ingested data via its batching manager. The batching manager aggregates ingested data before it reaches the target table, allowing for more efficient processing and improved performance. Batching is typically done in bulks of 1 GB of raw data, 1,000 individual files, or by a default timeout of 5 minutes. Batching policies can be updated at the database and table levels, commonly to lower the batching time and reduce latency. For more information about ingestion batching, see the IngestionBatching policy and Change the table-level ingestion batching policy.
Note
Batching also takes into account various factors, such as the target database and table, the user running the ingestion, and various properties associated with the ingestion, such as special tags.
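For example, to reduce latency across a whole database you could lower its batching timeout with a management command. The following is a minimal sketch using the Python client introduced later in this article; the policy values are illustrative only, not recommendations:
# Illustrative sketch: set a database-level IngestionBatching policy.
# The property names are the documented policy fields; the values are examples.
policy = '{ "MaximumBatchingTimeSpan": "00:00:30", "MaximumNumberOfItems": 500, "MaximumRawDataSizeMB": 1024 }'
kusto_client.execute_mgmt(database, ".alter database " + database + " policy ingestionbatching '" + policy + "'")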
This article shows you how to:
- Queue a file for ingestion and query the results
- Queue in-memory data for ingestion and query the results
- Queue a blob for ingestion and query the results
Prerequisites
Before you begin
Use one of the following methods to create the MyStormEvents table and, because only a small amount of data is being ingested, set its ingestion batching policy timeout to 10 seconds.
- Create a target table named MyStormEvents in your database by running the first app in Management commands.
- In Management commands, set the ingestion batching policy timeout to 10 seconds by running the second app. Before running the app, change the timeout value to 00:00:10.
In your query environment, create a target table named MyStormEvents in your database by running the following query:
.create table MyStormEvents
(StartTime: datetime,
EndTime: datetime,
State: string,
DamageProperty: int,
DamageCrops: int,
Source: string,
StormSummary: dynamic)
Set the ingestion batching policy timeout to 10 seconds by running the following command:
.alter-merge table MyStormEvents policy ingestionbatching '{ "MaximumBatchingTimeSpan":"00:00:10" }'
Note
It can take a few minutes for the new batching policy settings to propagate to the batching manager.
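You can read the policy back to confirm the change took effect; a minimal sketch with the Python client used later in this article:
# Sketch: read back the table-level batching policy to verify the new timeout.
response = kusto_client.execute_mgmt(database, ".show table MyStormEvents policy ingestionbatching")
for row in response.primary_results[0]:
    print(row["Policy"])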
Download the stormevents.csv sample data file. The file contains 1,000 storm event records.
Note
The following examples assume a trivial match between the columns of the ingested data and the schema of the target table.
If the ingested data doesn't trivially match the table schema, you must use an ingestion mapping to align the columns of the data with the table schema.
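For example, with the Python SDK a mapping can be referenced by name from the ingestion properties. This sketch assumes a pre-created CSV mapping named MyStormEvents_mapping, which is a hypothetical name:
# Sketch: reference a pre-created ingestion mapping (hypothetical name) when
# the source columns don't line up with the table schema.
ingest_props = IngestionProperties(
    database, table, DataFormat.CSV,
    ingestion_mapping_reference="MyStormEvents_mapping")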
Queue a file for ingestion and query the results
In your preferred IDE or text editor, create a project or file named basic ingestion using the convention appropriate for your preferred language. Place the stormevents.csv file in the same location as your app.
Note
In the following examples, you use two clients: one to query your cluster and the other to ingest data into your cluster. For languages where the client library supports it, both clients share the same user prompt authenticator, resulting in a single user prompt instead of one for each client.
Add the following code:
Create a client app that connects to your cluster and prints the number of rows in the MyStormEvents table. You'll use this count as a baseline for comparison with the number of rows after each ingestion method. Replace the <your_cluster_uri> and <your_database> placeholders with your cluster URI and database name, respectively.
- C#
- Python
- TypeScript
- Java
using Kusto.Data;
using Kusto.Data.Net.Client;
using System.Data;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = [col.column_name for col in response.primary_results[0].columns]
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withTokenCredential(clusterUri, credentials);
const kustoClient = new KustoClient(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(response);
}
function printResultsAsValueList(response) {
const cols = response.primaryResults[0].columns;
for (const row of response.primaryResults[0].rows()) {
for (const col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Note
For Node.js apps, use InteractiveBrowserCredentialNodeOptions instead of InteractiveBrowserCredentialInBrowserOptions.
Note
The Java SDK currently doesn't support both clients sharing the same user prompt authenticator, resulting in a user prompt for each client.
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
String clusterUri = "<your_cluster_uri>";
ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
String query = table + " | count";
KustoOperationResult results = kustoClient.execute(database, query);
KustoResultSetTable primaryResults = results.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(primaryResults);
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
while (results.next()) {
KustoResultColumn[] columns = results.getColumns();
for (int i = 0; i < columns.length; i++) {
System.out.println("\t" + columns[i].getColumnName() + " - " + (results.getObject(i) == null ? "None" : results.getString(i)));
}
}
}
}
Create a connection string builder object that defines the data ingestion URI, where possible, sharing the same authentication credentials as the cluster URI. Replace the <your_ingestion_uri> placeholder with your data ingestion URI.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
const ingestUri = "<your_ingestion_uri>";
const ingestKcsb = KustoConnectionStringBuilder.withTokenCredential(ingestUri, credentials);
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
String ingestUri = "<your_ingestion_uri>";
ConnectionStringBuilder ingestKcsb = ConnectionStringBuilder.createWithUserPrompt(ingestUri);
Ingest the stormevents.csv file by adding it to the batch queue. You use the following objects and properties:
- QueuedIngestClient to create the ingest client.
- IngestionProperties to set the ingestion properties.
- DataFormat to specify the file format as CSV.
- ignore_first_record to specify whether the first row in CSV and similar file types is ignored, using the following logic:
  - True: The first row is ignored. Use this option to drop the header row from tabular textual data.
  - False: The first row is ingested as a regular row.
Note
Ingestion supports a maximum file size of 6 GB. The recommendation is to ingest files between 100 MB and 1 GB; for larger files, see the splitting sketch after the ingestion snippets below.
- C#
- Python
- TypeScript
- Java
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
}
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
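The stormevents.csv file here is small, but if you ever need to queue a file larger than the recommended range from the note above, one approach is to split it into parts first and queue each part. A minimal sketch in Python, assuming a header row you want repeated in every part:
# Sketch: split a large CSV into roughly 250 MB parts, repeating the header in
# each part, so every queued file lands in the recommended 100 MB-1 GB range.
# Sizes are approximated in characters rather than bytes.
def split_csv(src_path, chunk_chars=250 * 1024 * 1024):
    part_paths = []
    out = None
    written = 0
    with open(src_path, "r", encoding="utf-8") as src:
        header = src.readline()
        for line in src:
            if out is None or written >= chunk_chars:
                if out is not None:
                    out.close()
                part_paths.append(src_path + ".part" + str(len(part_paths) + 1) + ".csv")
                out = open(part_paths[-1], "w", encoding="utf-8")
                out.write(header)
                written = len(header)
            out.write(line)
            written += len(line)
    if out is not None:
        out.close()
    return part_paths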
Query the number of rows in the table after ingesting the file, and show the last row ingested.
Note
To allow the ingestion to complete, wait 30 seconds before querying the table. For C#, wait 60 seconds to allow time for adding the file to the ingestion queue asynchronously. A polling alternative is sketched after the following snippets.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()";
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
results = kustoClient.execute(database, query);
primaryResults = results.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
results = kustoClient.execute(database, query);
primaryResults = results.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
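The fixed sleep keeps the walkthrough simple. In practice you might poll the row count until the expected value lands, rather than guessing a wait time. A minimal sketch in Python, using the kusto_client from this app (the helper name is hypothetical):
import time

# Hypothetical helper: poll until the table reaches the expected row count,
# checking every 5 seconds for up to timeout_seconds.
def wait_for_row_count(kusto_client, database, table, expected, timeout_seconds=60):
    count = -1
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        response = kusto_client.execute_query(database, table + " | count")
        count = [row["Count"] for row in response.primary_results[0]][0]
        if count >= expected:
            break
        time.sleep(5)
    return count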
The complete code should look like this:
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null))
{
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
if (response.GetDataTypeName(i) == "Int64")
Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
else
Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
}
}
}
}
}
import os
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
from azure.identity import InteractiveBrowserCredential
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = [col.column_name for col in response.primary_results[0].columns]
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withTokenCredential(clusterUri, credentials);
const ingestUri = "<your_ingestion_uri>";
const ingestKcsb = KustoConnectionStringBuilder.withTokenCredential(ingestUri, credentials);
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>";
const table = "MyStormEventsJS";
const filePath = path.join(__dirname, "stormevents.csv");
let query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(response);
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()";
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
}
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
function printResultsAsValueList(response) {
const cols = response.primaryResults[0].columns;
for (const row of response.primaryResults[0].rows()) {
for (const col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
String clusterUri = "<your_cluster_uri>";
ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri);
String ingestUri = "<your_ingestion_uri>";
ConnectionStringBuilder ingestKcsb = ConnectionStringBuilder.createWithUserPrompt(ingestUri);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
String query = table + " | count";
KustoOperationResult results = kustoClient.execute(database, query);
KustoResultSetTable primaryResults = results.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(primaryResults);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
results = kustoClient.execute(database, query);
primaryResults = results.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
results = kustoClient.execute(database, query);
primaryResults = results.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
while (results.next()) {
KustoResultColumn[] columns = results.getColumns();
for (int i = 0; i < columns.length; i++) {
System.out.println("\t" + columns[i].getColumnName() + " - " + (results.getObject(i) == null ? "None" : results.getString(i)));
}
}
}
}
Run the app
From a command shell, use the following command to run the app:
# Change directory to the folder that contains the basic ingestion project
dotnet run .
python basic_ingestion.py
In a Node.js environment:
node basic-ingestion.js
In a browser environment, use the appropriate command to run the app. For example, for Vite-React:
npm run dev
Note
In a browser environment, open the developer tools console to see the output.
mvn install exec:java -Dexec.mainClass="<groupId>.BatchIngestion"
You should see a result similar to:
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
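If the AFTER count doesn't grow, the ingestion may have failed silently. One way to investigate is to list recent ingestion failures with a management command; a minimal sketch using the Python client from this app:
# Sketch: show the five most recent ingestion failures to troubleshoot.
response = kusto_client.execute_mgmt(database, ".show ingestion failures | top 5 by FailedOn desc")
print_result_as_value_list(response)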
Queue in-memory data for ingestion and query the results
You can ingest data from memory by creating a stream containing the data, and then queuing it for ingestion.
For example, you can modify the app by replacing the ingest from file code, as follows:
Add the stream descriptor package to the imports at the top of the file.
No additional packages are required.
import io
from azure.kusto.ingest import StreamDescriptor
import { Readable } from "stream";
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
Add an in-memory string with the data to ingest.
string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
string_stream = io.StringIO(single_line)
const singleLine = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"';
const stringStream = Readable.from(singleLine);
String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
InputStream stream = new ByteArrayInputStream(singleLine.getBytes(StandardCharsets.UTF_8));
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream);
Set the ingestion properties to not ignore the first record, because the in-memory string doesn't have a header row.
ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=False)
ingestProps.ignoreFirstRecord = false;
ingestProps.setIgnoreFirstRecord(false);
Ingest the in-memory data by adding it to the batch queue. Where possible, provide the size of the raw data.
- C#
- Python
- TypeScript
- Java
_= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
ingest_client.ingest_from_stream(stream_descriptor, ingest_props)
stringStream.size = singleLine.length;
await ingestClient.ingestFromStream(stringStream, ingestProps);
ingestClient.ingestFromStream(streamSourceInfo, ingestProps);
The structure of the updated code should look like this:
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
...
string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
...
Console.WriteLine("\nIngesting data from memory:");
ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
_= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
...
}
}
static void PrintResultsAsValueList(IDataReader response) {
...
}
}
}
import io
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, StreamDescriptor
from azure.identity import InteractiveBrowserCredential
def main():
...
single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
string_stream = io.StringIO(single_line)
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
...
print("\nIngesting data from memory:")
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=False)
stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
ingest_client.ingest_from_stream(stream_descriptor, ingest_props)
...
def print_result_as_value_list(response):
...
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
import { Readable } from "stream";
async function main() {
...
const singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
const stringStream = Readable.from(singleLine);
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>"
const table = "MyStormEvents"
...
console.log("\nIngesting data from memory:");
stringStream.size = singleLine.length;
ingestProps.ignoreFirstRecord = false;
await ingestClient.ingestFromStream(stringStream, ingestProps);
...
}
function sleep(time) {
...
}
function printResultsAsValueList(response) {
...
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
...
String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
InputStream stream = new ByteArrayInputStream(singleLine.getBytes(StandardCharsets.UTF_8));
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
...
System.out.println("\nIngesting data from memory:");
ingestProps.setIgnoreFirstRecord(false);
ingestClient.ingestFromStream(streamSourceInfo, ingestProps);
...
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
...
}
}
When you run the app, you should see a result similar to the following. Notice that after the ingestion, the number of rows in the table increased by one.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Queue a blob for ingestion and query the results
You can ingest data from Azure Storage blobs, Azure Data Lake files, and Amazon S3 files.
For example, you can modify the app by replacing the ingest from memory code with the following:
To start, upload the stormevents.csv file to your storage account and generate a URI with read permissions, for example, using a SAS token for Azure blobs.
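If you prefer to script the upload and SAS generation, the following sketch uses the azure-storage-blob Python package; the connection string and container name are placeholders you supply:
# Sketch: upload stormevents.csv and build a read-only SAS URI for it.
from datetime import datetime, timedelta, timezone
from azure.storage.blob import BlobSasPermissions, BlobServiceClient, generate_blob_sas

service = BlobServiceClient.from_connection_string("<storage_connection_string>")
blob = service.get_blob_client("<container>", "stormevents.csv")
with open("stormevents.csv", "rb") as data:
    blob.upload_blob(data, overwrite=True)
sas = generate_blob_sas(
    account_name=blob.account_name,
    container_name=blob.container_name,
    blob_name=blob.blob_name,
    account_key=service.credential.account_key,
    permission=BlobSasPermissions(read=True),
    expiry=datetime.now(timezone.utc) + timedelta(hours=1),
)
blob_uri = blob.url + "?" + sas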
Add the blob descriptor package to the imports at the top of the file.
- C#
- Python
- TypeScript
- Java
No additional packages are required.
from azure.kusto.ingest import BlobDescriptor
No additional packages are required.
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
Create a blob descriptor using the blob URI, set the ingestion properties, and then ingest the data from the blob. Replace the <your_blob_uri> placeholder with the blob URI.
string blobUri = "<your_blob_uri>";
ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
_= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
blob_uri = "<your_blob_uri>"
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
blob_descriptor = BlobDescriptor(blob_uri)
ingest_client.ingest_from_blob(blob_descriptor, ingest_props)
const blobUri = "<your_blob_uri>";
ingestProps.ignoreFirstRecord = true;
await ingestClient.ingestFromBlob(blobUri, ingestProps);
String blobUri = "<your_blob_uri>";
ingestProps.setIgnoreFirstRecord(true);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUri, 100);
ingestClient.ingestFromBlob(blobSourceInfo, ingestProps);
The structure of the updated code should look like this:
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
...
string blobUri = "<your_blob_uri>";
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
...
Console.WriteLine("\nIngesting data from memory:");
ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
_= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
...
}
}
static void PrintResultsAsValueList(IDataReader response) {
...
}
}
}
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, BlobDescriptor
from azure.identity import InteractiveBrowserCredential
def main():
...
blob_uri = "<your_blob_uri>"
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
...
print("\nIngesting data from a blob:")
blob_descriptor = BlobDescriptor(blob_uri)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_blob(blob_descriptor, ingest_props)
...
def print_result_as_value_list(response):
...
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
import { Readable } from "stream";
async function main() {
...
const blobUri = "<your_blob_uri>";
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>"
const table = "MyStormEvents"
...
console.log("\nIngesting data from a blob:");
ingestProps.ignoreFirstRecord = true;
await ingestClient.ingestFromBlob(blobUri, ingestProps);
...
}
function sleep(time) {
...
}
function printResultsAsValueList(response) {
...
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
...
String blobUri = "<your_blob_uri>";
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
...
System.out.println("\nIngesting data from a blob:");
ingestProps.setIgnoreFirstRecord(true);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUri, 100);
ingestClient.ingestFromBlob(blobSourceInfo, ingestProps);
...
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
...
}
}
When you run the app, you should see a result similar to the following. Notice that after the ingestion, the number of rows in the table increased by 1,000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Next step