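I have a requirement to read messages from Kafka 0.8.1 (in cluster A) and write them to Kafka 0.9.0 (in cluster B).
I am using Spark Streaming to read from Kafka A and Spark's native Kafka classes to push the messages into Kafka B.
It gives me the following error: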
16/10/20 15:29:34 INFO VerifiableProperties: Property zookeeper.connect is overridden to
16/10/20 15:29:34 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275)
at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
16/10/20 15:29:34 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 2)
java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275)
at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
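The trace fails inside kafka.api.FetchResponsePartitionData.readFrom, so it appears to come from the consumer side (reading cluster A), not from the producer writing to cluster B. In the Kafka 0.8.x client, that method reads an error code, a high watermark and a message-set size from the fetch response and then calls limit(size) on a slice of the buffer; java.nio.Buffer.limit throws IllegalArgumentException when the new limit is larger than the buffer's capacity (or negative). The following is a hypothetical, dependency-free sketch of that mechanism only (BufferLimitDemo, readPartitionData and buildBuffer are illustrative names, not Kafka APIs): a size field that does not match the bytes actually present raises the same exception. One possible cause of such a mismatch, assumed here rather than confirmed by the question, is a Kafka client jar on the executor classpath whose fetch-response format differs from what the 0.8.1 brokers send, for example a 0.9 jar added for writing to cluster B shadowing the 0.8 jar that Spark's Kafka integration reads with.

```scala
import java.nio.ByteBuffer
import scala.util.Try

object BufferLimitDemo {
  // Hypothetical parser that reads fields in the same order as Kafka 0.8's
  // FetchResponsePartitionData.readFrom: error code (short), high watermark (long),
  // message-set size (int), followed by the message-set bytes.
  def readPartitionData(buffer: ByteBuffer): Try[(Short, Long, Int)] = Try {
    val error = buffer.getShort()
    val highWatermark = buffer.getLong()
    val messageSetSize = buffer.getInt()
    val messageSet = buffer.slice()
    // Buffer.limit throws IllegalArgumentException if messageSetSize exceeds the slice's capacity.
    messageSet.limit(messageSetSize)
    buffer.position(buffer.position() + messageSetSize)
    (error, highWatermark, messageSetSize)
  }

  // Builds a buffer whose size field claims `declaredSize` bytes but which carries only `actualBytes`.
  def buildBuffer(declaredSize: Int, actualBytes: Int): ByteBuffer = {
    val buf = ByteBuffer.allocate(2 + 8 + 4 + actualBytes)
    buf.putShort(0.toShort).putLong(42L).putInt(declaredSize)
    buf.put(new Array[Byte](actualBytes))
    buf.flip()
    buf
  }

  def main(args: Array[String]): Unit = {
    // Consistent size field: parses fine.
    println(readPartitionData(buildBuffer(declaredSize = 16, actualBytes = 16)))
    // Size field larger than the data actually present: Failure(IllegalArgumentException).
    println(readPartitionData(buildBuffer(declaredSize = 1024, actualBytes = 16)))
  }
}
```

Running main prints a Success for the consistent buffer and a Failure wrapping java.lang.IllegalArgumentException for the buffer that declares 1024 bytes but holds only 16.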
Please find the code snippet below:
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Load the next offset to read for each (topic, partition) from HBase
val topicsAndOffsets: Map[TopicAndPartition, Long] =
  sc.newAPIHadoopRDD(dhrconf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    .map(_._2)
    .map { result =>
      (TopicAndPartition(
         HBaseHelper.getValue(result, HBASE_DEFAULT_CF, HBASE_COL_COUNTER_TOP),
         HBaseHelper.getValueAsInt(result, HBASE_DEFAULT_CF, HBASE_COL_COUNTER_KPART)),
       HBaseHelper.getValueAsLong(result, HBASE_DEFAULT_CF, HBASE_COL_COUNTER_NEXT))
    }
    .collectAsMap()
    .toMap
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerlist_src, "auto.offset.reset" -> "smallest")
// Read Kafka (cluster A) as a direct stream
val messages: InputDStream[(String, String)] =
  if (topicsAndOffsets.isEmpty) {
    // No stored offsets: start where auto.offset.reset says
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streaming, kafkaParams, Set(topic_src))
  } else {
    // Resume from the offsets stored in HBase
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      streaming, kafkaParams, topicsAndOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
  }
messages.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // Only the RDD created directly by the direct stream implements HasOffsetRanges
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val nParts = rdd.partitions.size
    rdd.foreachPartition { sparkPartition =>
      if (!sparkPartition.isEmpty) {
        // Kafka partition details: Spark partition i corresponds to offsetRanges(i)
        val currid = TaskContext.get.partitionId
        val osr: OffsetRange = offsetRanges(currid)
        val untilOffset = osr.untilOffset // exclusive
        val id = osr.partition
        val topic_src = osr.topic
        // ... (the rest of the code is not included in the question)
      }
    }
  }
}
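The snippet stops right after reading osr.untilOffset, which its comment marks as exclusive. Assuming the intent is to write that value back to the HBase "next offset" column (HBASE_COL_COUNTER_NEXT) once a partition has been forwarded to cluster B, the dependency-free sketch below shows the arithmetic: a range [fromOffset, untilOffset) holds untilOffset - fromOffset messages, and untilOffset is exactly where the next run should resume. OffsetRangeLike and nextOffsets are hypothetical names; the case class only mirrors the fields of Spark's OffsetRange and is not that class.

```scala
object OffsetBookkeeping {
  // Hypothetical stand-in with the same fields as Spark's OffsetRange
  // (topic, partition, fromOffset, untilOffset); not the Spark class itself.
  final case class OffsetRangeLike(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    // untilOffset is exclusive, so [fromOffset, untilOffset) holds this many messages.
    def count: Long = untilOffset - fromOffset
  }

  // Offset to persist per (topic, partition): the exclusive end of this batch,
  // i.e. the first offset the following run should read.
  def nextOffsets(ranges: Seq[OffsetRangeLike]): Map[(String, Int), Long] =
    ranges.map(r => (r.topic, r.partition) -> r.untilOffset).toMap

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      OffsetRangeLike("events", 0, 100L, 150L),
      OffsetRangeLike("events", 1, 80L, 80L)) // empty range: nothing new in partition 1
    batch.foreach(r => println(s"${r.topic}/${r.partition}: ${r.count} messages"))
    println(nextOffsets(batch))
  }
}
```

Persisting untilOffset (rather than untilOffset + 1) avoids skipping a message, because the stored value is fed back unchanged as the starting offset in topicsAndOffsets.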
I need help resolving this issue. Thanks for your help :)