python—如何指定要添加到列表中的Dataframe的列

o7jaxewo  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(578)

我希望能够简洁地指定添加到列表中的列。
列表理解的基本结构非常简单,如下所示,其中spark\u sql\u df是spark dataframe,column是dataframe中列的实际名称。

def return_list():

    # creation and munging of spark_sql_df

    my_list = [int(row.column) for row in spark_sql_df.collect()]

    return my_list

但是,由于我需要搜索多个Dataframe,然后比较结果,以显示丢失的数据,所以我希望有一种方法能够在列之间循环,这样就不必多次复制此代码。
很简单,我想,我会用 col 函数来自 pyspark.sql.functions 把它绕在我的新变量上,像平常一样继续:

from pyspark.sql.functions import col

def return_list(column):

    # creation and munging of spark_sql_df

    my_list = [int(row.col(column)) for row in spark_sql_df.collect()]

    return my_list

尝试此操作时,程序将引发以下错误:

Traceback (most recent call last):
  File "C:\Users\<my_username>\Spark\python\pyspark\sql\types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'col' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "microservice_junifer_compare_keys.py", line 107, in <module>
    my_list = general_junifer_key_lister(index)
  File "microservice_junifer_compare_keys.py", line 78, in general_junifer_key_lister
    for row in spark_sql_df.collect()]
  File "microservice_junifer_compare_keys.py", line 78, in <listcomp>
    for row in spark_sql_df.collect()]
  File "C:\Users\<my_username>\Spark\python\pyspark\sql\types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: col

能否以一种不会导致笨拙的if/else语句的方式来解决这个问题?

noj0wjuj

noj0wjuj1#

我请求您考虑除收集数据外的任何其他方法来处理您的数据。因为这会将所有的数据带给驱动程序,而您不能使用spark的任何酷特性。所以你所需要的可以用spark内置函数或者在最坏的情况下spark-udf或者pandas-udf来完成。
为了帮助您使用这种方法,请尝试以下方法:

tst2= sqlContext.createDataFrame([('netflix','yahoo',1),('amazon','yahoo',2),('flipkart',None,2)],schema=("asset_domain","asset",'xtra'))
def return_list(column):
    my_list=[int(row[column]) for row in tst2.collect()]
    return(my_list)
res =return_list('xtra')

因为一旦你收集,它就是一个行元素数组。您不需要col函数,该函数用于直接处理sparkDataframe。

相关问题