I want to use StorageLevel.MEMORY_AND_DISK_SER in my Spark Streaming application, hoping to prevent MetadataFetchFailedException.
I don't know where to pass StorageLevel.MEMORY_AND_DISK, because createDirectStream does not seem to allow passing that parameter:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
Full error:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2 Answers

Answer 1:
We can use ReceiverLauncher.launch to create a DStream from Kafka. See the sample code below for how to set the storage level of the Kafka stream data.
Properties props = new Properties();
props.put("zookeeper.hosts", "x.x.x.x");
props.put("zookeeper.port", "2181");
props.put("zookeeper.broker.path", "/brokers");
props.put("kafka.topic", "some-topic");
props.put("kafka.consumer.id", "12345");
props.put("zookeeper.consumer.connection", "x.x.x.x:2181");
props.put("zookeeper.consumer.path", "/consumer-path");
//Optional Properties
props.put("consumer.forcefromstart", "true");
props.put("consumer.fetchsizebytes", "1048576");
props.put("consumer.fillfreqms", "250");
props.put("consumer.backpressure.enabled", "true");
SparkConf _sparkConf = new SparkConf().setAppName("KafkaReceiver")
        .set("spark.streaming.receiver.writeAheadLog.enable", "false");
JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf,
        new Duration(5000));
//Specify the number of receivers you need.
//It should be less than or equal to the number of partitions of your topic.
int numberOfReceivers = 3;
JavaDStream unionStreams = ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
unionStreams.foreachRDD(new Function2<JavaRDD, Time, Void>() {
    @Override
    public Void call(JavaRDD rdd, Time time) throws Exception {
        //Process the received batch here.
        return null;
    }
});
jsc.start();
jsc.awaitTermination();
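Note that the sample above passes StorageLevel.MEMORY_ONLY(); to match what the question asks for, you would presumably pass StorageLevel.MEMORY_AND_DISK_SER() to ReceiverLauncher.launch instead (that substitution is an assumption on my part, not part of the original sample).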
Answer 2:
Use persist on the DStream before you start the StreamingContext (and with it the Spark Streaming application). However, this can cause an exception, so you should first transform the DStream in some way, for example with map, to get a DStream of serializable objects.
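A minimal sketch of this approach, continuing from the createDirectStream call in the question (the map to the message values and the variable name values are assumptions for illustration, not part of the original answer):

import org.apache.spark.storage.StorageLevel

// The direct stream yields (key, value) pairs; map to the plain String values,
// which are serializable, then persist with the storage level the question asks for.
val values = messages.map(_._2)
values.persist(StorageLevel.MEMORY_AND_DISK_SER)

// ...define the rest of the processing on `values` here...

ssc.start()
ssc.awaitTermination()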