我们是否可以通过使用CollectionAcumator[Int]来保存所有工作人员的状态信息?RDD是巨大的
以下是示例代码片段
var valueColl:CollectionAccumulator[Int] = spark.sparkContext.collectionAccumulator("myValue")
rdd.foreachPartition(p => {
val temp:java.util.List[Int] = valueColl.value
if (!temp.contains(p.value)) {
valueColl.add(p.value)
}
}
1条答案
按热度按时间ojsjcaue1#
不幸的是,这是不可能的。不能在执行器进程中访问累加器的值。
从文档中:
只有驱动程序可以使用累加器的Value方法读取累加器的值。
累加器用于从执行器进程“收集”数据。每个执行器包含累加器的一个或多个示例。累加器的每个示例只能看到在其自己的进程中收集的值。只有当累加器的不同示例被发送到驱动程序进程时,它们才被减少到单个最终值,然后可以在驱动程序上使用。