提高spark.sql中的数据争用性能

6pp0gazn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(456)

我有一个包含几个csv文件的大数据库。每个csv文件包含最后10天,只有最早的日期是最终数据。
例如,“file_-08-11.csv”文件包含从08-02到08-11的数据(数据中只有日期为08-02的记录是最终记录)和“file_-08-12.csv”文件包含从08-03到08-12的数据(只有日期为08-03的记录是最终记录)。
我用Pypark来做这个。我的目标是只保存variables_2019-08-11.csv文件中日期08-02的记录和variables_2019-08-12.csv文件中日期08-03的记录等等。我正在使用pyspark和databricks来实现这一点,我的代码片段可以工作,但是有点慢,尽管我在足够大的集群上运行它。
我很乐意为其他场景提供建议,以提高其性能。谢谢

import datetime
    # define the period range
    start_date="2019-08-12"
    end_date="2019-08-30

# create list of dates under date_generated variable

    start = datetime.datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]

# read first file

    filename="file_variables_"+str(date_generated[0])[0:10]+".csv"
    df=spark.read.csv(data_path+filename,header="true")
    df.createOrReplaceTempView("df")

# create the main file which we will use the other dates to append below this one

    final=spark.sql("select * from df where data_date in (select min(data_date) from df)")

# loop on other dates than the first date

    for date in date_generated[1:len(date_generated)]:
      filename="file_variables_"+str(date)[0:10]+".csv"
      df=spark.read.csv(data_path+filename,header="true")
      df.createOrReplaceTempView("df")
      temp=spark.sql("select * from df where data_date in (select min(data_date) from df)")
      final=final.union(temp)
    final.createOrReplaceTempView("final")
mtb9vblg

mtb9vblg1#

我怀疑您的大集群上的大多数核心都是空闲的,因为根据您的代码在每个文件上使用循环的结构方式,您的作业正在处理一个文件,并且只使用集群中的一个核心。查看集群->[您的集群]->度量->[ganglia ui]
首先,最好将所有文件作为一个集合进行处理。使用 input_file_name() 如果逻辑依赖于输入文件名。在片场做你所有的工作。循环会扼杀你的表现。
第二,我认为一个窗口化的sql函数dense\u rank()将帮助您查找组[input\u file\u name()]中所有日期的第一个日期。下面是一个介绍窗口功能的博客:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

df=spark.read.csv(data_path)

from pyspark.sql.functions import input_file_name
df2 = df.withColumn('file_name',input_file_name())

final = df2.<apply logic>

相关问题