Is it possible to get specific message offsets in Spark + KafkaRDD?

im9ewurl · published 2021-06-08 in Kafka

I'm trying to read from Kafka with KafkaRDD so that I can get both the message and its metadata. The approach I'm using is as follows:

val messageHandler = (mmd: MessageAndMetadata[String, Array[Byte]]) => {
  (mmd.message(), mmd.offset)
}

val messagesAndMetadata = KafkaUtils.createRDD[String, Array[Byte], StringDecoder, DefaultDecoder, Tuple2[String, Long]](
  sc.asInstanceOf[SparkContext], KafkaParams, offsetRangeTrete, leaderForPartitionsMap, messageHandler)

But the compiler reports this error:

ambiguous reference to overloaded definition,
both method createRDD in object KafkaUtils of type 
(jsc: org.apache.spark.api.java.JavaSparkContext, keyClass: Class[String], valueClass: Class[String], keyDecoderClass: Class[kafka.serializer.StringDecoder], valueDecoderClass: Class[kafka.serializer.StringDecoder], recordClass: Class[String], kafkaParams: java.util.Map[String,String], offsetRanges: Array[org.apache.spark.streaming.kafka.OffsetRange], leaders: java.util.Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.Broker], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])org.apache.spark.api.java.JavaRDD[String]
and  method createRDD in object KafkaUtils of type (sc: org.apache.spark.SparkContext, kafkaParams: Map[String,String], offsetRanges: Array[org.apache.spark.streaming.kafka.OffsetRange], leaders: Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.Broker], messageHandler: kafka.message.MessageAndMetadata[String,String] => String)(implicit evidence$9: scala.reflect.ClassTag[String], implicit evidence$10: scala.reflect.ClassTag[String], implicit evidence$11: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$12: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$13: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[String]
match expected type ?
[ERROR]     val treteMetadata = org.apache.spark.streaming.kafka.KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String]

I'm using Spark 1.3.0. Does anyone know how to solve this?
Thank you.


yjghlzjz1#

Solved it by using the correct type for the leaderForPartitionsMap parameter, which must be Map[TopicAndPartition, org.apache.spark.streaming.kafka.Broker]. With the wrong type there, neither createRDD overload matches the arguments exactly, so the compiler reports the call as ambiguous.
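For reference, a minimal sketch of what the corrected call could look like against the Spark 1.3 API (the topic name, partition, broker address, and offsets below are hypothetical placeholders, and sc is assumed to be an existing SparkContext, as in the question):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

// Hypothetical Kafka connection details and offset range for illustration.
val kafkaParams = Map("metadata.broker.list" -> "kafka-host-1:9092")
val offsetRanges = Array(OffsetRange("my-topic", 0, 0L, 100L))

// The key fix: the leaders map must be typed
// Map[TopicAndPartition, org.apache.spark.streaming.kafka.Broker],
// mapping each partition in the offset ranges to its leader broker.
val leaders: Map[TopicAndPartition, Broker] =
  Map(TopicAndPartition("my-topic", 0) -> Broker("kafka-host-1", 9092))

// Return both the payload and its offset from the message handler.
val messageHandler = (mmd: MessageAndMetadata[String, Array[Byte]]) =>
  (mmd.message(), mmd.offset)

val rdd = KafkaUtils.createRDD[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Long)](
  sc, kafkaParams, offsetRanges, leaders, messageHandler)

With the leaders map typed correctly, the compiler can select the Scala overload of createRDD (the one taking a SparkContext and a message handler function), and each element of the resulting RDD is a (message, offset) pair.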
