我正在使用Sping Boot 应用程序来测试微服务。我想多次阅读某个主题的最后一条X消息。(例如:多个测试场景想要检查微服务是否向Kafka主题发送了正确的消息)我该如何做到这一点?
bwitn5fc1#
Kafka消费者通过跟踪主题中每个分区的偏移量来工作。这个偏移量指向消费者接下来应该读取的记录。消费者偏移量(记录已读取的消息)按消费者组进行管理。这意味着不同的消费者组可以独立地读取同一组消息,而不会相互干扰。对于您的情况,每个测试场景都有自己独特的消费者组。这将允许每个测试场景从主题的开始或任何指定的偏移量读取,独立于其他测试。所以典型的测试应该是这样的:
void thisIsTheTest() { // GIVEN var subscription = kafkaTestUtil.subscribeTo(topic, groupId) // WHEN // execute your scenarion // THEN var messages = subscription.poolLast(x); }
您需要在某个测试实用程序类中实现订阅和池逻辑。如果要读取最后的X消息,则需要找到最新的偏移量,然后查找该偏移量减去X。但是请注意,从主题中检索最后的X消息可能并不简单,因为主题由分区组成,每个分区都有自己的偏移量,并且消息根据分区键分布在分区中。因此,如何计算最后的X消息并不十分清楚。简化这个过程的一个简单方法是为测试(ONLY TESTING)环境中的每个主题分配一个分区。这将使检索最近的X消息变得容易。但实际上,这种选择可能并不适合所有人。P.S.你可以在这里阅读更多关于消费者群体的信息:https://kafka.apache.org/documentation/#consumerconfigs_group.idhttps://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups
X
1条答案
按热度按时间bwitn5fc1#
Kafka消费者通过跟踪主题中每个分区的偏移量来工作。这个偏移量指向消费者接下来应该读取的记录。消费者偏移量(记录已读取的消息)按消费者组进行管理。这意味着不同的消费者组可以独立地读取同一组消息,而不会相互干扰。
对于您的情况,每个测试场景都有自己独特的消费者组。这将允许每个测试场景从主题的开始或任何指定的偏移量读取,独立于其他测试。所以典型的测试应该是这样的:
您需要在某个测试实用程序类中实现订阅和池逻辑。
如果要读取最后的
X
消息,则需要找到最新的偏移量,然后查找该偏移量减去X
。但是请注意,从主题中检索最后的X
消息可能并不简单,因为主题由分区组成,每个分区都有自己的偏移量,并且消息根据分区键分布在分区中。因此,如何计算最后的X
消息并不十分清楚。简化这个过程的一个简单方法是为测试(ONLY TESTING)环境中的每个主题分配一个分区。这将使检索最近的X消息变得容易。但实际上,这种选择可能并不适合所有人。
P.S.你可以在这里阅读更多关于消费者群体的信息:https://kafka.apache.org/documentation/#consumerconfigs_group.idhttps://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups