spark unionrdd不能转换为hasoffsetranges

tkqqtvp1  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(435)

因为我从几个不同的kafka主题接收消息。所以我需要使用streamingcontext.union方法来合并流。但是我在尝试将kafka偏移更新为zoopkeeper时遇到了一些问题。
错误如下:

java.lang.ClassCastException: org.apache.spark.rdd.UnionRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
at com.qingqing.spark.util.KafkaManager.updateZKOffsets(KafkaManager.scala:75)
at com.qingqing.spark.BinlogConsumer$$anonfun$consumeBinlog$3.apply(BinlogConsumer.scala:43)
at com.qingqing.spark.BinlogConsumer$$anonfun$consumeBinlog$3.apply(BinlogConsumer.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

我的代码如下:


有人能帮我找出这个问题吗。提前谢谢

kr98yfug

kr98yfug1#

对于kafka和spark流媒体,这并不是使用直接流式api的方式。因为您使用的是直接方法,所以这里没有接收器,所以不需要多个流,只有一个流消耗所有主题。您需要提取要使用的所有主题的偏移量,然后创建 DirectKafkaInputDStream 创建人 createDirectStream . 因为我没有代码,一个粗略的草图应该是这样的:

val offsets: Map[TopicAndPartition, Long] = 
  topics.map { /* create the (TopicAndPartition, Long) tuple here for each topic */ }

val kafkaInputStream = 
  KafkaUtils.createDirectStream(ssc, kafkaParams, offsets, (mmd) => (mmd.key, mmd.value))

对于没有存储任何偏移量的主题,只需从偏移量开始 0 .
至于 HasOffsetRanges ,这需要在创建kafka流转换之后直接Map,只有这样才是底层 RDD 实际上是实现了这个特性。你得马上去看医生 transform 在溪流上:

val streamAfterTransform = kafkaInputStream.transform { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges]
  // To stuff with ranges
}

相关问题