spark检查点导致连接问题

uklbhaso  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(255)

我有一段代码基本上完成了以下工作:

df=spark.read.parquet("very large dataset")
df.select(columns)
df.filter("some rows I dont want")

df2=df.groupBy('keys').agg("max of a column")
df=df.drop("columns that will be got from df2")
df=df.join(df2, on=["key cols"], "left")

spark.sparkContext.setCheckpointDir("checkpoint/path")
df3=df.checkpoint()

df4=df3.filter("condition 1").groupBy('key').agg("perform aggregations")
df5=df3.filter("condition 2").select(certain columns).alias(rename them)

df6=df4.join(df5, on=["key cols"], how="outer") #perform full outer join to get all columns and rows

此时,我得到以下错误:
运算符中工具id 27908、ll 27913、lw 27915、ul 27236、uw 27914、序列id 27907、结果27911、时间戳27909、日期27910缺少已解析属性ul 28099!项目[序列号#27907,工具号#27908,时间戳#27909,日期#27910,结果#27911,ul#28099,强制转换(大小写为空(ll#27913),然后-无限其他ll#27913结束为双精度)为ll#27246,uw#27914,lw#27915]。具有相同名称的属性出现在操作中:ul。请检查是否使用了正确的属性\njoin fullouter\n
但是,当我移除 checkpoint 像正常缓存的Dataframe一样运行它就可以了。如果我的数据集很小,这是可以的,但是我需要检查点,因为与可用的emr资源相比,我有一个非常大的数据集。
有没有人遇到过类似的问题?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题