Spring Kafka,我可以在同一个类(bean)中有2个具有相同主题的消息消费者吗?

flvlnr44  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(149)

我正在实施社会媒体平台的学习目的。所以基本上我有帖子和评论实体。我想使用相同的主题KafkaTopic.POST,但不同的分区KafkaPartition.POST, KafkaPartition.COMMENT。我想知道当在同一个bean中定义它时,它是否能够同时处理2个消息POSTCOMMENT

@Component
public class PostListener {

    private static final Logger logger = LoggerFactory.getLogger(PostListener.class);
    private final PostService postService;

    private final CommentService commentService;

    public PostListener(PostService postService, CommentService commentService) {
        this.postService = postService;
        this.commentService = commentService;
    }

    @KafkaListener(topics = KafkaTopic.POST,
                     groupId = KafkaGroup.POST_GROUP,
                     topicPartitions = { @TopicPartition(topic = KafkaTopic.POST, partitions = { KafkaPartition.POST })})
    public void on(@Payload PostPayload payload,
                   @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {

        postService.createPost(payload.getSenderId(), payload.getContent(), timestamp)
                   .subscribe();
    }

    @KafkaListener(topics = KafkaTopic.POST,
                     groupId = KafkaGroup.POST_GROUP,
                     topicPartitions = { @TopicPartition(topic = KafkaTopic.POST, partitions = { KafkaPartition.COMMENT })})
    public void on(@Payload CommentPayload payload,
                   @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {

        commentService.createComment(payload.getPostId(), payload.getSenderId(), payload.getContent(), timestamp)
                      .subscribe();
    }
}
6pp0gazn

6pp0gazn1#

是的,您可以使用Spring Kafka中的Kafka侦听器处理来自同一主题的不同分区的消息。在本例中,您定义了两个Kafka侦听器,每个侦听器处理来自同一主题(KafkaTopic.POST)的不同分区的消息。
@KafkaListener annotation允许您指定要从中消费消息的主题和分区。由于您使用不同的分区来发布和评论(KafkaPartition.POST和KafkaPartition.COMMENT),因此您的Kafka侦听器将能够同时处理来自这些分区的消息。

相关问题