Spark Streaming does not keep the accumulated data

brc7rcf0  posted 2021-06-02  in Hadoop

I wrote a Spark Streaming script similar to a word count. In addition, I want to accumulate everything that arrives into a single RDD (addedRDD), but after a while an exception is thrown because the underlying blocks have been removed. Is there a way to keep this accumulated RDD in memory?

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.rdd.RDD

// 5-second micro-batches; `sc` is the SparkContext provided by the spark-shell
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)

// Accumulator RDD meant to hold everything received so far
var addedRDD: RDD[String] = sc.emptyRDD[String]

lines.foreachRDD { rdd =>
  // Union each batch into the accumulator and cache it (MEMORY_ONLY by default)
  addedRDD = addedRDD.union(rdd).cache()
  addedRDD.collect().foreach(println)
}

val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()
ssc.start()
ssc.awaitTermination()

The exception that gets raised is the following (it says the blocks have been removed):

ERROR scheduler.JobScheduler: Error running job streaming job 1509014590000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[86] at socketTextStream at <console>:38 after its blocks have been removed!
org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1547)
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1521)
org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:974)
org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:972)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.AbstractTraversable.map(Traversable.scala:105)
org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:972)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:981)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:47)
        at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:44)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[86] at socketTextStream at <console>:38 after its blocks have been removed!
        at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
        at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
        at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
        at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
        at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.rdd.UnionPartition.preferredLocations(UnionRDD.scala:47)
        at org.apache.spark.rdd.UnionRDD.getPreferredLocations(UnionRDD.scala:91)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1547)
        at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1521)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:974)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$15.apply(DAGScheduler.scala:972)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:972)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1609)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

3hvapo4f1#

The error above is caused by running out of memory (an OutOfMemoryError): cache() keeps the RDD in executor memory only, and after a number of batches have been unioned into it, the executors run short of memory by default and the cached blocks are evicted.
Using one of the storage levels below lets you store the RDD on disk (DISK_ONLY). You can also choose MEMORY_AND_DISK, MEMORY_AND_DISK_SER, MEMORY_ONLY, or MEMORY_ONLY_SER. A level with the _2 suffix additionally replicates each partition on two cluster nodes, and the SER variants store the data serialized to reduce the memory or disk footprint.

import org.apache.spark.storage.StorageLevel

// Spill the accumulated RDD to disk instead of keeping it in executor memory
addedRDD = addedRDD.union(rdd).persist(StorageLevel.DISK_ONLY)
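
As a fuller sketch of how this fits into the streaming loop (my own addition, not part of the original answer): each union also grows the RDD lineage without bound, so it can help to unpersist the previous accumulator once the new one is materialized and to checkpoint periodically to truncate the chain of unions. The checkpoint directory and the every-10-batches interval below are illustrative assumptions:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sc, Seconds(5))
// A checkpoint directory is required before calling RDD.checkpoint(); path is a placeholder
sc.setCheckpointDir("/tmp/spark-checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)
var addedRDD: RDD[String] = sc.emptyRDD[String]
var batchCount = 0L

lines.foreachRDD { rdd =>
  val previous = addedRDD
  // Persist to disk so evicted in-memory blocks cannot invalidate the accumulator
  addedRDD = previous.union(rdd).persist(StorageLevel.DISK_ONLY)

  batchCount += 1
  if (batchCount % 10 == 0) {
    // Periodically checkpoint to truncate the ever-growing union lineage
    // (must be called before the first action on this RDD)
    addedRDD.checkpoint()
  }

  addedRDD.collect().foreach(println)
  // Drop the previous accumulator's blocks now that the new one is materialized
  previous.unpersist()
}

ssc.start()
ssc.awaitTermination()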
