spark graphframes高无序读/写

arknldoa  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(404)

嗨,我已经创建了使用顶点和边文件的图形。图形大小为600gb。我使用spark graphframes的motif特性查询这个图。我已经设置了一个aws emr集群来查询图形。
集群details:- 1 主人和8个奴隶
主节点:

m5.xlarge
    4 vCore, 16 GiB memory, EBS only storage
    EBS Storage:64 GiB

从属节点:

m5.4xlarge
    16 vCore, 64 GiB memory, EBS only storage
    EBS Storage:256 GiB (per instance)

我面临着非常高的无序读(3.4tb)和写(2tb),这会影响性能,只执行10个查询需要大约50分钟。有没有办法减少这种高无序。
下面是我的Sparkcode:-

val spark = SparkSession.builder.appName("SparkGraph POC").getOrCreate()

val g:GraphFrame  = GraphFrame(vertexDf, edgeDf)

//queries

    val q1 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c)")

    q1.filter(
      " r1.relationship = 'knows' and" +
                  " r2.relationship = 'knows'").distinct()
      .createOrReplaceTempView("q1table")

    spark.sql("select a.id as a_id,a.name as a_name," +
                      "b.id as b_id,b.name as b_name," +
                      "c.id as c_id,c.name as c_name from q1table")
      .write
      .option("quote", "\"")
      .option("escape", "\"")
      .option("header","true")
      .csv(resFilePath + "/q1")

    spark.catalog.uncacheTable("q1table")

    val q2 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c); (c)-[r3]->(d); (d)-[r4]->(e)")
    q2.filter(
      " a.name = 'user1' and" +
        " e.name = 'user4' and" +
        " r1.relationship = 'knows' and" +
        " r2.relationship = 'knows' and" +
        " r3.relationship = 'knows' and" +
        " r4.relationship = 'knows'").distinct()
      .createOrReplaceTempView("q2table")

    spark.sql("select a.id as a_id, a.name as a_name ," +
      "e.id as e_id, e.name as e_name from q2table")
      .write
      .option("quote", "\"")
      .option("escape", "\"")
      .option("header","true")
      .csv(resFilePath + "/q2")

    spark.catalog.uncacheTable("q2table")

spark.stop()

iqjalb3h

iqjalb3h1#

graphframes实现的问题是,它使内部Dataframe的自连接次数与在motif上使用的次数相同。这意味着随着链的长度增加,你将有更多的洗牌
你可以在网站上看到更多细节https://www.waitingforcode.com/apache-spark-graphframes/motifs-finding-graphframes/read
我也尝试过类似的方法,当链的长度大于12时,spark开始没有响应,与执行者的连接也会丢失,即使我增加了资源。
如果您正在尝试这样做,我建议您改用图形数据库。
希望这有帮助

相关问题