Reading from Kafka 0.8.1, writing to Kafka 0.9.0

gcuhipw9 · posted 2021-06-07 in Kafka

I have a requirement to read messages from Kafka 0.8.1 (in cluster A) and write them into Kafka 0.9.0 (in cluster B).
I am using Spark Streaming to read from Kafka A, and Spark's native Kafka classes to push the messages into Kafka B.
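Roughly, the push into cluster B happens inside foreachPartition of each micro-batch, along these lines (a simplified sketch using the plain KafkaProducer from the 0.9 client; brokerlist_dst and topic_dst are placeholder names, not the exact code I run):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// rdd is one micro-batch of (key, message) pairs from the DStream (see foreachRDD below)
rdd.foreachPartition { partition =>
  // one producer per Spark partition, pointing at the brokers of cluster B
  val props = new Properties()
  props.put("bootstrap.servers", brokerlist_dst)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  partition.foreach { case (key, message) =>
    producer.send(new ProducerRecord[String, String](topic_dst, key, message))
  }
  producer.close()
}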
It gives me the following error:

16/10/20 15:29:34 INFO VerifiableProperties: Property zookeeper.connect is overridden to
16/10/20 15:29:34 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException
        at java.nio.Buffer.limit(Buffer.java:275)
        at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)

16/10/20 15:29:34 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 2)
java.lang.IllegalArgumentException
        at java.nio.Buffer.limit(Buffer.java:275)
        at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:37)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:99)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:97)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)

Please find the code snippet below:

// relevant imports
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// load the stored offsets ((topic, partition) -> next offset) from HBase
val topicsAndOffsets: scala.collection.immutable.Map[TopicAndPartition, Long] =
  sc.newAPIHadoopRDD(dhrconf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    .map(_._2)
    .map(result => (
      TopicAndPartition(
        HBaseHelper.getValue(result, HBASE_DEFAULT_CF, HBASE_COL_COUNTER_TOP),
        HBaseHelper.getValueAsInt(result, HBASE_DEFAULT_CF, HBASE_COL_COUNTER_KPART)),
      HBaseHelper.getValueAsLong(result, HBASE_DEFAULT_CF, HBASE_COL_COUNTER_NEXT)))
    .collectAsMap
    .toMap

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokerlist_src,
  "auto.offset.reset" -> "smallest")

// reading Kafka (cluster A) as a direct stream
var messages: Option[InputDStream[(String, String)]] = None
if (topicsAndOffsets.isEmpty) {
  // no stored offsets: start the stream from "smallest"
  messages = Some(KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    streaming, kafkaParams, Set(topic_src)))
} else {
  // resume from the offsets loaded from HBase
  messages = Some(KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    streaming, kafkaParams, topicsAndOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)))
}

messages.get.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val nParts = rdd.partitions.size

    rdd.foreachPartition { sparkPartition =>
      if (!sparkPartition.isEmpty) {
        // Kafka partition details for this Spark partition
        val currid = TaskContext.get.partitionId
        val osr: OffsetRange = offsetRanges(currid)
        val untilOffset = osr.untilOffset // exclusive
        val id = osr.partition
        val topic_src = osr.topic
        // ... (the snippet in the post ends here)

I need help resolving this issue… thanks in advance for your help :)
