Руководство: Потоковая передача и обработка событий в интеллект в режиме реального времени с помощью конечной точки Apache Kafka в потоке событий.

В этом руководстве вы узнаете, как использовать конечную точку Apache Kafka, предоставленную пользовательским источником конечных точек в расширенных возможностях потоков событий Microsoft Fabric, для передачи событий в аналитику в режиме реального времени. (Пользовательская конечная точка называется пользовательским приложением в стандартных функциях потоков событий Fabric.) Вы также узнаете, как обрабатывать события потоковой передачи с помощью конечной точки Apache Kafka через индивидуальную конечную точку назначения потока событий.

Изучив это руководство, вы:

  • Создайте поток событий.
  • Получите конечную точку Kafka из пользовательского источника конечной точки.
  • Отправка событий с помощью приложения Kafka.
  • Получите конечную точку Kafka от пользовательской конечной точки.
  • Потребление событий с помощью приложения Kafka.

Предпосылки

Создание потока событий в Microsoft Fabric

  1. Перейдите на портал Fabric.

  2. Выберите "Моя рабочая область " на левой панели навигации.

  3. На странице Моя рабочая область нажмите на + Новый элемент на панели команд.

  4. На странице нового элемента найдите Eventstream, а затем выберите Eventstream.

    снимок экрана, на котором показана страница нового элемента с выбранным потоком событий.

  5. В окне New Eventstream введите имя для потока событий, а затем выберите Создать.

    Снимок экрана с окном «Новый поток событий».

  6. Создание нового потока событий в рабочей области может занять несколько секунд. После создания потока событий вы будете перенаправлены в главный редактор, где можно начать с добавления источников в поток событий.

    Снимок экрана: редактор.

Получение конечной точки Kafka из добавленного пользовательского источника конечной точки

Чтобы получить конечную точку темы Kafka, добавьте пользовательский источник конечной точки в ваш поток событий. Затем конечная точка подключения Kafka доступна и предоставляется в пользовательском источнике конечной точки.

Чтобы добавить в поток событий пользовательский источник конечной точки, выполните приведенные ниже действия.

  1. На домашней странице потока событий выберите "Использовать пользовательскую конечную точку ", если это пустой поток событий.

    Снимок экрана, демонстрирующий выбор опции использования пользовательской конечной точки.

    Или на ленте выберите Добавить источник>Пользовательская конечная точка.

    Снимок экрана: выбор пользовательской конечной точки в качестве источника для потока событий.

  2. Введите значение имени источника для настраиваемой конечной точки и нажмите кнопку "Добавить".

    Снимок экрана: ввод имени пользовательской конечной точки.

  3. Убедитесь, что пользовательская конечная точка отображается на полотне потока событий в режиме редактирования, а затем выберите Опубликовать.

    Снимок экрана: добавленная пользовательская конечная точка в режиме редактирования.

  4. После успешной публикации потока событий можно получить сведения, включая сведения о конечной точке Kafka. Выберите элемент пользовательской конечной точки на рабочем полотне. Затем в нижней панели узла источника пользовательской конечной точки выберите вкладку Kafka.

    На странице проверки подлинности ключа SAS вы можете получить следующие важные сведения о конечной точке Kafka:

    • bootstrap.servers={YOUR.BOOTSTRAP.SERVER}
    • security.protocol=SASL_SSL
    • sasl.mechanism=PLAIN
    • sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.CONNECTION.STRING}";

    {YOUR.BOOTSTRAP.SERVER} — это значение сервер Bootstrap на странице аутентификации ключей SAS. {YOUR.CONNECTION.STRING}может быть значением строки подключения — первичным ключом или значением строки подключения — вторичным ключом. Выберите один из них.

    Снимок экрана: ключи Kafka и пример кода.

    Дополнительные сведения о проверке подлинности ключа SAS и примерах кодов см. в разделе "Сведения о конечной точке Kafka".

Отправка событий с помощью приложения Kafka

Используя важные сведения Kafka, полученные на предыдущем шаге, можно заменить конфигурации подключений в существующем приложении Kafka. Затем вы можете отправить события в поток событий.

Ниже приведено одно приложение на основе пакета SDK Центры событий Azure, написанного на Java, следуя протоколу Kafka. Чтобы использовать это приложение для потоковой передачи событий, выполните следующие действия, чтобы заменить сведения о конечной точке Kafka и выполнить это правильно:

  1. Клонируйте репозиторий Центров событий Azure для Kafka.

  2. Перейдите в azure-event-hubs-for-kafka/quickstart/java/producer.

  3. Обновите сведения о конфигурации производителя в src/main/resources/producer.config следующим образом:

    • bootstrap.servers={YOUR.BOOTSTRAP.SERVER}
    • security.protocol=SASL_SSL
    • sasl.mechanism=PLAIN
    • sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.CONNECTION.STRING}";

    Замените {YOUR.BOOTSTRAP.SERVER} значением сервера Bootstrap. Замените {YOUR.CONNECTION.STRING} на значение строки подключения - первичного ключа или на значение строки подключения - вторичного ключа. Выберите один из них.

  4. Обновите имя раздела на новое имя в src/main/java/TestProducer.java следующим образом: private final static String TOPIC = "{YOUR.TOPIC.NAME}";.

    Значение можно найти на странице проверки подлинности ключа SAS во вкладке Kafka.

  5. Запустите код производителя и выполните передачу событий в поток событийного стрима.

    • mvn clean package
    • mvn exec:java -Dexec.mainClass="TestProducer"

    Снимок экрана: код производителя.

  6. Предварительный просмотр данных, отправленных с помощью этого приложения Kafka. Выберите узел eventstream, который является центральным узлом, отображающим имя вашего потока событий.

    Выберите формат данных CSV с запятой в качестве разделителя, без заголовка. Этот выбор соответствует формату, в котором приложение передает данные события.

    Снимок экрана: предварительный просмотр данных Kafka.

Получение конечной точки Kafka из добавленного назначения пользовательской конечной точки

Можно добавить конечную точку назначения, чтобы получить информацию о конечной точке подключения Kafka для обработки событий из потока событий. После добавления назначения вы можете получить информацию из панели Сведения о назначении в режиме реального времени.

На странице «Базовый» можно получить значение группы потребителей. Это значение необходимо, чтобы позже настроить приложение потребителя Kafka.

На странице проверки подлинности ключа SAS вы можете получить важные сведения о конечной точке Kafka:

  • bootstrap.servers={YOUR.BOOTSTRAP.SERVER}
  • security.protocol=SASL_SSL
  • sasl.mechanism=PLAIN
  • sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.CONNECTION.STRING}";

{YOUR.BOOTSTRAP.SERVER} — это значение сервера Bootstrap. {YOUR.CONNECTION.STRING} может быть значением строки подключения — первичного ключа или значением строки подключения — вторичного ключа. Выберите один из них.

Потребление событий с приложением Kafka

Теперь вы можете использовать другое приложение в репозитории Azure Event Hubs для Kafka чтобы получать события из вашего потока событий. Чтобы использовать это приложение для использования событий из потока событий, выполните следующие действия, чтобы заменить сведения о конечной точке Kafka и запустить его соответствующим образом:

  1. Клонируйте репозиторий Центров событий Azure для Kafka.

  2. Перейдите к azure-event-hubs-for-kafka/quickstart/java/consumer.

  3. Обновите сведения о конфигурации для потребителя в src/main/resources/consumer.config следующим образом:

    • bootstrap.servers={YOUR.BOOTSTRAP.SERVER}
    • group.id={YOUR.EVENTHUBS.CONSUMER.GROUP}
    • security.protocol=SASL_SSL
    • sasl.mechanism=PLAIN
    • sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    • password="{YOUR.CONNECTION.STRING}";

    Замените {YOUR.BOOTSTRAP.SERVER} значением сервера Bootstrap. Значение {YOUR.EVENTHUBS.CONSUMER.GROUP} можно получить на странице "Базовый" на панели "Сведения" для назначения пользовательской конечной точки. Замените {YOUR.CONNECTION.STRING} значением строки подключения-первичного ключа или значением строки подключения-вторичного ключа. Выберите один из них.

  4. Обновите имя раздела с новым именем раздела на странице проверки подлинности ключей SAS в src/main/java/TestConsumer.java следующим образом: private final static String TOPIC = "{YOUR.TOPIC.NAME}";

    Значение можно найти {YOUR.TOPIC.NAME} на странице Проверка подлинности ключа SAS во вкладке Kafka.

  5. Запустите код потребителя и передавайте события в поток событий.

    • mvn clean package
    • mvn exec:java -Dexec.mainClass="TestConsumer"

Если в вашем потоке событий есть входящие события (например, ваше предыдущее приложение производителя все еще работает), убедитесь, что потребитель теперь получает события из темы вашего потока событий.

Снимок экрана: входящие события Kafka.

По умолчанию потребители Kafka считывают с конца потока, а не в начале. Потребитель Kafka не считывает никаких событий, которые помещаются в очередь перед началом запуска потребителя. Если вы запускаете своего потребителя, но он не получает никаких событий, попробуйте снова запустить продюсера, пока ваш потребитель выполняет опрос.

Заключение

Поздравляем. Вы узнали, как использовать конечную точку Kafka, предоставленную из потока событий, для потоковой передачи и использования событий в потоке событий. Если у вас уже есть приложение, которое отправляет данные в топик Kafka или получает их, вы можете использовать то же приложение для отправки или получения событий в вашем потоке событий без изменений в коде. Просто измените сведения о конфигурации подключения.