如何优化PySpark代码以获得更好的性能

xj3cbfub  于 2023-04-12  发布在  Apache
关注(0)|答案(2)|浏览(199)

我试图获取当表(增量表)最后优化使用下面的代码和获得预期的输出.此代码将为所有的表是目前在数据库中.

table_name_or_path = "abcd"

df = spark.sql("desc history {}".format(table_name_or_path)).select("operation","timestamp").filter("operation == 'OPTIMIZE'").orderBy(col("timestamp").desc())
if len(df.take(1)) != 0:
    last_optimize = df.select(col("timestamp").cast("string").alias("timestamp")).first().asDict()
    print(last_optimize["timestamp"])
    last_optimize = last_optimize["timestamp"]
else:
    last_optimize = ""

上面的代码将花费一些时间,它将触发大量的Spark作业。
我想优化上面的代码以获得更好的性能。
有没有什么方法来编写优化的代码,这将是更有帮助的。

w8f9ii69

w8f9ii691#

最好避免像if len(df.take(1)) != 0这样的检查,因为它可能会导致在您稍后执行.first()时重新计算结果。相反,只需使用.limit(1)限制行数,并检查收集项的结果。

table_name_or_path = "abcd"

df = spark.sql(f"desc history {table_name_or_path}") \
  .select("operation","timestamp") \
  .filter("operation == 'OPTIMIZE'").orderBy(col("timestamp").desc()) \
  .limit(1)

data = df.collect()
if len(data) > 0:
    last_optimize = data[0].asDict()
    print(last_optimize["timestamp"])
    last_optimize = last_optimize["timestamp"]
else:
    last_optimize = ""
8xiog9wr

8xiog9wr2#

通常,在开始对数据框进行任何计算之前缓存数据框通常会有所帮助

df = spark.sql("desc history {}".format(table_name_or_path)).select("operation","timestamp").filter("operation == 'OPTIMIZE'").orderBy(col("timestamp").desc()).cache()

我假设这里缓存orderBy步骤已经减少了计算工作量

相关问题