我在这里描述了一个非常类似的问题:如何在spark中对每个执行器执行一次操作我在第一个答案中采用了第一种方法,但仍然遇到了序列化问题。
我想做的是,我有一个元组(sourcevertex,targetvertex)这样的查询,并将它们发送给执行者,执行者将返回给我一个最短路径。为此,我使用jgrapht。
当我这样做的时候
class ShortestPath(graph: SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge],
bc: Broadcast[SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge]]) extends Serializable {
def calculateShortestPath(vertexRDD: RDD[Node]) = {
val result = vertexRDD.map(vertex => {
val dijkstraShortestPath: DijkstraShortestPath[Node, DefaultWeightedEdge]
= new DijkstraShortestPath[Node, DefaultWeightedEdge](bc.value)
val distanceIn = dijkstraShortestPath.getPath(vertex, Node(4, 1, true)).getWeight()
distanceIn
})
result.collect().foreach(println(_))
}
}
object ShortestPath {
def apply(graph: SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge],
bc: Broadcast[SimpleDirectedWeightedGraph[Node, DefaultWeightedEdge]]): ShortestPath = new ShortestPath(graph, bc)
}
一切都很好,但问题是我觉得我在创造 dijkstraShortestPath
每项任务的对象,对吗?
我的目标是为每个执行器创建这个对象,并将它用于该执行器上的每个任务。
我给的链接是用lazy val创建一个对象,在这里示例化你的想法然后使用rdd map函数。我实现的解决方案如下:
object Dij {
lazy val dijsktra = {
val graph = GraphCreator.createGraph()
val dijkstraShortestPath: DijkstraShortestPath[Node, DefaultWeightedEdge] = new DijkstraShortestPath[Node, DefaultWeightedEdge](graph)
dijkstraShortestPath
}
}
用于最短路径类
val result = vertexRDD.map(vertex => {
val dijkstraShortestPath = Dij.dijsktra
val distanceIn = dijkstraShortestPath.getPath(vertex, Node(4, 1, true)).getWeight()
dijkstraShortestPath
})
result.collect().foreach(println(_))
不过,我有点误会了,谢谢 - object not serializable (class: org.jgrapht.alg.shortestpath.DijkstraShortestPath, value: org.jgrapht.alg.shortestpath.DijkstraShortestPath@2cb8e13b)
这是正确的,当我看实现时,没有可序列化的。
另一个问题是,如果它不可序列化,那么我的第一个实现是如何工作的?
1条答案
按热度按时间slhcrj9b1#
我认为你的第二个片段中有一个意外错误。
赋予Map的第一个函数返回一个权重(可能是一个
Double
?)第二个返回DijkstraShortestPath
不可序列化的。这就解释了为什么这两个片段的行为不同。