Why doesn't commitAsync commit the first 2 offsets?

bwntbbo3  posted on 2021-06-07  in Kafka

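I've run into a strange problem: the consumer does not commit the first 2 offsets of the log with commitAsync, and I don't know why. It is odd because the other messages, sent asynchronously by the same producer, are received and committed successfully by the consumer. Can anyone find the root cause? My code and a sample of the output are below.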

package com.panos.example;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class Consumer extends ShutdownableThread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;

    public Consumer(String topic) {
        super("KafkaConsumerExample", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.75:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<Integer, String>(props);
        this.topic = topic;
    }

    @Override
    public void doWork() {

        consumer.subscribe(Collections.singletonList(this.topic));
        try {
            ConsumerRecords<Integer, String> records = consumer.poll(1000);
            long startTime = System.currentTimeMillis();
            if (!records.isEmpty()) {
                System.out.println("Total No. of records received : " + records.count());

                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                    consumer.commitAsync(new ConsumerCallBack(startTime,record.value(), record.offset()));
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }

    class ConsumerCallBack implements OffsetCommitCallback {

        private final long startTime;
        private String message;
        private final String NewLine = System.getProperty("line.separator");
        private long offset;

        public ConsumerCallBack(long startTime) {
            this.startTime = startTime;
        }

        public ConsumerCallBack(long startTime, String message, long offset) {
            this.startTime = startTime;
            this.message=message;
            this.offset = offset;
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> CurrentOffset,
                               Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (exception == null) {
                // A null exception means the commit succeeded.
                System.out.println("Message : {" + message + "}, committed successfully at offset " + offset +
                        CurrentOffset + "elapsed time :" + elapsedTime);
            } else {
                // The commit failed; report the cause.
                System.out.println(exception.toString());
               /* JOptionPane.showMessageDialog(new Frame(),
                        "Something Goes Wrong with the Server Please Try again Later.",
                        "Inane error",
                        JOptionPane.ERROR_MESSAGE);*/
            }
        }
    }
}

As you can see, every message except the first 2 is committed successfully, without any exception. Why is that?

Received message: (1, Message_1) at offset 160
Received message: (2, Message_2) at offset 161
Received message: (3, Message_3) at offset 162
Received message: (4, Message_4) at offset 163
Message : {Message_3}, committed successfully at offset 162{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6
Message : {Message_4}, committed successfully at offset 163{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6

46scxncf #1

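If you use commitAsync, multiple commits may get squashed together into a single commit message. Because offsets are committed in increasing order, a commit of offset X is an implicit commit of all offsets smaller than X. In your case, it seems that the commits for the first three offsets were done in a single commit at the third one.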
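
The "implicit commit" rule can be illustrated with a small toy model. This is only a sketch and not the Kafka client API: the class `CommittedOffsets` and its methods `commit` and `isCommitted` are hypothetical names made up for this example, and the model assumes a committed position only ever moves forward.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a consumer group's committed offsets. NOT the Kafka client API;
// all names here are hypothetical and exist only for illustration.
class CommittedOffsets {
    // partition name -> committed position (the offset of the next record to read)
    private final Map<String, Long> committed = new HashMap<>();

    // Record a commit. Assumption of this model: positions only move forward.
    public void commit(String partition, long nextOffset) {
        committed.merge(partition, nextOffset, Long::max);
    }

    // A record counts as committed when its offset is below the committed position.
    public boolean isCommitted(String partition, long recordOffset) {
        Long position = committed.get(partition);
        return position != null && recordOffset < position;
    }

    public static void main(String[] args) {
        CommittedOffsets group = new CommittedOffsets();
        // A single commit at position 164, the value the callbacks in the question report.
        group.commit("test-0", 164L);
        for (long offset = 160; offset <= 164; offset++) {
            System.out.println("offset " + offset + " committed: "
                    + group.isCommitted("test-0", offset));
        }
    }
}
```

In this model, one commit at position 164 marks offsets 160 through 163 as committed, and offset 164 itself stays uncommitted because it is the next record to read. With the real client, `commitAsync(OffsetCommitCallback)` called without an offset map commits the positions returned by the last `poll()`, which matches every callback in the question reporting `offset=164`. To commit record by record, you can pass an explicit map to `commitAsync(Map<TopicPartition, OffsetAndMetadata>, OffsetCommitCallback)`, conventionally with `record.offset() + 1` as the position.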
