我实现了一个循环分区器,如下所示:
public class KafkaRoundRobinPartitioner implements Partitioner {
private static final Logger log = Logger.getLogger(KafkaRoundRobinPartitioner.class);
final AtomicInteger counter = new AtomicInteger(0);
public KafkaRoundRobinPartitioner() {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionsCount = partitions.size();
int partitionId = counter.incrementAndGet() % partitionsCount;
if (counter.get() > 65536) {
counter.set(partitionId);
}
return partitionId;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
现在我想测试每个分区有相同数量的消息。例如,如果我有一个带有32个分区的主题,并且我向这个主题发送了32条消息,我希望每个分区正好有一条消息。
我想做如下事情:
KafkaPartitions allPartitions = new KafkaTopic("topic_name");
for (KafkaPartition partition : allPartitions) {
int msgCount = partition.getMessagesCount();
// do asserts
}
据我所知,kafkajavaapi并没有为我们提供这样的功能,但我可能在文档中丢失了一些东西。
有什么方法可以优雅地执行它吗?
我只找到了一个基本的解决方案。因为我使用的是多消费者模型,所以我可以为每个消费者执行以下操作:
consumer.assignment().size();
之后我可以做:
consumer.poll(100);
并检查每个消费者是否都有一条消息。在这种情况下,我不应该面对这样一种情况:一个消费者从它的分区中得到另一个消费者的消息,因为我有相同数量的消费者和分区,kafka应该以循环方式在消费者之间分配分区。
2条答案
按热度按时间tvz2xvvm1#
你可以用
seekToBeginning()
以及seekToEnd()
计算每个分区的偏移量之差。baubqpgj2#
最后,我写了如下的东西。
我的Kafka消费者的工人有以下代码:
在我的测试中,我决定检查每个消费者是否只执行了一次提交,这意味着消息分发是以循环方式进行的。测试代码: