嗨,我已经创建了使用顶点和边文件的图形。图形大小为600gb。我使用spark graphframes的motif特性查询这个图。我已经设置了一个aws emr集群来查询图形。
集群details:- 1 主人和8个奴隶
主节点:
m5.xlarge
4 vCore, 16 GiB memory, EBS only storage
EBS Storage:64 GiB
从属节点:
m5.4xlarge
16 vCore, 64 GiB memory, EBS only storage
EBS Storage:256 GiB (per instance)
我面临着非常高的无序读(3.4tb)和写(2tb),这会影响性能,只执行10个查询需要大约50分钟。有没有办法减少这种高无序。
下面是我的Sparkcode:-
val spark = SparkSession.builder.appName("SparkGraph POC").getOrCreate()
val g:GraphFrame = GraphFrame(vertexDf, edgeDf)
//queries
val q1 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c)")
q1.filter(
" r1.relationship = 'knows' and" +
" r2.relationship = 'knows'").distinct()
.createOrReplaceTempView("q1table")
spark.sql("select a.id as a_id,a.name as a_name," +
"b.id as b_id,b.name as b_name," +
"c.id as c_id,c.name as c_name from q1table")
.write
.option("quote", "\"")
.option("escape", "\"")
.option("header","true")
.csv(resFilePath + "/q1")
spark.catalog.uncacheTable("q1table")
val q2 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c); (c)-[r3]->(d); (d)-[r4]->(e)")
q2.filter(
" a.name = 'user1' and" +
" e.name = 'user4' and" +
" r1.relationship = 'knows' and" +
" r2.relationship = 'knows' and" +
" r3.relationship = 'knows' and" +
" r4.relationship = 'knows'").distinct()
.createOrReplaceTempView("q2table")
spark.sql("select a.id as a_id, a.name as a_name ," +
"e.id as e_id, e.name as e_name from q2table")
.write
.option("quote", "\"")
.option("escape", "\"")
.option("header","true")
.csv(resFilePath + "/q2")
spark.catalog.uncacheTable("q2table")
spark.stop()
1条答案
按热度按时间iqjalb3h1#
graphframes实现的问题是,它使内部Dataframe的自连接次数与在motif上使用的次数相同。这意味着随着链的长度增加,你将有更多的洗牌
你可以在网站上看到更多细节https://www.waitingforcode.com/apache-spark-graphframes/motifs-finding-graphframes/read
我也尝试过类似的方法,当链的长度大于12时,spark开始没有响应,与执行者的连接也会丢失,即使我增加了资源。
如果您正在尝试这样做,我建议您改用图形数据库。
希望这有帮助