我对spark和scala比较陌生,但是我决定在这里发布一个代码示例,它非常简单,在我看来不应该引起严重的问题,但是在实践中,它在aws emr spark环境中经常会导致内存不足错误,这取决于 maxIterations
:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException
val config = new SparkConf().setAppName("test graphx")
config.set("spark.driver.allowMultipleContexts","true")
val batch_id=new Integer(31)
val maxIterations=2 //200 interations are causing out of memory
var myVertices = sc.makeRDD(Array( (1L, ("A",batch_id,0.0,0.0,0.0,11.0)), (2L, ("B",batch_id,0.0,1000.0,0.0,300.0)), (3L, ( "C", batch_id, 1000.0, 1000.0, 0.0, 8.0)), (4L, ("D",batch_id,1000.0, 0.0, 0.0, 400.0)) ))
var myEdges = sc.makeRDD(Array(Edge(4L, 3L, (7.7, 0.0) ), Edge(2L, 3L, (5.0, 0.0) ), Edge(2L, 1L, (12.0, 0.0))))
var myGraph=Graph(myVertices,myEdges)
myGraph.cache
myGraph.triplets.foreach(println)
//we need to calculate some constant values for each edge before start of pregel
val initGraph=myGraph.mapTriplets(tr =>
(tr.attr._1, (tr.attr._1 *
(scala.math.sqrt((tr.dstAttr._3-tr.srcAttr._3)*(tr.dstAttr._3-tr.srcAttr._3)+( tr.dstAttr._4-tr.srcAttr._4)*( tr.dstAttr._4-tr.srcAttr._4)+(tr.dstAttr._5-tr.srcAttr._5)*(tr.dstAttr._5-tr.srcAttr._5))) *
(scala.math.sqrt((tr.dstAttr._3-tr.srcAttr._3)*(tr.dstAttr._3-tr.srcAttr._3)+( tr.dstAttr._4-tr.srcAttr._4)*( tr.dstAttr._4-tr.srcAttr._4)+(tr.dstAttr._5-tr.srcAttr._5)*(tr.dstAttr._5-tr.srcAttr._5))) /
(tr.dstAttr._6 * tr.dstAttr._6))
)
)
initGraph.triplets.take(100).foreach(println)
val distanceStep = 0.1
val tolerance = 1
val sssp = initGraph.pregel( (0.0, 0.0, 0.0, 0.0), maxIterations //500-3000
)(
(id: VertexId, vert: ((String, Integer, Double, Double, Double, Double)), msg: (Double, Double, Double, Double)) =>
(
vert._1,vert._2,
( if (scala.math.abs(msg._1)> tolerance) {vert._3+distanceStep*msg._1 } else { vert._3 }),
( if (scala.math.abs(msg._2)> tolerance) {vert._4+distanceStep*msg._2 } else { vert._4 }),
( if (scala.math.abs(msg._3)> tolerance) {vert._5+distanceStep*msg._3 } else { vert._5 }),
vert._6
),// Vertex Program
e => { // Send Message
Iterator(
(
e.dstId,
(
((e.srcAttr._3 - e.dstAttr._3)*distanceStep*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )), //x
((e.srcAttr._4 - e.dstAttr._4)*distanceStep*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )), //y
((e.srcAttr._5 - e.dstAttr._5)*distanceStep*scala.math.sqrt( 2*e.attr._2*e.srcAttr._6 / ((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) )), //z
e.attr._1*distanceStep*scala.math.sqrt((e.dstAttr._3-e.srcAttr._3)*(e.dstAttr._3-e.srcAttr._3)+( e.dstAttr._4-e.srcAttr._4)*( e.dstAttr._4-e.srcAttr._4)+(e.dstAttr._5-e.srcAttr._5)*(e.dstAttr._5-e.srcAttr._5)) //vector module
)
)
)
},
{
(a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, 0) // Merge Message
}
)
sssp.vertices.take(10).foreach(println)
我通过zeppelin在4节点m5.x2大型集群上的aws emr中运行它,但是它可以在spark中作为作业快速采用和执行。
简而言之,这段代码创建了一个 myGraph
具有4个顶点和3条边的图。然后对于每个三元组,我计算一些常量值并使用图形对象 initGraph
为了这个。
那么对于 initGraph
我应用pregelapi,它的执行只受迭代次数的限制 maxIterations
. 在这一刻,对于pregelapi,我看到了奇怪的行为。对于小型 maxIterations
值(小于10)它工作得非常快,对于100-150次迭代,它在齐柏林飞艇中执行3-4分钟,对于200次迭代,它会以不同的错误失败(connectionclosed等)。
当我把maxiterations=150或200设置为
分配的内存直线上升,可用内存以同样的速度下降。
由于我是spark的新手,我不确定这是不是正确的行为,老实说,我找不到一个解释,即使在这样一个小的图上,200次pregel迭代,什么会消耗千兆字节的内存。如果您可以在您的终端上复制它并进行检查,我很好奇地听取您关于性能优化的建议,因为如果我扩展集群并在更大的硬件设置上运行相同的代码,这只是一个问题 maxIterations
和图的大小来实际得到相同的outofmemory错误。我需要在超过1米的顶点和7米的边上运行这个程序,所以我不知道如果这个问题不能解决,需要什么样的硬件。
暂无答案!
目前还没有任何答案,快来回答吧!