Поделиться через


Подключение к данным Центров событий (предварительная версия)

Это важно

Обозреватель данных Azure Synapse Analytics (предварительная версия) будет прекращен 7 октября 2025 г. После этой даты рабочие нагрузки, работающие в Synapse Data Explorer, будут удалены, а связанные данные приложения будут потеряны. Мы настоятельно рекомендуем мигрировать в Eventhouse на платформе Microsoft Fabric.

Программа Microsoft Cloud Migration Factory (CMF) предназначена для поддержки клиентов при миграции в Fabric. Программа предлагает практические ресурсы клавиатуры без затрат для клиента. Эти ресурсы назначаются в течение 6–8 недель с предопределенной и согласованной областью. Номинации клиентов принимаются от команды учетных записей Microsoft или непосредственно путем отправки запроса на помощь команде CMF.

Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий. Azure Synapse Data Explorer предлагает возможности непрерывного приема данных из управляемых клиентами Центров событий.

Конвейер сбора данных центров событий передает события в Azure Synapse Data Explorer несколькими этапами. Сначала вы создаете Event Hubs в портале Azure. Затем вы создадите целевую таблицу в Azure Synapse Data Explorer, в которой данные в определенном формате будут приняты с использованием заданных свойств приема. Подключение к Центрам событий должно поддерживать маршрутизацию событий. Данные внедряются с выбранными свойствами в соответствии с сопоставлением свойств системы событий. Создайте подключение к Центрам событий для создания центров событий и отправки событий. Этим процессом можно управлять с помощью портала Azure, программно с помощью C# или Pythonили с помощью шаблона Azure Resource Manager.

Общие сведения о приеме данных в Azure Synapse Data Explorer см. в разделе Обзор приема данных в Azure Synapse Data Explorer.

Формат данных

  • Данные считываются из Центров событий в виде объектов EventData .

  • См. раздел Поддерживаемые форматы.

    Примечание.

    Центр событий не поддерживает формат RAW.

  • Данные можно сжимать с помощью алгоритма сжатия GZip. Укажите Compression в свойствах загрузки.

    • Сжатые форматы (Avro, Parquet, ORC) не поддерживают сжатие данных.
    • Сжатые данные не поддерживают пользовательское кодирование и внедренные системные свойства.

Свойства приема

Свойства приема указывают процессу приема, куда направлять данные и как их обрабатывать. Вы можете указать свойства приема для приема событий с помощью EventData.Properties. Можно задать следующие свойства:

Свойство Описание
Таблица Имя (с учетом регистра) существующей целевой таблицы. Переопределяет параметры Table на панели Data Connection.
Формат Формат данных. Переопределяет параметры Data format на панели Data Connection.
Загрузка IngestionMappingReference Имя существующей карты загрузки данных для использования. Переопределяет параметры Column mapping на панели Data Connection.
Сжатие Сжатие данных, None (по умолчанию), или сжатие GZip.
Кодировка Кодировка данных, по умолчанию — UTF8. Может быть любой из поддерживаемых кодировок .NET.
Теги Список тегов, которые нужно связать с принятыми данными, в формате строки массива JSON. При использовании тегов есть влияние на производительность.

Примечание.

Только события, добавленные в очередь после создания подключения данных, обрабатываются.

Маршрутизация событий

При настройке подключения Центров событий к кластеру Azure Synapse Data Explorer необходимо указать целевые свойства таблицы (имя таблицы, формат данных, сжатие и сопоставление). Маршрутизация по умолчанию для данных также указывается как static routing. Вы также можете указать свойства целевой таблицы для каждого события, используя свойства события. Соединение будет динамически маршрутизировать данные, как указано в EventData.Properties, переопределяя статические свойства для этого события.

В следующем примере задайте сведения о центрах событий и отправьте данные метрик погоды в таблицу WeatherMetrics. Данные имеют формат json. Свойство mapping1 предварительно определено для таблицы WeatherMetrics.

Предупреждение

В этом примере для упрощения используется аутентификация с использованием строки подключения для подключения к Центрам событий. Однако жестко закодированная строка подключения в вашем скрипте требует очень высокой степени доверия к приложению и таит в себе риски безопасности.

Для долгосрочных безопасных решений используйте один из следующих вариантов:

var eventHubNamespaceConnectionString=<connection_string>;
var eventHubName=<event_hub>;

// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }; 
var data = JsonConvert.SerializeObject(metric);

// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['mydatatag']");

// Send events
var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubNamespaceConnectionString, eventHubName);
eventHubClient.Send(eventData);
eventHubClient.Close();

Сопоставление свойств системы событий

Системные свойства хранят свойства, заданные службой Event Hubs в момент размещения события в очереди. Подключение Azure Synapse Data Explorer к Event Hubs внедрит выбранные свойства в данные, попадающие в вашу таблицу.

Примечание.

  • Системные свойства поддерживаются для json и табличных форматов (csv, tsv и т. д.) и не поддерживаются для сжатых данных. При использовании неподдерживаемого формата данные по-прежнему будут приняты, но свойства будут игнорироваться.
  • Для табличных данных системные свойства поддерживаются только для сообщений о событиях с одной записью.
  • Для данных JSON системные свойства также поддерживаются для сообщений о событиях с несколькими записями. В таких случаях системные свойства добавляются только к первой записи сообщения о событии.
  • При сопоставлении в формате csv свойства добавляются в начало записи в порядке, определенном в таблице Системные свойства.
  • При сопоставлении в формате json свойства добавляются в соответствии с именами свойств, указанных в таблице Системные свойства.

Свойства системы

Центры событий предоставляют следующие системные свойства:

Свойство Тип данных Описание
x-opt-enqueued-time (время постановки в очередь) дата и время Время UTC, когда событие было поставлено в очередь
Оптимальный номер последовательности длинный Логический порядковый номер события в потоке секционирования Центров событий
X-opt-offset строка Смещение события из потока секционирования Центров событий. Идентификатор смещения является уникальным в пределах раздела потока Event Hubs
X-opt-publisher строка Имя издателя, если сообщение было отправлено в конечную точку издателя
X-opt-partition-key строка Ключ раздела, в котором хранилось событие

Если вы выбрали Свойства системы событий в разделе Источник данных таблицы, вы должны включить свойства в схему таблицы и сопоставление.

Примеры сопоставления схем

Пример схемы сопоставления структуры таблицы

Если данные содержат три столбца (Timespan, Metric и Value) и включены свойства x-opt-enqueued-time и x-opt-offset, создайте или измените схему таблицы с помощью следующей команды:

    .create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

Пример сопоставления CSV-файла

Выполните следующие команды, чтобы добавить данные в начало записи. Отметьте порядковые значения.

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

Пример сопоставления JSON

Данные добавляются с помощью сопоставления свойств системы. Выполните следующие команды.

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

Подключение Центров событий

Примечание.

Для обеспечения максимальной производительности создавайте все ресурсы в том же регионе, что и кластер Azure Synapse Data Explorer.

Создание центров событий

Если у вас еще нет, создайте центры событий. Управлять подключением к Центрам событий можно через портал Azure, программно с помощью C# или Python, или с помощью шаблона Azure Resource Manager.

Примечание.

  • Поскольку количество разделов неизменно, вам следует учитывать требования к масштабированию в долгосрочной перспективе при установке количества разделов.
  • Группа потребителей должна быть уникальной для каждого потребителя. Создайте группу потребителей, предназначенную для подключения к Azure Synapse Data Explorer.

Отправка событий

Ознакомьтесь с примером приложения , которое создает данные и отправляет его в Центры событий.

Пример создания примеров данных см. в разделе "Прием данных из Центров событий" в Azure Synapse Data Explorer

Установка решения для геораспределённого аварийного восстановления

Центры обработки событий предлагают решение для геоаварийного восстановления. Azure Synapse Data Explorer не поддерживает Alias пространства имен Центров событий. Чтобы реализовать геоизбыточное восстановление после аварий в вашем решении, создайте два подключения к данным Event Hubs: одно для основного пространства имен и одно для резервного пространства имен. Azure Synapse Data Explorer будет прослушивать оба подключения Центров событий.

Примечание.

Ответственность за реализацию переключения с основного пространства имен на резервное несет пользователь.

Следующие шаги