Kafka:时断时续的慢,当消费的第一条消息从主题

lnlaulya  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(542)

我用的是Kafka0.9.0.1。
第一次启动应用程序时,从主题中检索“最新”消息需要20-30秒
我使用了不同的kafka代理(配置不同),但我仍然看到这种行为。后续消息通常没有延迟。
这是预期的行为吗?通过运行这个示例应用程序并将代理/主题名称更改为您自己的设置,您可以在下面清楚地看到这一点

public class KafkaProducerConsumerTest {

    public static final String KAFKA_BROKERS = "...";
    public static final String TOPIC = "...";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new KafkaProducerConsumerTest().run();
    }

    public void run() throws ExecutionException, InterruptedException {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
        consumerProperties.setProperty("group.id", "Test");
        consumerProperties.setProperty("auto.offset.reset", "latest");
        consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC);
        Executors.newFixedThreadPool(1).submit(() -> kafkaConsumer.consume());

        Properties producerProperties = new Properties();
        producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
        producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC);
        kafkaProducer.publish("Test Message");
    }
}

class MyKafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);
    private KafkaConsumer<String, Object> kafkaConsumer;

    public MyKafkaConsumer(Properties properties, String topic) {
        kafkaConsumer = new KafkaConsumer<String, Object>(properties);
        kafkaConsumer.subscribe(Lists.newArrayList(topic));
    }

    public void consume() {
        while (true) {
            logger.info("Started listening...");
            ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE);
            logger.info("Received records {}", consumerRecords.iterator().next().value());
        }
    }
}

class MyKafkaProducer {
    private KafkaProducer<String, Object> kafkaProducer;
    private String topic;

    public MyKafkaProducer(Properties properties, String topic) {
        this.kafkaProducer = new KafkaProducer<String, Object>(properties);
        this.topic = topic;
    }

    public void publish(Object object) throws ExecutionException, InterruptedException {
        ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object);
        Future<RecordMetadata> response = kafkaProducer.send(producerRecord);
        response.get();
    }

}
tjvv9vkg

tjvv9vkg1#

第一条消息的时间应该比其他消息长,因为在语句指定的使用者组中启动新使用者时 consumerProperties.setProperty("group.id", "Test"); ,kakfka将平衡分区,使每个分区由一个使用者使用,并将主题的分区分布在多个使用者进程中。
此外,Kafka0.9,有一个单独的 __consumer_offsets Kafka用来管理消费群体中每个消费者的补偿的主题。很可能,当您第一次启动使用者时,它会查看此主题以获取最新的偏移量(先前可能有一个使用者正在使用此主题中的偏移量,而该偏移量会被杀死,因此有必要从正确的偏移量获取)。
这两个因素将导致第一组消息的使用延迟更高。我不能评论20-30秒的确切延迟,但我猜这应该是默认行为。
ps:确切的数字还可能取决于其他次要因素,比如您是在同一台机器(没有网络延迟)上运行代理和消费者,还是在使用tcp进行通信的不同机器上运行。

ie3xauqp

ie3xauqp2#

根据这个链接:
尝试设置 group_id=None 或者在结束脚本之前调用consumer.close(),或者使用assign()not subscribe()。否则,您将重新加入现有的组,该组具有已知但无响应的成员。组协调员将等待这些成员签入/离开/超时。由于使用者不再存在(这是您以前的脚本运行),它们必须超时。和consumer.poll()在组重新平衡期间阻塞。
因此,如果您加入一个成员无响应的组(可能您不正常地终止应用程序),这是正确的行为。
请确认您在退出应用程序之前调用“consumer.close()”。

kgsdhlau

kgsdhlau3#

只是尝试了你的代码与最小的日志添加现在很多次。下面是一个典型的日志输出:

2016-07-24 15:12:51,417 Start polling...|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:51,604 producer has send message|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:51,619 producer got response, exiting|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:51,679 Received records [Test Message]|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:51,679 Start polling...|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:54,680 returning on empty poll result|INFO|KafkaProducerConsumerTest

事件的顺序如预期的那样及时。消费者开始轮询,生产者发送消息并接收结果,消费者以300ms的时间接收消息和所有这些。然后消费者再次开始轮询,并在3秒后抛出,因为我分别更改了轮询超时。
我将kafka0.9.0.1用于代理和客户机库。连接在localhost上,是一个完全没有负载的测试环境。
为了完整起见,下面是由上面的exchange触发的服务器的日志表单。

[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Stabilized group Test generation 1 (kafka.coordinator.GroupCoordinator)
[2016-07-24 15:12:51,617] INFO [GroupCoordinator 0]: Assignment received from leader for group Test for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-07-24 15:13:24,635] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-07-24 15:13:24,637] INFO [GroupCoordinator 0]: Group Test generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)

您可能希望与同一个exchange的服务器日志进行比较。

相关问题