Поделиться через


Tutorial: Use the Apache Kafka Producer and Consumer APIs

Learn how to use the Apache Kafka Producer and Consumer APIs with Kafka on HDInsight.

The Kafka Producer API allows applications to send streams of data to the Kafka cluster. The Kafka Consumer API allows applications to read streams of data from the cluster.

В этом руководстве описано, как:

  • Предпосылки
  • Изучение кода
  • Создание и развертывание приложения.
  • Run the application on the cluster

For more information on the APIs, see Apache documentation on the Producer API and Consumer API.

Предпосылки

Изучение кода

Пример приложения расположен https://github.com/Azure-Samples/hdinsight-kafka-java-get-startedв подкаталоге Producer-Consumer . If you're using Enterprise Security Package (ESP) enabled Kafka cluster, you should use the application version located in the DomainJoined-Producer-Consumer subdirectory.

The application consists primarily of four files:

  • pom.xml: этот файл определяет зависимости проекта, версию Java и методы упаковки.
  • Producer.java: This file sends random sentences to Kafka using the producer API.
  • Consumer.java: This file uses the consumer API to read data from Kafka and emit it to STDOUT.
  • AdminClientWrapper.java: This file uses the admin API to create, describe, and delete Kafka topics.
  • Run.java: The command-line interface used to run the producer and consumer code.

Pom.xml

Важные моменты, которые необходимо понять в pom.xml файле:

  • Dependencies: This project relies on the Kafka producer and consumer APIs, which are provided by the kafka-clients package. Следующий XML-код определяет эту зависимость:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    The ${kafka.version} entry is declared in the <properties>..</properties> section of pom.xml, and is configured to the Kafka version of the HDInsight cluster.

  • Plugins: Maven plugins provide various capabilities. In this project, the following plugins are used:

    • maven-compiler-plugin: используется для задания версии Java, используемой проектом, равным 8. This is the version of Java used by HDInsight 3.6.
    • maven-shade-plugin: Used to generate an uber jar that contains this application as well as any dependencies. It is also used to set the entry point of the application, so that you can directly run the Jar file without having to specify the main class.

Producer.java

The producer communicates with the Kafka broker hosts (worker nodes) and sends data to a Kafka topic. The following code snippet is from the Producer.java file from the GitHub repository and shows how to set the producer properties. For Enterprise Security Enabled clusters an additional property must be added "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.java

The consumer communicates with the Kafka broker hosts (worker nodes), and reads records in a loop. The following code snippet from the Consumer.java file sets the consumer properties. For Enterprise Security Enabled clusters an additional property must be added "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

In this code, the consumer is configured to read from the start of the topic (auto.offset.reset is set to earliest.)

Run.java

The Run.java file provides a command-line interface that runs either the producer or consumer code. You must provide the Kafka broker host information as a parameter. You can optionally include a group ID value, which is used by the consumer process. If you create multiple consumer instances using the same group ID, they'll load balance reading from the topic.

Создание и развертывание примера

Use pre-built JAR files

Download the jars from the Kafka Get Started Azure sample. If your cluster is Enterprise Security Package (ESP) enabled, use kafka-producer-consumer-esp.jar. Use the command below to copy the jars to your cluster.

scp kafka-producer-consumer*.jar [email protected]:kafka-producer-consumer.jar

Build the JAR files from code

If you would like to skip this step, prebuilt jars can be downloaded from the Prebuilt-Jars subdirectory. Download the kafka-producer-consumer.jar. If your cluster is Enterprise Security Package (ESP) enabled, use kafka-producer-consumer-esp.jar. Execute step 3 to copy the jar to your HDInsight cluster.

  1. Download and extract the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Set your current directory to the location of the hdinsight-kafka-java-get-started\Producer-Consumer directory. If you are using Enterprise Security Package (ESP) enabled Kafka cluster, you should set the location to DomainJoined-Producer-Consumersubdirectory. Use the following command to build the application:

    mvn clean package
    

    This command creates a directory named target, that contains a file named kafka-producer-consumer-1.0-SNAPSHOT.jar. For ESP clusters the file will be kafka-producer-consumer-esp-1.0-SNAPSHOT.jar

  3. Замените sshuser на пользователя SSH для кластера и замените CLUSTERNAME его именем. Enter the following command to copy the kafka-producer-consumer-1.0-SNAPSHOT.jar file to your HDInsight cluster. When prompted enter the password for the SSH user.

    scp ./target/kafka-producer-consumer*.jar [email protected]:kafka-producer-consumer.jar
    

Запустите пример

  1. Замените sshuser на пользователя SSH для кластера и замените CLUSTERNAME его именем. Откройте подключение SSH к кластеру, введя следующую команду. При появлении запроса введите пароль для учетной записи пользователя SSH.

    ssh [email protected]
    
  2. To get the Kafka broker hosts, substitute the values for <clustername> and <password> in the following command and execute it. Use the same casing for <clustername> as shown in the Azure portal. Replace <password> with the cluster login password, then execute:

    sudo apt -y install jq
    export CLUSTER_NAME='<clustername>'
    export PASSWORD='<password>'
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Примечание.

    Для этой команды требуется доступ к Ambari. Если кластер находится за пределами NSG, выполните эту команду на компьютере с доступом к Ambari.

  3. Create Kafka topic, myTest, by entering the following command:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  4. To run the producer and write data to the topic, use the following command:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  5. Once the producer has finished, use the following command to read from the topic:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    scp ./target/kafka-producer-consumer*.jar [email protected]:kafka-producer-consumer.jar
    

    The records read, along with a count of records, is displayed.

  6. Use Ctrl + C to exit the consumer.

Multiple consumers

Kafka consumers use a consumer group when reading records. Using the same group with multiple consumers results in load balanced reads from a topic. Each consumer in the group receives a portion of the records.

The consumer application accepts a parameter that is used as the group ID. For example, the following command starts a consumer using a group ID of myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Use Ctrl + C to exit the consumer.

To see this process in action, use the following command:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

This command uses tmux to split the terminal into two columns. A consumer is started in each column, with the same group ID value. Once the consumers finish reading, notice that each read only a portion of the records. Use Ctrl + C twice to exit tmux.

Consumption by clients within the same group is handled through the partitions for the topic. In this code sample, the test topic created earlier has eight partitions. If you start eight consumers, each consumer reads records from a single partition for the topic.

Это важно

There cannot be more consumer instances in a consumer group than partitions. In this example, one consumer group can contain up to eight consumers since that is the number of partitions in the topic. Or you can have multiple consumer groups, each with no more than eight consumers.

Records stored in Kafka are stored in the order they're received within a partition. To achieve in-ordered delivery for records within a partition, create a consumer group where the number of consumer instances matches the number of partitions. To achieve in-ordered delivery for records within the topic, create a consumer group with only one consumer instance.

Common Issues faced

  1. Topic creation fails If your cluster is Enterprise Security Pack enabled, use the pre-built JAR files for producer and consumer. The ESP jar can be built from the code in the DomainJoined-Producer-Consumer subdirectory. The producer and consumer properties have an additional property CommonClientConfigs.SECURITY_PROTOCOL_CONFIG for ESP enabled clusters.

  2. Failure in ESP enabled clusters: If produce and consume operations fail and you are using an ESP enabled cluster, check that the user kafka is present in all Ranger policies. If it is not present, add it to all Ranger policies.

Очистка ресурсов

Чтобы очистить ресурсы, созданные этим руководством, можно удалить группу ресурсов. При этом будет удален связанный кластер HDInsight и другие ресурсы, связанные с этой группой ресурсов.

Чтобы удалить группу ресурсов с помощью портала Azure, сделайте следующее:

  1. На портале Azure разверните меню слева, чтобы открыть меню служб, а затем выберите Группы ресурсов, чтобы просмотреть список групп ресурсов.
  2. Найдите группу ресурсов, которую нужно удалить, и щелкните правой кнопкой мыши кнопку Дополнительно (…) справа от списка.
  3. Выберите Удалить группу ресурсов и подтвердите выбор.

Дальнейшие действия

In this document, you learned how to use the Apache Kafka Producer and Consumer API with Kafka on HDInsight. Use the following to learn more about working with Kafka: