当Kafka利斯泰纳听多个主题时,你能优先考虑一个主题吗?

q7solyqu  于 2021-07-13  发布在  Java
关注(0)|答案(2)|浏览(451)

当我的@kafkalistener听多个主题时,有人知道如何优先考虑单个Kafka主题吗?
下面是我的代码示例:

@KafkaListener(id = "priority", topics = { "${prio-topic}" }, concurrency = "1", autoStartup = "true")
    @KafkaListener(id = "nonPriority", topics = { "${not-prio-topic-1}",
            "${not-prio-topic-2}", "${not-prio-topic-3}",
            "${not-prio-topic-4}", concurrency = "1", autoStartup = "true")
    public synchronized void  listenManyEntryTopic(String message) {}

我的问题是我想从这个主题开始读 prio-topic 在其他非里约主题之前。只有当我的 prio-topic 是空的我应该开始消费没有任何特定顺序的其他主题。如有任何提示/建议,我们将不胜感激。谢谢你的帮助!

bq3bfh9z

bq3bfh9z1#

kafka中没有区分优先级和非优先级主题消息的功能。为了解决您的问题,解决方案之一是将优先级处理主题与非优先级主题分开,即专用app1只使用优先级主题消息并对其进行处理,而app2同时使用非优先级消息并对其进行处理。

bxjv4tth

bxjv4tth2#

因为您有两个侦听器容器,所以可以添加 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic 作为方法的参数。
然后,当您收到来自优先级主题的消息时,您可以 stop() 另一个侦听器容器(使用 KafkaListenerEndpointRegistry bean),如果它正在运行。
配置 idleEventInterval 在主主题容器中添加 @EventListner 方法 ListenerContainerIdleEvent s(或an) ApplicationListener 豆)。
然后,当检测到空闲的主容器时,可以重新启动非主容器。
编辑

@SpringBootApplication
public class So66366140Application {

    private static final Logger LOG = LoggerFactory.getLogger(So66366140Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So66366140Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so66366140-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66366140-2").partitions(1).replicas(1).build();
    }

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @KafkaListener(id = "so66366140-1", topics = "so66366140-1")
    @KafkaListener(id = "so66366140-2", topics = "so66366140-2", autoStartup = "false")
    public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        LOG.info(in);
        if (topic.equals("so66366140-1")
                && this.registry.getListenerContainer("so66366140-2").isRunning()) {
            LOG.info("Stopping non-pri container");
            this.registry.getListenerContainer("so66366140-2").stop();
        }
    }

    @EventListener
    void events(ListenerContainerIdleEvent event) {
        LOG.info(event.toString());
        if (event.getListenerId().startsWith("so66366140-1")
                && !this.registry.getListenerContainer("so66366140-2").isRunning()) {
            LOG.info("Starting non-pri container");
            this.registry.getListenerContainer("so66366140-2").start();
        }
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0,  10).forEach(i -> {
                template.send("so66366140-1", "foo");
                template.send("so66366140-2", "bar");
                try {
                    Thread.sleep(6_000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        };
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5s

您也可以使用暂停/继续而不是停止/启动。

相关问题