spring kafka:根据文档使用暂停/恢复方法暂停/恢复消费者时,如果使用了自动分配但它不起作用,则不应发生重新平衡。如何暂停/恢复消费者并在一段时间后保持轮询而不重新平衡?
用例:消费者应该暂停一段时间并保持轮询,以便在时间结束后给出心跳和恢复,但是Kafka不应该在消费者暂停时重新平衡。
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] stopped consumption.");
consumer.pause(Collections.singleton(topicPartition));
try {
Thread.sleep(60000);
consumer.resume(Collections.singleton(topicPartition));
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] resumed consumption.");
} catch (InterruptedException e) {
e.printStackTrace();
}
日志:2019-02-19 15:19:49.173 info 82272---[rtaskexecutor-1]o.a.k.c.c.internals.abstractcoordinator:[consumer clientid=consumer-2,groupid=customer](重新)加入组2019-02-19 15:19:49.175 info 82272---[rtaskexecutor-2]o.a.k.c.internals.abstractcoordinator:[consumer clientid=consumer-3,groupid=customer](重新)加入组2019-02-19 15:19:49.181信息82272---[rtaskexecutor-3]o.a.k.c.c.internals.abstractcoordinator:[consumer clientid=consumer-4,groupid=customer](重新)加入组
2019-02-19 15:19:49.192 info 82272---[rtaskexecutor-1]o.a.k.c.c.internals.abstractcoordinator:[consumer clientid=consumer-2,groupid=customer]成功加入组,生成581 2019-02-19 15:19:49.192 info 82272---[rtaskexecutor-2]o.a.k.c.internals.abstractcoordinator:[consumer clientid=consumer-3,groupid=customer]已成功加入第581代的组
2019-02-19 15:19:49.194信息82272---[rtaskexecutor-1]o.a.k.c.c.internals.consumercoordinator:[consumer clientid=consumer-2,groupid=customer]设置新分配的分区[spring-kafka-topic-2,spring-kafka-topic-0,spring-kafka-topic-1]2019-02-19 15:19:49.194信息82272---[rtaskexecutor-2]o.a.k.c.c.internals.consumercoordinator:[consumer clientid=consumer-3,groupid=customer]设置新分配的分区[spring-kafka-topic-4,spring-kafka-topic-5,spring-kafka-topic-3]2019-02-19 15:19:49.218信息82272---[rtaskexecutor-2]o.s.k.l.kafka消息容器:分配的分区:[spring-kafka-topic-4,spring-kafka-topic-5,spring-kafka-topic-3]2019-02-19 15:19:49.219信息82272---[rtaskexecutor-1]o.s.k.l.kafka消息容器:分配的分区:[spring-kafka-topic-2,spring-kafka-topic-0,spring-kafka-topic-1]2019-02-19 15:19:49.223 info 82272---[main]o.s.b.w.embedded.tomcat.tomcatwebserver:tomcat在端口8080(http)上启动,上下文路径为“”2019-02-19 15:19:49.233 info 82272---[main]c.g.s.springkafkasupportapplication:springkafkasupportapplication在3.43秒内启动(jvm运行3.85)消费者[customertaskexecutor-1]收到消息[customer(name=,phonenumber=20)]消费者[customertaskexecutor-2]收到消息[customer(name=test 6,phonenumber=6)]消费者[customertaskexecutor-1]分区[spring-kafka-topic-2]停止消费。使用者[customertaskexecutor-1]分区[spring-kafka-topic-1]已停止消费。2019-02-19 15:19:52.200信息82272---[rtaskexecutor-2]o.a.k.c.c.internals.abstractcoordinator:[consumer clientid=consumer-3,groupid=customer]尝试心跳失败,因为组正在重新平衡2019-02-19 15:19:52.200信息82272---[rtaskexecutor-1]o.a.k.c.internals.abstractcoordinator:[consumer clientid=consumer-2,groupid=customer]尝试检测心跳失败,因为组正在重新平衡2019-02-19 15:19:52.200 info 82272---[rtaskexecutor-1]o.a.k.c.c.internals.consumercoordinator:[consumer clientid=consumer-2,groupid=customer]取消以前分配的分区[spring-kafka-topic-2,spring-kafka-topic-0,spring-kafka-topic-1]2019-02-19 15:19:52.200信息82272---[rtaskexecutor-2]o.a.k.c.c.internals.consumercoordinator:[consumer clientid=consumer-3,groupid=customer]撤销先前分配的分区[spring-kafka-topic-4,spring-kafka-topic-5,spring-kafka-topic-3]2019-02-19 15:19:52.200信息82272---[rtaskexecutor-1]o.s.k.l.kafka消息容器:已撤销分区:[spring-kafka-topic-2,spring-kafka-topic-0,spring-kafka-topic-1]2019-02-19 15:19:52.200信息82272---[rtaskexecutor-2]o.s.k.l.kafka消息容器:已撤销分区:[spring-kafka-topic-4,spring-kafka-topic-5,spring-kafka-topic-3]2019-02-19 15:19:52.200信息82272---[rtaskexecutor-1]o.a.k.c.c.internals.abstractcoordinator:[consumer clientid=consumer-2,groupid=customer](重新)加入组2019-02-19 15:19:52.200信息82272---[rtaskexecutor-2]o.a.k.c.internals.abstractcoordinator:[consumer clientid=consumer-3,groupid=customer](重新)加入组2019-02-19 15:19:52.209信息82272---[rtaskexecutor-1]o.a.k.c.c.internals.abstractcoordinator:[consumer clientid=consumer-2,groupid=customer]已成功加入第582代组2019-02-19 15:19:52.209 info 82272---[rtaskexecutor-2]o.a.k.c.c.internals.abstractcoordinator:[consumer clientid=consumer-3,groupid=customer]已成功加入第582代的组2019-02-19 15:19:52.209 info 82272---[rtaskexecutor-3]o.a.k.c.c.internals.abstractcoordinator:[consumer clientid=consumer-4,groupid=customer]成功加入组,生成582 2019-02-19 15:19:52.209信息82272---[rtaskexecutor-3]o.a.k.c.c.internals.consumercoordinator:[consumer clientid=consumer-4,groupid=customer]设置新分配的分区[spring-kafka-topic-4,spring-kafka-topic-5]2019-02-19 15:19:52.210信息82272---[rtaskexecutor-1]o.a.k.c.c.internals.consumercoordinator:[consumer clientid=consumer-2,groupid=customer]设置新分配的分区[spring-kafka-topic-0,spring-kafka-topic-1]2019-02-19 15:19:52.210信息82272---[rtaskexecutor-2]o.a.k.c.c.internals.consumercoordinator:[consumer clientid=consumer-3,groupid=customer]设置新分配的分区[spring-kafka-topic-2,spring-kafka-topic-3]2019-02-19 15:19:52.211信息82272---[rtaskexecutor-3]o.s.k.l.kafka消息容器:分配的分区:[spring-kafka-topic-4,spring-kafka-topic-5]2019-02-19 15:19:52.212信息82272---[rtaskexecutor-1]o.s.k.l.kafka消息容器:分配的分区:[spring-kafka-topic-0,spring-kafka-topic-1]2019-02-19 15:19:52.212信息82272---[rtaskexecutor-2]o.s.k.l.kafkamessagelistenercontainer:分配的分区:[spring-kafka-topic-2,spring-kafka-topic-3]消费者[customertaskexecutor-3]收到的消息[customer(name=test 6,phonenumber=6)]
2条答案
按热度按时间fquxozlt1#
只是与消费者和SpringKafka有相同的“群”信息。与@kafkalistener和带有concurrentmessagelistenercontainer的无注解spring的结果相同。参数调整的工作原理与纯java不完全相同。
使用consumer.poll()用straight java重新编写,并按照gary russell使用executorservice调整的参数启动线程,一切正常。在重新平衡过程中,不再获得这些消息并丢失心跳。来自clouderable和confluent网站的直接java示例:
http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html
https://docs.confluent.io/current/clients/consumer.html#
6rqinv9w2#
阅读Kafka文献。
暂停消费者仅仅意味着
poll()
在你打电话之前,s不会返回任何记录resume()
,但你还是要打电话poll()
内max.poll.interval.ms
为了防止再平衡。