Efficiently caching a DataFrame in Spark SQL

rjjhvcjd · published 2021-06-26 in Hive

The use case is self-joining a table multiple times.

// Hive table
val network_file = spark.sqlContext.sql("SELECT * FROM test.network_file")

// Cache
network_file.cache()

network_file.createOrReplaceTempView("network_design")
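
A quick sanity check after registering the view (a minimal sketch, assuming Spark 2.x, where the Catalog API exposes isCached by table/view name):

// cache() alone is lazy; an action materializes the cached data
network_file.count()

// true if the view's plan is registered in the cache
println(spark.catalog.isCached("network_design"))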

Now, the query below performs the self-joins:

val res = spark.sqlContext.sql("""select 
          one.sourcehub as source,
          one.mappedhub as first_leg,
          two.mappedhub as second_leg,
          one.destinationhub as dest
          from 
          (select * from network_design) one  JOIN 
          (select * from network_design) two  JOIN 
          (select * from network_design) three  
          ON (two.sourcehub =  one.mappedhub )
          AND (three.sourcehub = two.mappedhub)
          AND (one.destinationhub = two.destinationhub )
          AND (two.destinationhub = three.destinationhub)
          group by source, first_leg, second_leg, dest
          """)

The problem: the physical plan for this query suggests the table is read three times.

== Physical Plan ==
*HashAggregate(keys=[sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84], functions=[])

+- Exchange hashpartitioning(sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84, 200)
   +- *HashAggregate(keys=[sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84], functions=[])
      +- *Project [sourcehub#83, destinationhub#84, mappedhub#85, mappedhub#109]
         +- *BroadcastHashJoin [mappedhub#109, destinationhub#108], [sourcehub#110, destinationhub#111], Inner, BuildRight
            :- *Project [sourcehub#83, destinationhub#84, mappedhub#85, destinationhub#108, mappedhub#109]
            :  +- *BroadcastHashJoin [mappedhub#85, destinationhub#84], [sourcehub#107, destinationhub#108], Inner, BuildRight
            :     :- *Filter (isnotnull(destinationhub#84) && isnotnull(mappedhub#85))
            :     :  +- InMemoryTableScan [sourcehub#83, destinationhub#84, mappedhub#85], [isnotnull(destinationhub#84), isnotnull(mappedhub#85)]
            :     :        +- InMemoryRelation [sourcehub#83, destinationhub#84, mappedhub#85], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            :     :              +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]
            :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, string, false]))
            :        +- *Filter ((isnotnull(sourcehub#107) && isnotnull(destinationhub#108)) && isnotnull(mappedhub#109))
            :           +- InMemoryTableScan [sourcehub#107, destinationhub#108, mappedhub#109], [isnotnull(sourcehub#107), isnotnull(destinationhub#108), isnotnull(mappedhub#109)]
            :                 +- InMemoryRelation [sourcehub#107, destinationhub#108, mappedhub#109], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            :                       +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, string, false]))
               +- *Filter (isnotnull(sourcehub#110) && isnotnull(destinationhub#111))
                  +- InMemoryTableScan [sourcehub#110, destinationhub#111], [isnotnull(sourcehub#110), isnotnull(destinationhub#111)]
                        +- InMemoryRelation [sourcehub#110, destinationhub#111, mappedhub#112], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                              +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]

Shouldn't Spark cache the table once instead of reading it multiple times? How can a table be cached efficiently in Spark for these self-join cases?
Spark version: 2.2. Hive ORC is the underlying store.

fcwjkofz #1

This sequence of statements ignores the cached DataFrame:

network_file.cache()  // the DataFrame returned here is not used at all
network_file.createOrReplaceTempView("network_design")  // does not have the cached DF in its lineage

You should use the DataFrame returned by cache(): either bind it to a new value or register the view on it directly:

// network_file was declared as a val, so bind the cached DataFrame to a new value
val cached_network_file = network_file.cache()
cached_network_file.createOrReplaceTempView("network_design")

Or:

network_file.cache().createOrReplaceTempView("network_design")
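
An alternative (a sketch assuming Spark 2.x) is to register the view first and then cache it by name through the catalog:

network_file.createOrReplaceTempView("network_design")
spark.catalog.cacheTable("network_design")  // marks the view's plan for caching; materialized on first use

Either way, each self-join branch should then scan the single InMemoryRelation instead of re-reading the Hive table.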
