我在用spark做结构化流媒体。我正在尝试将查询的输出保存到hadoop系统。但有一个难题:因为我使用的是aggregate,所以我必须将输出模式设置为“complete”,同时,filesink只适用于parquet。有解决办法吗?以下是我的代码:
val userSchema = new StructType().add("user1", "integer")
.add("user2", "integer")
.add("timestamp", "timestamp")
.add("interaction", "string")
val tweets = spark
.readStream
.option("sep", ",")
.schema(userSchema)
.csv(streaming_path)
val windowedCounts = tweets.filter("interaction='MT'")
.groupBy(
window($"timestamp", "10 seconds", "10 seconds")
).agg(GroupConcat($"user2")).sort(asc("window"))
val query = windowedCounts.writeStream
.format("parquet")
.option("path", "partb_q2")
.option("checkpointLocation", "checkpoint")
.start()
query.awaitTermination()
暂无答案!
目前还没有任何答案,快来回答吧!