Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Это важно
Обозреватель данных Azure Synapse Analytics (предварительная версия) будет прекращен 7 октября 2025 г. После этой даты рабочие нагрузки, работающие в Synapse Data Explorer, будут удалены, а связанные данные приложения будут потеряны. Мы настоятельно рекомендуем мигрировать в Eventhouse на платформе Microsoft Fabric.
Программа Microsoft Cloud Migration Factory (CMF) предназначена для поддержки клиентов при миграции в Fabric. Программа предлагает практические ресурсы клавиатуры без затрат для клиента. Эти ресурсы назначаются в течение 6–8 недель с предопределенной и согласованной областью. Номинации клиентов принимаются от команды учетных записей Microsoft или непосредственно путем отправки запроса на помощь команде CMF.
Azure Synapse Data Explorer — это быстрая и высокомасштабируемая служба для изучения данных журналов и телеметрии. Azure Synapse Data Explorer позволяет осуществлять загрузку данных из Event Hubs, IoT Hubs и блобов, записанных в контейнеры для блобов.
В этой статье описано, как создать подключение к данным концентратора событий для Azure Synapse Data Explorer с помощью Python.
Предварительные условия
Подписка Azure. Создайте бесплатную учетную запись Azure.
Создать пул Data Explorer с помощью Synapse Studio или портала Azure
Создать базу данных Data Explorer.
В Synapse Studio в области слева выберите элемент Данные.
Выберите + (Добавить новый ресурс) >пул обозревателя данных, а затем используйте следующие сведения:
Настройка Предлагаемое значение Описание Имя пула contosodataexplorer Имя пула Data Explorer, который вы будете использовать. Имя. TestDatabase Имя базы данных должно быть уникальным в пределах кластера. Срок хранения по умолчанию 365. Интервал времени (в днях), в течение которого данные будут гарантированно доступны для запросов. Интервал времени измеряется с момента приема данных. Период кэширования по умолчанию 31 Интервал времени (в днях), в течение которого необходимо хранить часто запрашиваемые данные в хранилище SSD или ОЗУ, а не в долговременном хранилище. Чтобы создать базу данных, выберите Создать. Создание обычно занимает меньше минуты.
Создание таблицы в тестовом кластере
Создайте таблицу с именем StormEvents
, которая соответствует схеме данных в файле StormEvents.csv
.
Совет
Следующие фрагменты кода создают экземпляр клиента почти для каждого вызова. Это делается для того, чтобы каждый фрагмент можно было запускать отдельно. В рабочей среде экземпляры клиента можно использовать повторно, они могут храниться столько, сколько необходимо. Одного экземпляра клиента на каждый URI достаточно, даже при работе с несколькими базами данных (база данных может быть указана на уровне команды).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Определение сопоставления приема
Выполните сопоставление входящих данных CSV с именами столбцов, использованными при создании таблицы. Подготовьте объект сопоставления столбца CSV на этой таблице.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Установка пакета Python
Чтобы установить пакет Python для Azure Synapse Data Explorer, откройте окно командной строки с Python в пути. Выполните следующую команду:
pip install azure-common
pip install azure-mgmt-kusto
Проверка подлинности
Чтобы запустить следующий пример, вам необходимо приложение Microsoft Entra и сервисный принципал, которые могут получить доступ к ресурсам. Чтобы создать бесплатное приложение Microsoft Entra и добавить назначение ролей на уровне подписки, см. статью "Создание приложения Microsoft Entra". Вам также потребуется идентификатор каталога (клиента), идентификатор приложения и секрет клиента.
Добавление подключения к данным Концентратора событий
В следующем примере показано, как программно добавить подключение к данным Концентратора событий. См. раздел Подключение к концентратору событий для добавления подключения к данным концентратора событий с помощью портала Azure.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Параметр | Рекомендуемое значение | Описание поля |
---|---|---|
ид_арендатора | xxxx-xxxxx-xxxx-xxxx-xxxx-xxxx | Идентификатор клиента. Также известен как идентификатор каталога. |
идентификатор подписки | xxxx-xxxxx-xxxx-xxxx-xxxx-xxxx | Идентификатор подписки, используемой для создания ресурсов. |
идентификатор_клиента | xxxx-xxxxx-xxxx-xxxx-xxxx-xxxx | Идентификатор клиента приложения, которое имеет доступ к ресурсам в вашем клиенте. |
секрет_клиента | xxxx | Секрет клиента приложения, которое имеет доступ к ресурсам в вашем аренде. |
имя_группы_ресурсов | testrg | Имя группы ресурсов, содержащей ваш кластер. |
имя_кластера | mykustocluster | Имя вашего кластера. |
название_базы_данных | mykustodatabase | Имя целевой базы данных в вашем кластере. |
Имя_подключения_данных | myeventhubconnect | Необходимое имя вашего подключения для передачи данных. |
имя_таблицы | StormEvents | Имя целевой таблицы в целевой базе данных. |
имя_правила_сопоставления | Карта_Событий_Штормов_CSV | Имя сопоставления столбцов, связанного с целевой таблицей. |
формат данных | csv | Формат данных сообщения. |
идентификатор_ресурса_узла_событий | Идентификатор ресурса | Идентификатор ресурса вашего концентратора событий, в котором хранятся данные для приема. |
группа потребителей | $По умолчанию | Группа потребителей вашего Event Hub. |
расположение | Центральная часть США | Расположение ресурса подключения к данным. |
Очистка ресурсов
Чтобы удалить подключение к данным, выполните следующую команду:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)