kafka和spark:通过api获取主题的第一个偏移量

2wnc66cl  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(297)

我正在玩spark流媒体和kafka(使用scalaapi),并且想通过spark流媒体阅读一组kafka主题中的消息。
以下方法:

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

从kafka读取到最新的可用偏移量,但没有提供所需的元数据(因为我是从一组主题中读取的,所以我需要读取该主题的每条消息),但这个方法 KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler) 想要一个我没有的补偿。
我知道有一个shell命令给出了最后一个偏移量。

kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1

以及 KafkaCluster.scala 是一个api,是为开发人员,过去是公开的,并给你正是我想要的。
提示?

hrysbysz

hrysbysz1#

您可以使用getoffsetshell.scala kafka api文档中的代码

val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

或者您可以创建具有唯一groupid的新消费者,并使用它获取第一个偏移量

val consumer=new KafkaConsumer[String, String](createConsumerConfig(config.brokerList))
consumer.partitionsFor(config.topic).foreach(pi => {
      val topicPartition = new TopicPartition(pi.topic(), pi.partition())

      consumer.assign(List(topicPartition))
      consumer.seekToBeginning()
      val firstOffset = consumer.position(topicPartition)
 ...

相关问题