我正在玩spark流媒体和kafka(使用scalaapi),并且想通过spark流媒体阅读一组kafka主题中的消息。
以下方法:
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
从kafka读取到最新的可用偏移量,但没有提供所需的元数据(因为我是从一组主题中读取的,所以我需要读取该主题的每条消息),但这个方法 KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)
想要一个我没有的补偿。
我知道有一个shell命令给出了最后一个偏移量。
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list <broker>: <port>
--topic <topic-name> --time -1 --offsets 1
以及 KafkaCluster.scala
是一个api,是为开发人员,过去是公开的,并给你正是我想要的。
提示?
1条答案
按热度按时间hrysbysz1#
您可以使用getoffsetshell.scala kafka api文档中的代码
或者您可以创建具有唯一groupid的新消费者,并使用它获取第一个偏移量