Kafka想从pythonapi开始吗?

gab6jxml  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(233)

有了kafkajavaapi,我可以使用一个重新平衡的监听器,该监听器会像这样寻找主题的开头(代码是scala,kafkaapi是java):

class SeekToBeginningRebalanceListener[K, V](val consumer: KafkaConsumer[K, V]) extends ConsumerRebalanceListener {
  override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
    for (tp <- partitions.asScala) {
      consumer.seekToBeginning(util.Arrays.asList(tp))
    }
  }

  override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { }
}

我在订阅这样的主题时会用到:

kafkaConsumer.subscribe(java.util.Arrays.asList(topicName), new SeekToBeginningRebalanceListener(kafkaConsumer))

如何使用合流的kafka python api实现这一点?
我可以编写一个类似的分区分配回调函数来调用:

def on_assign_callback(consumer, topic_partitions):
    for topic_partition in topic_partitions:
        print("assigned to partition. topic={}. partition={}. offset={}. error={}".format(
            topic_partition.topic, topic_partition.partition, topic_partition.offset, topic_partition.error))

        # This runs but has no effect.
        topic_partition.offset = 0

但是我找不到任何api来进行搜索。我该怎么做?

vqlkdk9b

vqlkdk9b1#

从https://github.com/confluentinc/confluent-kafka-python/issues/11
这对我很有用:

def on_assign (c, ps):
    for p in ps:
        p.offset=-2
    c.assign(ps)

c.subscribe(['test'], on_assign=on_assign)
``` `seekToBeginning` 当前未实现合流kafka python api的函数。

相关问题