有了kafkajavaapi,我可以使用一个重新平衡的监听器,该监听器会像这样寻找主题的开头(代码是scala,kafkaapi是java):
class SeekToBeginningRebalanceListener[K, V](val consumer: KafkaConsumer[K, V]) extends ConsumerRebalanceListener {
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
for (tp <- partitions.asScala) {
consumer.seekToBeginning(util.Arrays.asList(tp))
}
}
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { }
}
我在订阅这样的主题时会用到:
kafkaConsumer.subscribe(java.util.Arrays.asList(topicName), new SeekToBeginningRebalanceListener(kafkaConsumer))
如何使用合流的kafka python api实现这一点?
我可以编写一个类似的分区分配回调函数来调用:
def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
print("assigned to partition. topic={}. partition={}. offset={}. error={}".format(
topic_partition.topic, topic_partition.partition, topic_partition.offset, topic_partition.error))
# This runs but has no effect.
topic_partition.offset = 0
但是我找不到任何api来进行搜索。我该怎么做?
1条答案
按热度按时间vqlkdk9b1#
从https://github.com/confluentinc/confluent-kafka-python/issues/11
这对我很有用: