使用聚合将输出保存到spark streaming中的hadoop

6pp0gazn  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(233)

我在用spark做结构化流媒体。我正在尝试将查询的输出保存到hadoop系统。但有一个难题:因为我使用的是aggregate,所以我必须将输出模式设置为“complete”,同时,filesink只适用于parquet。有解决办法吗?以下是我的代码:

val userSchema = new StructType().add("user1", "integer")
    .add("user2", "integer")
    .add("timestamp", "timestamp")
    .add("interaction", "string")

val tweets = spark
      .readStream
      .option("sep", ",")
      .schema(userSchema)
      .csv(streaming_path)

val windowedCounts = tweets.filter("interaction='MT'")
      .groupBy(
        window($"timestamp", "10 seconds", "10 seconds")
      ).agg(GroupConcat($"user2")).sort(asc("window"))

val query = windowedCounts.writeStream
  .format("parquet")
  .option("path", "partb_q2")
  .option("checkpointLocation", "checkpoint")
  .start()
query.awaitTermination()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题