我如何使用spring Boot 多次读取Kafka的消息

3lxsmp7m  于 2023-05-27  发布在  Spring
关注(0)|答案(1)|浏览(186)

我正在使用Sping Boot 应用程序来测试微服务。我想多次阅读某个主题的最后一条X消息。(例如:多个测试场景想要检查微服务是否向Kafka主题发送了正确的消息)我该如何做到这一点?

bwitn5fc

bwitn5fc1#

Kafka消费者通过跟踪主题中每个分区的偏移量来工作。这个偏移量指向消费者接下来应该读取的记录。消费者偏移量(记录已读取的消息)按消费者组进行管理。这意味着不同的消费者组可以独立地读取同一组消息,而不会相互干扰。
对于您的情况,每个测试场景都有自己独特的消费者组。这将允许每个测试场景从主题的开始或任何指定的偏移量读取,独立于其他测试。所以典型的测试应该是这样的:

void thisIsTheTest() {
    // GIVEN
    var subscription = kafkaTestUtil.subscribeTo(topic, groupId)

    // WHEN
    // execute your scenarion

    // THEN
    var messages = subscription.poolLast(x);
}

您需要在某个测试实用程序类中实现订阅和池逻辑。
如果要读取最后的X消息,则需要找到最新的偏移量,然后查找该偏移量减去X。但是请注意,从主题中检索最后的X消息可能并不简单,因为主题由分区组成,每个分区都有自己的偏移量,并且消息根据分区键分布在分区中。因此,如何计算最后的X消息并不十分清楚。
简化这个过程的一个简单方法是为测试(ONLY TESTING)环境中的每个主题分配一个分区。这将使检索最近的X消息变得容易。但实际上,这种选择可能并不适合所有人。
P.S.你可以在这里阅读更多关于消费者群体的信息:https://kafka.apache.org/documentation/#consumerconfigs_group.idhttps://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups

相关问题