Finding the maximum value in a JavaPairDStream

pbgvytdp · posted 2021-05-29 in Hadoop

Is there a way to find the maximum value in a JavaPairDStream? My key is a String and the value is an ArrayList<Row>. I need to find the tuple whose ArrayList contains the row with the largest value. I am using Spark 1.6, and I want to implement something like JavaRDD.max() for a JavaPairDStream.
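For reference, the closest analogue of JavaRDD.max() on a stream is to reduce each micro-batch down to its single largest tuple inside foreachRDD(). A minimal sketch, assuming the metric lives in column 0 of each Row; the names printBatchMax() and maxRowValue() are hypothetical helpers, not Spark API (the lambdas require Java 8; on Java 7 the same code uses anonymous VoidFunction/Function2 classes):

    import java.util.ArrayList;

    import org.apache.spark.sql.Row;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    import scala.Tuple2;

    public class BatchMax {

        // Collapse every micro-batch to the tuple whose ArrayList<Row>
        // holds the largest value in column 0 (column index is assumed).
        static void printBatchMax(JavaPairDStream<String, ArrayList<Row>> stream) {
            stream.foreachRDD(rdd -> {
                if (!rdd.isEmpty()) {
                    Tuple2<String, ArrayList<Row>> max = rdd.reduce(
                        (a, b) -> maxRowValue(a._2()) >= maxRowValue(b._2()) ? a : b);
                    System.out.println("Max tuple in this batch: " + max._1());
                }
            });
        }

        // Largest double in column 0 across all rows of one value.
        static double maxRowValue(ArrayList<Row> rows) {
            double best = Double.NEGATIVE_INFINITY;
            for (Row r : rows) {
                best = Math.max(best, r.getDouble(0));
            }
            return best;
        }
    }

This gives the maximum per batch; carrying it across batches is what the accumulator attempt below tries to do.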
Using updateStateByKey is not an option, because I need state shared across all keys, not state per key. I tried the following approach using an accumulator:

    if (accumulator.value() < max) {
        accumulator.setValue(max);
    }

I run this inside reduceByKey, but it throws the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.UnsupportedOperationException: Can't read accumulator value in task
    at org.apache.spark.Accumulable.value(Accumulators.scala:98)
    at sample.sample.SampleJdd$2.call(SampleJdd.java:82)
    at sample.sample.SampleJdd$2.call(SampleJdd.java:74)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:200)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:199)
    at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
    at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
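The exception is expected behaviour: in Spark 1.6, accumulators are write-only inside tasks, and both value() and setValue() may only be called on the driver, which is exactly what the UnsupportedOperationException above reports. A global maximum can still be tracked by giving the accumulator a merge function of Math.max, so that tasks only ever call add(). A minimal sketch; the class name MaxAccumulatorParam is hypothetical:

    import org.apache.spark.AccumulatorParam;

    // A "max" accumulator for Spark 1.6: merging partial results keeps
    // the larger value, so add() inside tasks tracks a running maximum.
    class MaxAccumulatorParam implements AccumulatorParam<Double> {
        @Override
        public Double addInPlace(Double r1, Double r2) {
            return Math.max(r1, r2);             // merge two partial maxima
        }
        @Override
        public Double addAccumulator(Double current, Double candidate) {
            return Math.max(current, candidate); // fold one new value in
        }
        @Override
        public Double zero(Double initialValue) {
            return Double.NEGATIVE_INFINITY;     // identity element for max
        }
    }

Usage, assuming sc is the JavaSparkContext:

    // On the driver:
    //   Accumulator<Double> maxAcc =
    //       sc.accumulator(Double.NEGATIVE_INFINITY, new MaxAccumulatorParam());
    // Inside the task: maxAcc.add(candidate);   // never maxAcc.value()
    // Back on the driver, after an action: double max = maxAcc.value();

One caveat: Spark only guarantees exactly-once accumulator updates inside actions; updates made in transformations such as reduceByKey can be re-applied when a task is retried. For a maximum this happens to be harmless, since re-applying Math.max with the same value is idempotent.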
