spark读取分区avro明显慢于指向精确位置

jxct1oxe  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(384)

我正在尝试读取基于年、月和日进行分区的avro数据,这似乎比直接指向路径要慢得多。在物理计划中,我可以看到分区过滤器正在被传递,因此它不会扫描整个目录集,但仍然非常慢。
e、 g.像这样读取分区数据

profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/"

profitLoss = spark.read.\
    format("com.databricks.spark.avro").\
    option("header", "false").\
    option("inferSchema", "false").load(profitLossPath)

profitLoss.createOrReplaceTempView("ProfitLosstt")

df=sqlContext.sql("SELECT * \
                             FROM ProfitLosstt \
                             where Year= " + year + " and Month=" + month_nz + " and Day=" + date_nz )

大约需要3分钟
而我用一个字符串生成器指向确切的位置,只需2秒钟

profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/Year=" +year +"/Month=" + month_nz + "/Day=" + date_nz

profitLoss = spark.read.\
    format("com.databricks.spark.avro").\
    option("header", "false").\
    option("inferSchema", "false").load(profitLossPath)

profitLoss.createOrReplaceTempView("ProfitLosstt")

df=sqlContext.sql("SELECT * \
                             FROM ProfitLosstt "
                              )

display(df)

查看第一个(慢一点)的物理计划确实表明分区过滤器已被传递
什么能解释发现阶段花了这么长时间?
任何问题我都可以详细说明。

xpszyzbs

xpszyzbs1#

好吧,之所以这么慢是因为inmemoryfileindex的构建。
虽然进行了分区修剪,但spark需要知道分区和文件信息,这正是它需要的步骤。这篇s.o的文章详细阐述了它:这里
因此,当时的想法是创建一个外部表,以便构建这些信息,我使用了这样的脚本(我使用了一个内联模式,如果您有一个模式文件,您可以使用一个模式文件)

create external table ProfitLossAvro 

partitioned by (Year int, Month int, Day int)

ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'

Stored As 

 inputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'

 outputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

Location 'abfss://raw@datalakename.dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/'

TBLPROPERTIES (
    'avro.schema.literal'='{
      "name": "Microsoft.Hadoop.Avro.Specifications.ProfitLoss",
      "type": "record",
      "fields": [{ "name":"MK_DatesID_TradeDate", "type":["int", "null"]},{ "name":"MK_UCRAccountsID_AccountID", "type":["int", "null"]},{ "name":"MK_ProductCategoriesID_ProductCategoryID", "type":["int", "null"]},{ "name":"CurrencyCode", "type":["string", "null"]},{ "name":"ProfitLoss", "type":["double", "null"]},{ "name":"MK_PnLAmountTypesID_PLBookingTypeID", "type":["int", "null"]}]
    }');

但是如果查询这个表,将得到0行。这是因为现有分区不是自动添加的。所以,你可以用

msck repair table ProfitLossAvro

每次向datalake添加数据时,都可以执行添加分区。像这样的this:-

ALTER TABLE ProfitLossAvro ADD PARTITION (Year=2020, Month=6, Day=26)

如果使用下面这样的命令查询数据,它的运行速度会更快

df=sqlContext.sql("select * \
               from ProfitLossAvro \
               where Year=" + year + " and Month=" + month_nz + " and Day=" + date_nz)

display(df)

相关问题