我应该如何配置spark来正确修剪配置单元元存储分区?

9rbhqvlz  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(674)

我在将分区过滤器应用于spark(v2.0.2/2.1.1)Dataframe时遇到了一个问题,spark(v2.0.2/2.1.1)Dataframe从具有30000多个分区的hive(v2.1.0)表中读取数据。我想知道推荐的方法是什么,如果有的话,我做得不对,因为当前的行为是一个大的性能和可靠性问题的来源。
要启用修剪,我将使用以下spark/hive属性:

--conf spark.sql.hive.metastorePartitionPruning=true

在sparkshell中运行查询时,我可以看到通过调用 ThriftHiveMetastore.Iface.get_partitions ,但这在没有任何筛选的情况下意外发生:

val myTable = spark.table("db.table")
val myTableData = myTable
  .filter("local_date = '2017-09-01' or local_date = '2017-09-02'")
  .cache

// The HMS call invoked is:
// #get_partitions('db', 'table', -1)

如果我使用更简单的过滤器,分区会根据需要进行过滤:

val myTableData = myTable
  .filter("local_date = '2017-09-01'")
  .cache

// The HMS call invoked is:
// #get_partitions_by_filter(
//   'db', 'table',
//   'local_date = "2017-09-01"',
//   -1
// )

如果我重写过滤器以使用范围运算符而不是简单地检查相等性,则过滤也可以正常工作:

val myTableData = myTable
  .filter("local_date >= '2017-09-01' and local_date <= '2017-09-02'")
  .cache

// The HMS call invoked is:
// #get_partitions_by_filter(
//   'db', 'table',
//   'local_date >= '2017-09-01' and local_date <= '2017-09-02'',
//   -1
// )

在我们的例子中,从性能的Angular 来看,这种行为是有问题的;正确过滤后,通话时间为4分钟,而不是1秒。此外,常规装载大量 Partition 每次查询都将对象放在堆上,最终会导致metastore服务中的内存问题。
似乎在解析和解释某些类型的过滤器结构时有一个bug,但是我还没有在spark jira中找到相关的问题。是否有一个优先的方法或特定的Spark版本,过滤器适用于所有的过滤器变种?或者在构造过滤器时必须使用特定的形式(例如范围运算符)?如果是这样的话,这个限制是否在任何地方都有记录?

1bqhqjot

1bqhqjot1#

我还没有找到一个首选的查询方式,除了重写过滤器在我的(操作)问题中所描述的。我确实发现spark改进了对此的支持,看起来spark2.3.0中已经解决了我的问题。这是解决我发现的问题的罚单:spark-20331

相关问题