Ингестация данных из Apache Kafka в Azure Data Explorer

Apache Kafka — это распределенная платформа потоковой передачи для создания конвейеров потоковой передачи в режиме реального времени, которые надежно перемещают данные между системами или приложениями. Kafka Connect — это инструмент для масштабируемой и надежной потоковой передачи данных между Apache Kafka и другими системами данных. Приемник Kusto Kafka служит соединителем из Kafka и не требует использования кода. Скачайте jar-файл соединителя приемника из репозитория Git или Confluent Connector Hub.

В этой статье показано, как загрузить данные с помощью Kafka, используя автономную установку Docker для упрощения настройки кластера Kafka и кластера подключений Kafka.

Дополнительные сведения см. в разделе Репозиторий Git и Сведения о версиях для соединителя.

Предварительные условия

Создание учетной записи службы Microsoft Entra

Субъект-служба Microsoft Entra можно создать с помощью портал Azure или программы, как показано в следующем примере.

Эта учетная запись службы — это удостоверение, используемое соединителем для записи данных в вашу таблицу в Kusto. Вы предоставляете этому принципалу службы права доступа к ресурсам Kusto.

  1. Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.

    az login
    
  2. Выберите подписку для размещения основного элемента. Этот шаг необходим, если у вас несколько подписок.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Создайте субъект-службу. В этом примере принципал службы называется my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Из возвращаемых данных JSON скопируйте appId, password и tenant для дальнейшего использования.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Вы создали приложение Microsoft Entra и субъект-службу.

Создайте целевую таблицу

  1. В среде запроса создайте таблицу Storms с помощью следующей команды:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Создайте соответствующее сопоставление таблицы Storms_CSV_Mapping для загруженных данных с помощью следующей команды:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Создайте политику пакетной загрузки в таблице для настройки задержки очереди при приеме данных.

    Совет

    Политика пакетной обработки данных — это оптимизатор производительности, включающий три параметра. Первое условие, которое выполнено, запускает внесение данных в таблицу.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Используйте сервисный принципал из Создание сервисного принципала Microsoft Entra, чтобы предоставить разрешение на работу с базой данных.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Запуск лаборатории

Следующая лаборатория предназначена для создания данных, настройки соединителя Kafka и потоковой передачи этих данных. Затем вы можете просмотреть полученные данные.

Клонировать репозиторий git

Склонировать git-репозиторий лаборатории.

  1. Создайте локальный каталог на вашем компьютере.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Клонируйте репозиторий.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Содержание клонированного репозитория

Выполните следующую команду, чтобы вывести список содержимого клонированного репозитория:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Вот результат этого поиска:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Просмотрите файлы в клонированном репозитория

В следующих разделах описываются важные части файлов в дереве файлов.

adx-sink-config.json

Этот файл содержит файл свойств приемника Kusto, в котором обновляются определенные сведения о конфигурации:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Замените значения следующих атрибутов в соответствии с настройкой: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (имя базы данных) kusto.ingestion.urlи kusto.query.url.

Соединитель — Dockerfile

В этом файле есть команды для создания образа докера для экземпляра соединителя. Он включает загрузку коннектора из директории релизов репозитория git.

Каталог производителя событий штормов

В этом каталоге находится программа Go, которая считывает локальный файл StormEvents.csv и публикует данные в теме Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Запуск контейнеров

  1. В терминале запустите контейнеры:

    docker-compose up
    

    Приложение-производитель начинает отправку событий в storm-events раздел. Вы должны увидеть логи, похожие на следующие:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Чтобы проверить журналы, выполните следующую команду в отдельном терминале:

    docker-compose logs -f | grep kusto-connect
    

Запуск соединителя

Используйте REST-вызов Kafka Connect для запуска соединителя.

  1. В отдельном терминале запустите задачу приемника с помощью следующей команды:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Чтобы проверить статус, запустите следующую команду в отдельном терминале:

    curl http://localhost:8083/connectors/storm/status
    

Соединитель начинает ставить в очередь процессы загрузки.

Примечание.

Если у вас возникли проблемы с логическим коннектором, создайте запрос.

Управляемое удостоверение

По умолчанию соединитель Kafka использует метод аутентификации приложения для проверки подлинности во время загрузки данных. Для аутентификации с использованием управляемой идентичности:

  1. Назначьте вашему кластеру управляемое удостоверение и предоставьте вашей учетной записи хранения разрешения на чтение. Дополнительные сведения см. в разделе прием данных с использованием проверки подлинности управляемого удостоверения.

  2. В файле adx-sink-config.json установите aad.auth.strategymanaged_identity и убедитесь, что aad.auth.appid задан как идентификатор клиента управляемого удостоверения (приложения).

  3. Используйте маркер службы метаданных частного экземпляра вместо учетной записи службы Microsoft Entra.

Примечание.

При использовании управляемого удостоверения соединитель автоматически извлекает appId и tenant из контекста вызова, поэтому не нужно предоставлять пароль.

Запросить и просмотреть данные

Подтвердите получение данных

  1. После поступления данных в Storms таблицу подтвердите передачу данных, проверив количество строк:

    Storms 
    | count
    
  2. Убедитесь, что в процессе приема нет сбоев:

    .show ingestion failures
    

    Как только вы увидите данные, попробуйте выполнить несколько запросов.

Запрос данных

  1. Чтобы увидеть все записи, выполните следующий запрос:

    Storms
    | take 10
    
  2. Используйте where и project для фильтрации определенных данных:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Используйте оператор summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Снимок экрана с результатами подключённой диаграммы столбцов запроса Kafka.

Дополнительные примеры запросов и рекомендации см. в руководстве по написанию запросов на KQL и в документации по языку запросов Kusto.

Reset

Для сброса выполните следующие действия.

  1. Остановка контейнеров (docker-compose down -v)
  2. Удалить (drop table Storms)
  3. Повторное создание таблицы Storms
  4. Восстановление сопоставления таблиц
  5. Перезапуск контейнеров (docker-compose up)

Очистка ресурсов

Чтобы удалить ресурсы Azure Data Explorer, используйте az kusto cluster delete (kusto extension) или az kusto database delete (kusto extension):

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

Вы также можете удалить кластер и базу данных с помощью портал Azure. Дополнительные сведения см. в статье "Удаление кластера Azure Data Explorer" и удаление базы данных в Azure Data Explorer.

Настройка соединителя Kafka Sink

Настройте соединитель Kafka Sink для работы с политикой пакетной обработки приема данных:

  • Настройте лимит размера flush.size.bytes для Kafka Sink начиная с 1 МБ и увеличивая на 10 МБ или 100 МБ.
  • При использовании Kafka Sink данные агрегируются дважды. Данные на стороне соединителя агрегируются в соответствии с параметрами очистки и на стороне службы в соответствии с политикой пакетной обработки. Если время пакетной обработки слишком короткое, поэтому данные не могут быть приняты соединителем и службой, необходимо увеличить время пакетной обработки. Установите размер пакетной обработки в 1 ГБ. При необходимости его можно увеличить или уменьшить с шагом 100 МБ. Например, если размер очистки составляет 1 МБ, а размер политики пакетной обработки составляет 100 МБ, соединитель приемника Kafka объединяет данные в пакет размером 100 МБ. Затем этот пакет обрабатывается службой. Если время политики пакетной обработки составляет 20 секунд, а соединитель Приемника Kafka сбрасывает 50 МБ в 20-секундный период, служба выполняет прием пакета размером 50 МБ.
  • Вы можете изменять масштаб, добавляя экземпляры и разделы Kafka. Увеличьте значение tasks.max до нужного числа разделов. Создайте раздел, если у вас достаточно данных для создания блоба размером, указанным в параметре flush.size.bytes. Если блоб меньше, пакет обрабатывается при достижении предельного времени, поэтому раздел не получает достаточно пропускной способности. Большое количество разделов приводит к увеличению времени на обработку.