I'm trying to submit a Spark Streaming + Kafka job that just reads lines of strings from a Kafka topic. However, I get the following exception:
15/07/24 22:39:45 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
Exception in thread "Thread-49" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 73, 10.11.112.93): java.lang.NoSuchMethodException: kafka.serializer.StringDecoder.<init>(kafka.utils.VerifiableProperties)
        java.lang.Class.getConstructor0(Class.java:2892)
        java.lang.Class.getConstructor(Class.java:1723)
        org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
        org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
When I checked the Spark jar files used by DSE, I saw that it uses kafka_2.10-0.8.0.jar, and that jar does have this constructor. I'm not sure what is causing the error. Here is my consumer code:
val sc = new SparkContext(sparkConf)
val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
val topicMap = kafkaTopics.split(",").map((_, numThreads.toInt)).toMap
val accessLogsStream = KafkaUtils.createStream(streamingContext, zooKeeper, "AccessLogsKafkaAnalyzer", topicMap)
val accessLogs = accessLogsStream.map(_._2).map(log => ApacheAccessLog.parseLogLine(log)).cache()
UPDATE: This exception only seems to occur when I submit the job. If I run the job by pasting the code into the spark-shell, it works fine.
1 Answer
I faced the same problem with my custom decoder. I added the following constructor, which solved the issue.
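The answer's code block appears to have been lost in extraction. As the stack trace shows, Spark's KafkaReceiver instantiates the decoder reflectively via getConstructor(classOf[VerifiableProperties]), so a custom decoder must expose exactly that constructor. A minimal sketch of such a decoder (the class name MyCustomDecoder and the String payload type are illustrative, not from the original answer):

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// The constructor taking VerifiableProperties is the one Spark looks up
// by reflection; without it, NoSuchMethodException is thrown at receiver start.
class MyCustomDecoder(props: VerifiableProperties = null) extends Decoder[String] {
  // Decode the raw Kafka message bytes into the target type.
  override def fromBytes(bytes: Array[Byte]): String =
    new String(bytes, "UTF-8")
}

The same reasoning explains the original error: if the kafka jar on the executors' classpath lacks the StringDecoder(VerifiableProperties) constructor (e.g. a conflicting Kafka version submitted with the job), the reflective lookup fails even though the DSE-bundled kafka_2.10-0.8.0.jar has it.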