我试图通过使用Kafka德来阅读Kafka,以获取消息和元数据。我使用的方法如下:
val messageHandler = (mmd: MessageAndMetadata[String,Array[Byte]]) => {
(mmd.message(), mmd.offset)
}
val messagesAndMetadata = KafkaUtils.createRDD[String, Array[Byte], StringDecoder, DefaultDecoder, Tuple2[String,Long]]
(sc.asInstanceOf[SparkContext], KafkaParams, offsetRangeTrete, leaderForPartitionsMap, messageHandler)
但是编译器显示了一个错误:
ambiguous reference to overloaded definition,
both method createRDD in object KafkaUtils of type
(jsc: org.apache.spark.api.java.JavaSparkContext, keyClass: Class[String], valueClass: Class[String], keyDecoderClass: Class[kafka.serializer.StringDecoder], valueDecoderClass: Class[kafka.serializer.StringDecoder], recordClass: Class[String], kafkaParams: java.util.Map[String,String], offsetRanges: Array[org.apache.spark.streaming.kafka.OffsetRange], leaders: java.util.Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.Broker], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])org.apache.spark.api.java.JavaRDD[String]
and method createRDD in object KafkaUtils of type (sc: org.apache.spark.SparkContext, kafkaParams: Map[String,String], offsetRanges: Array[org.apache.spark.streaming.kafka.OffsetRange], leaders: Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.Broker], messageHandler: kafka.message.MessageAndMetadata[String,String] => String)(implicit evidence$9: scala.reflect.ClassTag[String], implicit evidence$10: scala.reflect.ClassTag[String], implicit evidence$11: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$12: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$13: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[String]
match expected type ?
[ERROR] val treteMetadata = org.apache.spark.streaming.kafka.KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String]
我用的是spark 1.3.0。有人知道怎么解决这个问题吗?
谢谢您
1条答案
按热度按时间yjghlzjz1#
通过为参数leaderforpartitionsmap使用正确的类型解决了这个问题,该参数必须是map[topicandpartition,org.apache.spark.streaming.kafka.broker]