last_query = org_query.groupBy(window("timestamp", "1 minute"), "country").agg(*expr).orderBy(asc("window"))
output = last_query\
.writeStream\
.outputMode("complete")\
.format("console")\
.option("truncate", "False")\
.start()
output.awaitTermination()
上面代码中的最后一个查询是spark流式Dataframe,并尝试在控制台中显示它—它工作正常,没有任何问题。然而,当我试图保存到json文件时,问题来了。。。所以,我修改代码如下。
output = last_query\
.writeStream\
.outputMode("complete")\
.format("json")\
.option("path", "/home/ec2-user/kafka-project/op2/")\
.option("checkpointLocation", "/home/ec2-user/kafka-project/checkpoint/")\
.start()
错误消息:pyspark.sql.utils.analysisexception:数据源json不支持完全输出模式;
嗯,我无法将outputmode更改为append/update,因为我正在使用聚合方法。。即使我换成更新模式,我也会收到同样的错误。。。我的方法正确吗?
暂无答案!
目前还没有任何答案,快来回答吧!