pyspark 如何通过过滤器和分组方式将数据框架传递给不同的函数

mfuanj7w  于 2023-06-21  发布在  Spark
关注(0)|答案(3)|浏览(121)

我在pyspark中的一个查询中有一个 Dataframe 。但是我想把dataframe传递给pyspark中的两个不同的函数。
基本上,我正在做这样的事情,并希望避免运行相同的查询来创建initial_df两次。

def post_process_1(df):
       df = df_in_concern.filter(1st set of filter and group by)
       write_to_table_1(df)

def post_process_2(df):
       df = df_in_concern.filter(2nd set of filter and group by)
       write_to_table_2(df)

initial_df = df.filter(...).groupby(...).order1(...)
post_process_1(initial_df)
post_process_2(initial_df)

我发现这篇文章谈论的是一个类似于我的问题。pyspark python dataframe reuse in different functions
最后,建议使用createOrReplaceTempView。但据我所知,这个函数createOrReplaceTempView需要查询使用SQL语法。
在我的示例中,post_process_1post_process_2,查询(filter/ group by)是使用pyspark完成的。
有没有什么方法可以避免两次查询initial_df,并在post_process_1post_process_2函数中继续使用pyspark查询(而不是sql查询)?

332nm8kg

332nm8kg1#

这应该可以完成工作,因为df只在方法的上下文中被过滤,并且它不会改变initial_df上的任何内容:

def post_process_1(df):
    df_ = df.filter(1st set of filter and group by)
    write_to_table_1(df_)

def post_process_2(df):
    df_ = df.filter(2nd set of filter and group by)
    write_to_table_2(df_)

initial_df = df.filter(...).groupby(...).order1(...)
post_process_1(initial_df)
post_process_2(initial_df)
dhxwm5r4

dhxwm5r42#

使用persist/cache(如果使用缓存,则需要指定所需的存储级别)。这是为了防止重新计算initial_df

def post_process_1(df):
      df = df_in_concern.filter(1st set of filter and group by)
      write_to_table_1(df)

def post_process_2(df):
      df = df_in_concern.filter(2nd set of filter and group by)
      write_to_table_2(df)

initial_df = df.filter(...).groupby(...).order1(...)
initial_df.persist()
post_process_1(initial_df)
post_process_2(initial_df)
vm0i2vca

vm0i2vca3#

这个用例的解决方案是在调用方法之前使用persist将DataFrame保存到存储级别。
persist()方法用于将DataFrame存储到以下存储级别之一:MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK_2等。
存储级别的选择应根据群集配置(内存和磁盘大小)来完成
您可以使用cache(),它是**persist(StorageLevel.MEMORY_AND_DISK)**的别名
以下是可用的不同存储级别:

MEMORY_ONLY-这是RDD cache()方法的默认行为,并将RDD或DataFrame作为反序列化对象存储到JVM内存中。当没有足够的内存可用时,它将不会保存某些分区的DataFrame,并且这些分区将在需要时重新计算。这需要更多的内存。但与RDD不同的是,这将比MEMORY_AND_DISK级别慢,因为它重新计算未保存的分区,并且重新计算底层表的内存中列表示的开销很大
MEMORY_ONLY_SER-这与MEMORY_ONLY相同,但区别在于它将RDD作为序列化对象存储到JVM内存中。它比MEMORY_ONLY占用更少的内存(空间效率),因为它将对象保存为序列化的,并且需要额外的几个CPU周期来反序列化。
MEMORY_ONLY_2-与MEMORY_ONLY存储级别相同,但将每个分区复制到两个群集节点。
MEMORY_ONLY_SER_2-与MEMORY_ONLY_SER存储级别相同,但将每个分区复制到两个群集节点。
MEMORY_AND_DISK-这是DataFrame或Dataset的默认行为。在此存储级别中,DataFrame将作为反序列化对象存储在JVM内存中。当所需存储大于可用内存时,它会将一些多余的分区存储到磁盘中,并在需要时从磁盘读取数据。因为涉及I/O,所以速度较慢。
MEMORY_AND_DISK_SER-这与MEMORY_AND_DISK存储级别差异相同,因为它在空间不可用时序列化内存和磁盘上的DataFrame对象。
MEMORY_AND_DISK_2-与MEMORY_AND_DISK存储级别相同,但将每个分区复制到两个群集节点。
MEMORY_AND_DISK_SER_2-与MEMORY_AND_DISK_SER存储级别相同,但将每个分区复制到两个群集节点。
DISK_ONLY-在此存储级别中,DataFrame仅存储在磁盘上,由于涉及I/O,CPU计算时间很长。
DISK_ONLY_2-与DISK_ONLY存储级别相同,但将每个分区复制到两个群集节点。
注意:在Python中,存储的对象将始终使用Pickle库进行序列化,因此您是否选择序列化级别并不重要。Python中可用的存储级别包括MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY、DISK_ONLY_2和DISK_ONLY_3。

Spark official doc

相关问题