多个消费者从一个单一的主题阅读

lsmd5eda  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(322)

我正在使用springboot构建一个web应用程序,现在我需要接收实时通知。我计划使用apachekafka作为消息代理。要求有不同角色的用户,并且根据角色,他们应该接收其他用户正在做什么的通知。
我确实设立了一个生产者和消费者,作为消费者,我可以接收发布到某个主题的信息,比如topic1。
我陷入困境的部分是,我可能有几个用户听同一个主题,每个用户都应该得到发布到该主题的消息。我明白,对于这个要求,我们需要为每个kafkalistener设置不同的group.id,以便每个消费者都能得到消息。
但是,当用户登录时,我将如何创建一个具有不同组id的kafkalistener呢?希望有人能提供一些指导?谢谢您

jyztefdp

jyztefdp1#

只需创建一个新的 KafkaMessageListenerContainer 每次,根据需要启动/停止。
您可以使用引导的自动配置 ConcurrentKafkaListenerContainerFactory 创建容器。只要设定 groupId 容器属性使其唯一。
编辑
举个例子:

@SpringBootApplication
public class So60150686Application {

    public static void main(String[] args) {
        SpringApplication.run(So60150686Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60150686", "foo");
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
    }

}

@RestController
class Web {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    @GetMapping(path="/foo/{group}")
    public String foo(@PathVariable String group) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
        container.getContainerProperties().setGroupId(group);
        container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {

            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record);
            }

        });
        container.start();
        return "ok";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
$ http localhost:8080/foo/bar
HTTP/1.1 200 
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60

ok

2020-02-10 14:42:09.744信息34096---[consumer-0-c-1]o.s.k.l.kafkamessagelistener容器:条:分配的分区:[so60150686-0]
consumerrecord(主题=so60150686,分区=0,leaderepoch=0,偏移量=1,创建时间=1581363648938,序列化键大小=-1,序列化值大小=3,headers=recordheaders(headers=[],isreadonly=false),键=null,值=foo)

相关问题