Поделиться через


Транзакции в Apache Kafka для Azure Event Hubs

В этой статье содержатся сведения об использовании API транзакций Apache Kafka с Центры событий Azure.

Обзор

Центры событий предоставляют конечную точку Kafka, которая может использоваться существующими клиентскими приложениями Kafka в качестве альтернативы запуску собственного кластера Kafka. Центры событий работают со многими из ваших существующих приложений Kafka. Дополнительные сведения см. в разделе Центры событий для Apache Kafka.

В этом документе основное внимание уделяется бесшовному использованию транзакционного API Kafka с Центром событий Azure.

Примечание.

Транзакции Kafka в настоящее время находятся в общедоступной предварительной версии на уровнях "Премиум" и "Выделенный".

Транзакции в Apache Kafka

В облачных средах приложения должны быть устойчивы к сбоям сети и перезапускам пространства имен и обновлениям. Приложения, требующие строгих гарантий обработки, должны использовать транзакционную платформу или API, чтобы обеспечить выполнение всех операций либо ни одной, таким образом, чтобы состояние приложения и данных надежно управлялось. Если набор операций претерпевает сбой, они могут быть надежно повторно выполнены атомарно, чтобы обеспечить надлежащие гарантии обработки.

Примечание.

Обычно при наличии нескольких операций, которые должны обрабатываться в режиме "все или ничего", обычно требуются гарантии транзакций.

Для всех других операций клиентские приложения по умолчанию устойчивы к повторению операции с экспоненциальным обратным выходом, если конкретная операция завершилась ошибкой.

Apache Kafka предоставляет транзакционный API, чтобы обеспечить такой уровень гарантий обработки как в одном, так и в разных наборах тем/разделов.

Транзакции применяются к следующим случаям:

  • Производители транзакций.
  • Точно после обработки семантики.

Производители транзакций

Транзакционные производители гарантируют, что данные атомарно записываются в несколько разделов в разных темах. Производители могут инициировать транзакцию, записывать в несколько секций в одном разделе или в разных разделах, а затем фиксировать или прерывать транзакцию.

Чтобы убедиться, что производитель транзакционен, enable.idempotence следует установить в true, чтобы данные записывались единожды, избегая дублирования на этапе отправки. Кроме того, transaction.id необходимо задать для уникальной идентификации производителя.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

После инициализации производителя следующий вызов гарантирует, что производитель регистрируется в брокере в качестве производителя транзакций -

    producer.initTransactions();

Затем производитель должен явно начать транзакцию, выполнять операции отправки в разных разделах и секциях как обычные, а затем зафиксировать транзакцию с помощью следующего вызова:

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Если транзакция должна быть прервана из-за сбоя или истечения времени ожидания, производитель может вызвать abortTransaction() метод.

	producer.abortTransaction();

Принцип семантики "точно один раз"

Семантика «точно один раз» основывается на механизме транзакционных производителей, добавляя потребителей в транзакционную область, чтобы каждая запись гарантированно считывалась, обрабатывалась и записывалась ровно один раз.

Сначала создается экземпляр производителя транзакций.


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Затем потребитель должен быть настроен для чтения только нетрансакционных сообщений или фиксации транзакционных сообщений, задав следующее свойство:


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

После создания экземпляра потребителя он может подписаться на раздел, из которого должны быть прочитаны записи.


    consumer.subscribe(singleton("inputTopic"));

После того как потребитель опросил записи из входного топика, производитель начинает транзакционную область, в которой запись обрабатывается и записывается в выходной топик. После записи записей создается обновленная карта смещения для всех секций. Затем производитель отправляет обновленную карту смещения для транзакции перед фиксацией транзакции.

В любом исключении транзакция прерывается, и производитель повторяет обработку снова атомарно.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Предупреждение

Если транзакция не зафиксирована или прервана до max.transaction.timeout.ms, то она автоматически прерывается службой Event Hubs. По умолчанию max.transaction.timeout.ms задано значение 15 минут центрами событий, но производитель может переопределить его на меньшее значение, задав transaction.timeout.ms свойство в свойствах конфигурации производителя.

Руководство по миграции

Если у вас есть существующие приложения Kafka, которые вы хотите использовать с Центрами событий Azure, ознакомьтесь с руководством по миграции Kafka для Центров событий Azure , чтобы быстро запустить работу.

Следующие шаги

Дополнительные сведения о Центрах событий и Центрах событий для Kafka см. в следующих статьях: