因为我从几个不同的kafka主题接收消息。所以我需要使用streamingcontext.union方法来合并流。但是我在尝试将kafka偏移更新为zoopkeeper时遇到了一些问题。
错误如下:
java.lang.ClassCastException: org.apache.spark.rdd.UnionRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
at com.qingqing.spark.util.KafkaManager.updateZKOffsets(KafkaManager.scala:75)
at com.qingqing.spark.BinlogConsumer$$anonfun$consumeBinlog$3.apply(BinlogConsumer.scala:43)
at com.qingqing.spark.BinlogConsumer$$anonfun$consumeBinlog$3.apply(BinlogConsumer.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
我的代码如下:
有人能帮我找出这个问题吗。提前谢谢
1条答案
按热度按时间kr98yfug1#
对于kafka和spark流媒体,这并不是使用直接流式api的方式。因为您使用的是直接方法,所以这里没有接收器,所以不需要多个流,只有一个流消耗所有主题。您需要提取要使用的所有主题的偏移量,然后创建
DirectKafkaInputDStream
创建人createDirectStream
. 因为我没有代码,一个粗略的草图应该是这样的:对于没有存储任何偏移量的主题,只需从偏移量开始
0
.至于
HasOffsetRanges
,这需要在创建kafka流转换之后直接Map,只有这样才是底层RDD
实际上是实现了这个特性。你得马上去看医生transform
在溪流上: