我有一个使用@kafkalisterner注解的带有kafka消费者的spring应用程序。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的场景。以编程方式实现这一点的最佳方法是什么?我们不控制Kafka主题的配置。
l0oc07j21#
@KafkaListener(...) public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) { System.out.println(in); if (this.resetNeeded) { consumer.seekToBeginning(consumer.assignment()); this.resetNeeded = false; } }
如果要在侦听器空闲(无记录)时重置,可以启用空闲事件并通过侦听 ListenerContainerIdleEvent 在一个 ApplicationListener 或者 @EventListener 方法。事件引用了消费者。编辑
ListenerContainerIdleEvent
ApplicationListener
@EventListener
@SpringBootApplication public class So58769796Application { public static void main(String[] args) { SpringApplication.run(So58769796Application.class, args); } @KafkaListener(id = "so58769796", topics = "so58769796") public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { System.out.println("One:" + key + ":" + value); } @KafkaListener(id = "so58769796a", topics = "so58769796") public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { System.out.println("Two:" + key + ":" + value); } @Bean public NewTopic topic() { return TopicBuilder.name("so58769796") .compact() .partitions(1) .replicas(1) .build(); } boolean reset; @Bean public ApplicationRunner runner(KafkaTemplate<String, String> template) { return args -> { template.send("so58769796", "foo", "bar"); System.out.println("Hit enter to rewind"); System.in.read(); this.reset = true; }; } @EventListener public void listen(ListenerContainerIdleEvent event) { System.out.println(event); if (this.reset && event.getListenerId().startsWith("so58769796-")) { event.getConsumer().seekToBeginning(event.getConsumer().assignment()); } } }
和
spring.kafka.listener.idle-event-interval=5000
编辑2这里有另一种技术-在这种情况下,我们在每次应用程序启动(和按需)时倒带。。。
@SpringBootApplication public class So58769796Application implements ConsumerSeekAware { public static void main(String[] args) { SpringApplication.run(So58769796Application.class, args); } @KafkaListener(id = "so58769796", topics = "so58769796") public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { System.out.println(key + ":" + value); } @Bean public NewTopic topic() { return TopicBuilder.name("so58769796") .compact() .partitions(1) .replicas(1) .build(); } @Bean public ApplicationRunner runner(KafkaTemplate<String, String> template, KafkaListenerEndpointRegistry registry) { return args -> { template.send("so58769796", "foo", "bar"); System.out.println("Hit enter to rewind"); System.in.read(); registry.getListenerContainer("so58769796").stop(); registry.getListenerContainer("so58769796").start(); }; } @Override public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) { assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition())); } }
1条答案
按热度按时间l0oc07j21#
如果要在侦听器空闲(无记录)时重置,可以启用空闲事件并通过侦听
ListenerContainerIdleEvent
在一个ApplicationListener
或者@EventListener
方法。事件引用了消费者。
编辑
和
编辑2
这里有另一种技术-在这种情况下,我们在每次应用程序启动(和按需)时倒带。。。