连接流错误:pyspark.sql.utils.analysisexception:“流Dataframe/数据集不支持多个流聚合

lxkprmvk  于 2021-05-22  发布在  Spark
关注(0)|答案(0)|浏览(373)

我正在使用窗口函数 groupby 以及聚集流式Dataframe的列。这为窗口大小10提供了一个基于移动窗口的平均值。同样,对12号窗口也要这样做。
因此,我得到了两个移动平均线。我想根据时间戳的结束时间将这两个移动平均流连接起来。
我不断地犯错误,

pyspark.sql.utils.AnalysisException: 'Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;\nJoin Inner,

我想知道是否有解决办法。我参考了下面的链接,但没有帮助,链接。
我的代码:

df_streaming_data= df_streaming_data.withColumn('timestamp_alt',df_streaming_data.timestamp_alt.cast('timestamp'))
w10 = fn.window('timestamp_alt', '10 seconds', '1 seconds')
df_streaming_data_w10 = df_streaming_data.withWatermark('timestamp_alt', '1 minutes') .groupBy(w10).agg(fn.mean('ltp').alias('ma10'))
df_streaming_data_w10 = df_streaming_data_w10.select(df_streaming_data_w10.window.start.cast('timestamp').alias("startma10"), df_streaming_data_w10.window.end.cast('timestamp').alias("endma10"), "ma10")
w12 = fn.window('timestamp_alt', '12 seconds', '1 seconds')
df_streaming_data_w12 = df_streaming_data.withWatermark('timestamp_alt', '1 minutes') .groupBy(w12).agg(fn.mean('ltp').alias('ma12'))
df_streaming_data_w12 = df_streaming_data_w12.select(df_streaming_data_w12.window.start.cast('timestamp').alias("startma12"), df_streaming_data_w12.window.end.cast('timestamp').alias("endma12"), "ma12")

# the code above this line works very well. So we have two moving average data frames ready.

# the following code where we have the join is giving the issue.

df_streaming_filtered = df_streaming_data_w10.join(df_streaming_data_w12,expr("""endma12 = endma10"""))
query = df_streaming_filtered.writeStream.outputMode("append").format("console") .option("truncate", 0).start()

在Python3.7中使用spark 2.3.x。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题