Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом руководстве описано, как подключить приложение Spark к Центрам событий для потоковой передачи в режиме реального времени. Такая интеграция обеспечивает потоковую передачу без необходимости изменять клиенты протокола или запускать собственные кластеры Kafka или Zookeeper. Для работы с этим руководством требуется Apache Spark v2.4+ и Apache Kafka v2.0+.
Примечание.
Этот пример можно найти на сайте GitHub.
В этом руководстве описано, как:
- Создание пространства имен Центров событий
- Клонирование примера проекта
- Запуск Spark
- Чтение из Центров событий Kafka
- Запись данных в Event Hubs для Kafka
Предпосылки
Прежде чем приступить к работе с этим руководством, убедитесь, что у вас есть:
- Подписка Azure. Если ее нет, создайте бесплатную учетную запись.
- Apache Spark версии 2.4
- Apache Kafka версии 2.0
- Git
Примечание.
Адаптер Spark-Kafka был обновлен для поддержки Kafka версии 2.0 по состоянию на Spark версии 2.4. В предыдущих выпусках Spark адаптер поддерживал Kafka версии 0.10 и более поздних версий, но использовался специально для API Kafka версии 0.10. Так как Центры событий для Kafka не поддерживают Kafka версии 0.10, адаптеры Spark-Kafka из версий Spark до версии 2.4 не поддерживаются центрами событий для экосистем Kafka.
Создание пространства имен Центров событий
Для отправки и получения данных из любой службы Центров событий требуется пространство имен Центров событий. См. раздел Создание концентратора событий для получения инструкций по созданию пространства имен и концентратора событий. Получите строку подключения Event Hubs и полностью квалифицированное доменное имя (FQDN) для последующего использования. Для получения инструкций см. Get an Event Hubs connection string (Получение строки подключения для Центров событий).
Клонирование примера проекта
Клонируйте репозиторий Azure Event Hubs и перейдите к подкаталогу tutorials/spark
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Чтение из Средств событий для Kafka
При нескольких изменениях конфигурации можно начать чтение из Центров событий для Kafka. Обновите BOOTSTRAP_SERVERS и EH_SASL с подробными сведениями из пространства имен, и вы можете начать потоковую передачу с центрами событий, как и в Kafka. Полный пример кода см. в файле sparkConsumer.scala на сайте GitHub.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Если вы получаете ошибку, аналогичную следующей ошибке, добавьте .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
к вызову spark.readStream
и повторите попытку.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Запись в Центры событий для Kafka
Вы также можете писать в Event Hubs так же, как и в Kafka. Не забудьте обновить конфигурацию, чтобы заменить BOOTSTRAP_SERVERS и EH_SASL на информацию из пространства имен Event Hubs. Полный пример кода см. в файле sparkProducer.scala на сайте GitHub.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Дальнейшие действия
Дополнительные сведения о Центрах событий и Центрах событий для Kafka см. в следующих статьях: