我正在实施社会媒体平台的学习目的。所以基本上我有帖子和评论实体。我想使用相同的主题KafkaTopic.POST
,但不同的分区KafkaPartition.POST, KafkaPartition.COMMENT
。我想知道当在同一个bean中定义它时,它是否能够同时处理2个消息POST
和COMMENT
?
@Component
public class PostListener {
private static final Logger logger = LoggerFactory.getLogger(PostListener.class);
private final PostService postService;
private final CommentService commentService;
public PostListener(PostService postService, CommentService commentService) {
this.postService = postService;
this.commentService = commentService;
}
@KafkaListener(topics = KafkaTopic.POST,
groupId = KafkaGroup.POST_GROUP,
topicPartitions = { @TopicPartition(topic = KafkaTopic.POST, partitions = { KafkaPartition.POST })})
public void on(@Payload PostPayload payload,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {
postService.createPost(payload.getSenderId(), payload.getContent(), timestamp)
.subscribe();
}
@KafkaListener(topics = KafkaTopic.POST,
groupId = KafkaGroup.POST_GROUP,
topicPartitions = { @TopicPartition(topic = KafkaTopic.POST, partitions = { KafkaPartition.COMMENT })})
public void on(@Payload CommentPayload payload,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {
commentService.createComment(payload.getPostId(), payload.getSenderId(), payload.getContent(), timestamp)
.subscribe();
}
}
1条答案
按热度按时间6pp0gazn1#
是的,您可以使用Spring Kafka中的Kafka侦听器处理来自同一主题的不同分区的消息。在本例中,您定义了两个Kafka侦听器,每个侦听器处理来自同一主题(KafkaTopic.POST)的不同分区的消息。
@KafkaListener annotation允许您指定要从中消费消息的主题和分区。由于您使用不同的分区来发布和评论(KafkaPartition.POST和KafkaPartition.COMMENT),因此您的Kafka侦听器将能够同时处理来自这些分区的消息。