我有一个pyspark方法,它对Dataframe上的每个数组列应用explode函数。
def explode_column(df, column):
select_cols = list(df.columns)
col_position = select_cols.index(column)
select_cols[col_position] = explode_outer(column).alias(column)
return df.select(select_cols)
def explode_all_arrays(df):
still_has_arrays = True
exploded_df = df
while still_has_arrays:
still_has_arrays = False
for f in exploded_df.schema.fields:
if isinstance(f.dataType, ArrayType):
print(f"Exploding: {f}")
still_has_arrays = True
exploded_df = explode_column(exploded_df, f.name)
return exploded_df
当我有少量的列要分解时,它可以很好地工作,但是在大的Dataframe上(大约200列,大约40次分解),在完成之后,Dataframe就不能写成Parquet了。
即使是少量数据也会失败(400kb),不是在方法处理过程中,而是在写入步骤中。
有什么建议吗?我试着把Dataframe写成表和Parquet文件。但当我将它存储为临时视图时,它就工作了。我说的“不能写”是指它一直在处理,什么也没发生。即使只有一个很小的文件,作业也要运行几个小时,我必须强制停止。
如果我将其存储为临时视图,我可以使用“select*from temp\u table”作为示例,但是当我尝试将结果写入永久表或Parquet文件时,它将永远保持处理。
有什么建议吗?
暂无答案!
目前还没有任何答案,快来回答吧!