有没有办法从Map上找到最大值 JavaPairDStream
? 我的钥匙是一把钥匙 String
价值就是 ArrayList<Row>
. 我需要找一个 tuple
它具有arraylist中某一行的最大值。我正在使用spark 1.6,我想为javapairdstream实现类似于javardd.max()的东西。
使用updatestatebykey不是一个选项,因为我需要所有键的关联状态,而不是每个键。我试过以下方法使用蓄能器:
**
if(accumulator.value()< max)
{
accumulator.setValue(max);
}
**
我在reducebykey中执行此操作,但有以下例外:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.UnsupportedOperationException: Can't read accumulator value in task
at org.apache.spark.Accumulable.value(Accumulators.scala:98)
at sample.sample.SampleJdd$2.call(SampleJdd.java:82)
at sample.sample.SampleJdd$2.call(SampleJdd.java:74)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:200)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:199)
at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48
暂无答案!
目前还没有任何答案,快来回答吧!