在多列上使用explode函数后无法写入Dataframe

pcrecxhr  于 2021-07-24  发布在  Java
关注(0)|答案(0)|浏览(261)

我有一个pyspark方法,它对Dataframe上的每个数组列应用explode函数。

def explode_column(df, column):
    select_cols = list(df.columns)
    col_position = select_cols.index(column)
    select_cols[col_position] = explode_outer(column).alias(column)
    return df.select(select_cols)

def explode_all_arrays(df):
    still_has_arrays = True
    exploded_df = df

    while still_has_arrays:
        still_has_arrays = False
        for f in exploded_df.schema.fields:
            if isinstance(f.dataType, ArrayType):
                print(f"Exploding: {f}")
                still_has_arrays = True
                exploded_df = explode_column(exploded_df, f.name)

    return exploded_df

当我有少量的列要分解时,它可以很好地工作,但是在大的Dataframe上(大约200列,大约40次分解),在完成之后,Dataframe就不能写成Parquet了。
即使是少量数据也会失败(400kb),不是在方法处理过程中,而是在写入步骤中。
有什么建议吗?我试着把Dataframe写成表和Parquet文件。但当我将它存储为临时视图时,它就工作了。我说的“不能写”是指它一直在处理,什么也没发生。即使只有一个很小的文件,作业也要运行几个小时,我必须强制停止。
如果我将其存储为临时视图,我可以使用“select*from temp\u table”作为示例,但是当我尝试将结果写入永久表或Parquet文件时,它将永远保持处理。
有什么建议吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题