如何使用@KafkaListner注解订阅多个主题

xesrikrc  于 2022-12-03  发布在  Apache
关注(0)|答案(3)|浏览(435)

我正在使用Kafka消息代理来发布和订阅事件。为此使用Spring基础设施。我的要求是我需要创建一个订阅多个主题的消费者。
下面的代码在订阅单个主题时工作得非常好。

@KafkaListener(topics = "com.customer.nike")
  public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
  }

但是我想,它应该是订阅一些模式的主题。

@KafkaListener(topics = "com.cusotmer.*.nike")
      public void receive(String payload) {
        LOGGER.info("received payload='{}'", payload);
      }

在这段代码中,* 会不断变化。它可能是一些数字值,如1000、1010等等。为此,我也使用了SpeL。

@KafkaListener(topics = "#{com.cusotmer.*.nike}")
      public void receive(String payload) {
        LOGGER.info("received payload='{}'", payload);
      }

但是这个也不适合我。有人能帮我订阅多个主题吗?
先谢谢你。

9lowa7mx

9lowa7mx1#

我使用@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}",其中kafka.topics取自我的属性文件,并包含侦听器应该侦听的逗号分隔的主题。
但也可以是,在启动期间,u可以添加逻辑以生成所有可能的主题,并将其分配给变量,该变量稍后可以如上所述地使用。
更新:通配符是可能的,因为 Alexandria 评论如下。

c3frrgcw

c3frrgcw2#

关于订阅多个主题,您可以使用topicPatterns来实现:
此监听器的主题模式。项目可以是'topic pattern'、'property-placeholder key'或'expression'。框架将创建一个容器,该容器订阅与指定模式匹配的所有主题,以获取动态分配的分区。将针对检查时存在的主题定期执行模式匹配。表达式必须解析为主题模式(支持字符串或模式结果类型)。
与topics()和topicPartitions()互斥。

@KafkaListener(topicPattern = "com.customer.*")
  public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
  }

关于对the topic name的编程访问,您可以使用@Header注解方法来提取由KafkaHeaders定义的特定头值,在您的示例中为RECEIVED_TOPIC:
包含从中接收消息的主题的标头。

@KafkaListener(topics = "com.customer.nike")
    public void receive(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    LOGGER.info("received payload='{}'", payload);
    LOG.info("received from topic: {}", topic);
    }
gorkyyrv

gorkyyrv3#

如果我们必须从application.properties文件中获取多个主题:

@KafkaListener(topics = { "${spring.kafka.topic1}", "${spring.kafka.topic2}" })

相关问题