我希望能够简洁地指定添加到列表中的列。
列表理解的基本结构非常简单,如下所示,其中spark\u sql\u df是spark dataframe,column是dataframe中列的实际名称。
def return_list():
# creation and munging of spark_sql_df
my_list = [int(row.column) for row in spark_sql_df.collect()]
return my_list
但是,由于我需要搜索多个Dataframe,然后比较结果,以显示丢失的数据,所以我希望有一种方法能够在列之间循环,这样就不必多次复制此代码。
很简单,我想,我会用 col
函数来自 pyspark.sql.functions
把它绕在我的新变量上,像平常一样继续:
from pyspark.sql.functions import col
def return_list(column):
# creation and munging of spark_sql_df
my_list = [int(row.col(column)) for row in spark_sql_df.collect()]
return my_list
尝试此操作时,程序将引发以下错误:
Traceback (most recent call last):
File "C:\Users\<my_username>\Spark\python\pyspark\sql\types.py", line 1527, in __getattr__
idx = self.__fields__.index(item)
ValueError: 'col' is not in list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "microservice_junifer_compare_keys.py", line 107, in <module>
my_list = general_junifer_key_lister(index)
File "microservice_junifer_compare_keys.py", line 78, in general_junifer_key_lister
for row in spark_sql_df.collect()]
File "microservice_junifer_compare_keys.py", line 78, in <listcomp>
for row in spark_sql_df.collect()]
File "C:\Users\<my_username>\Spark\python\pyspark\sql\types.py", line 1532, in __getattr__
raise AttributeError(item)
AttributeError: col
能否以一种不会导致笨拙的if/else语句的方式来解决这个问题?
1条答案
按热度按时间noj0wjuj1#
我请求您考虑除收集数据外的任何其他方法来处理您的数据。因为这会将所有的数据带给驱动程序,而您不能使用spark的任何酷特性。所以你所需要的可以用spark内置函数或者在最坏的情况下spark-udf或者pandas-udf来完成。
为了帮助您使用这种方法,请尝试以下方法:
因为一旦你收集,它就是一个行元素数组。您不需要col函数,该函数用于直接处理sparkDataframe。