pyspark Spark streaming leftOuter join does not support a third stream

svmlkihl · asked 11 months ago · in Spark

I am stuck on this streaming leftOuter join. I can stream-join two DataFrames and, once the watermark expires, I get null values for the missing side. But I cannot make it work when joining three DataFrames. Say I read three DataFrames from Kafka (session, comparison, filter). My expectation is that after the watermark expires, if no data arrived on the comparison or filter stream, the join should still emit the session values. For some reason the session row is simply lost: even after waiting more than 10 minutes I never see it on the console!
I could not find any blog where someone joins three data streams. Please help me find the problem. Or is there a limitation in Spark Structured Streaming on joining three or more streaming DataFrames? Let me know if you have run into anything similar!
Below is the code I am trying:

from pyspark.sql.functions import col, expr, split

watermark_duration = "5 minutes"  # same watermark for all 3 streams
interval_value = "5 minutes"

session_df = (spark.readStream.format("kafka").options(**options)
              .option("subscribe", session_source_topic).load())

session_df = (session_df
              .selectExpr("CAST(value AS STRING) AS session_raw_data",
                          "timestamp AS session_Timestamp")
              .withColumn("session_data", split("session_raw_data", "\\|")))

session_df = (session_df
              .select(col("session_data")[0].alias("id"),
                      col("session_data")[1].alias("session_id"),
                      col("session_Timestamp"))
              .withWatermark("session_Timestamp", watermark_duration))


comparison_df = (spark.readStream.format("kafka").options(**options)
                 .option("subscribe", comparison_source_topic).load())

comparison_df = (comparison_df
                 .selectExpr("CAST(value AS STRING) AS comparison_raw_data",
                             "timestamp AS comparison_Timestamp")
                 .withColumn("comparison_data", split("comparison_raw_data", "\\|")))

comparison_df = (comparison_df
                 .select(col("comparison_data")[0].alias("comparison_id"),
                         col("comparison_data")[1].alias("comparison_session_id"),
                         col("comparison_Timestamp"))
                 .withWatermark("comparison_Timestamp", watermark_duration))

filter_df = (spark.readStream.format("kafka").options(**options)
             .option("subscribe", filter_source_topic).load())

filter_df = (filter_df
             .selectExpr("CAST(value AS STRING) AS filter_raw_data",
                         "timestamp AS filter_Timestamp")
             .withColumn("filter_data", split("filter_raw_data", "\\|")))

filter_df = (filter_df
             .select(col("filter_data")[0].alias("filter_id"),
                     col("filter_data")[1].alias("filter_session_id"),
                     col("filter_Timestamp"))
             .withWatermark("filter_Timestamp", watermark_duration))

comparison_join_expr = (
    "session_id = comparison_session_id AND "
    "comparison_Timestamp BETWEEN session_Timestamp "
    "AND session_Timestamp + interval " + interval_value)
filter_join_expr = (
    "session_id = filter_session_id AND "
    "filter_Timestamp BETWEEN session_Timestamp "
    "AND session_Timestamp + interval " + interval_value)

joined_df1 = (session_df
              .join(comparison_df, expr(comparison_join_expr), "leftOuter")
              .join(filter_df, expr(filter_join_expr), "leftOuter")
              .drop("filter_session_id"))

# For testing: write each input stream and the join result to the console
session_query = session_df.writeStream.outputMode("append").format("console").start()
comparison_query = comparison_df.writeStream.outputMode("append").format("console").start()
filter_query = filter_df.writeStream.outputMode("append").format("console").start()
joined_query = joined_df1.writeStream.outputMode("append").format("console").start()
spark.streams.awaitAnyTermination()

My expectation is that when no matching data is available in comparison_df and filter_df, the output should still contain the session_df columns, with nulls for the comparison_df and filter_df columns. I have also attached an image.
[Image: expected output]


wtlkbnrh1#

Currently, Structured Streaming supports only one stateful operator per pipeline; multiple stateful operators are not yet supported. Your query chains two stream-stream joins, which is two stateful operators. See this JIRA ticket: https://issues.apache.org/jira/browse/SPARK-39585
There are two ways to work around this:
1. Move the joins inside foreachBatch (widening the join condition by the batch window and interval). This becomes a stateless solution, yet the outer joins still yield NULLs for the right-hand tables. See the sketches after this list.
2. Land the slowly moving right-hand streams in static storage (e.g. Hudi) and perform chained stream-static joins. This is again a stateless solution, but it solves the problem.
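A minimal sketch of approach 1, under a few assumptions: all three topics are read with a single Kafka subscription so one foreachBatch call sees every source, and a hypothetical parse helper reproduces the pipe-split parsing from the question. The topic variables, options and the join expressions are the ones defined above; the console output is only for testing.

all_topics = ",".join([session_source_topic, comparison_source_topic, filter_source_topic])
raw_df = (spark.readStream.format("kafka").options(**options)
          .option("subscribe", all_topics).load())

def parse(df, id_col, sess_col, ts_col):
    # Hypothetical helper: same CAST + pipe-split parsing as in the question
    parts = split(col("value").cast("string"), "\\|")
    return df.select(parts[0].alias(id_col),
                     parts[1].alias(sess_col),
                     col("timestamp").alias(ts_col))

def join_batch(batch_df, batch_id):
    # Inside foreachBatch the micro-batch is a static DataFrame, so chained
    # leftOuter joins are stateless and the one-stateful-operator limit no
    # longer applies; no watermarks are needed here.
    session = parse(batch_df.filter(col("topic") == session_source_topic),
                    "id", "session_id", "session_Timestamp")
    comparison = parse(batch_df.filter(col("topic") == comparison_source_topic),
                       "comparison_id", "comparison_session_id", "comparison_Timestamp")
    filt = parse(batch_df.filter(col("topic") == filter_source_topic),
                 "filter_id", "filter_session_id", "filter_Timestamp")
    (session.join(comparison, expr(comparison_join_expr), "leftOuter")
            .join(filt, expr(filter_join_expr), "leftOuter")
            .show(truncate=False))

query = raw_df.writeStream.foreachBatch(join_batch).start()
query.awaitTermination()

Note that only rows landing in the same micro-batch can match, which is why the join window should be widened as mentioned above.

And a sketch of approach 2, using Delta Lake purely as a stand-in for the Hudi table suggested in the answer; the paths are illustrative. The slow-moving stream is persisted by its own query, and the session stream then does a stateless stream-static leftOuter join against it:

(comparison_df.writeStream.format("delta")
 .option("checkpointLocation", "/tmp/ckpt/comparison")  # illustrative path
 .start("/tmp/tables/comparison"))                      # illustrative path

static_comparison = spark.read.format("delta").load("/tmp/tables/comparison")
joined_df = session_df.join(static_comparison, expr(comparison_join_expr), "leftOuter")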
Ankur


mmvthczy2#

I was able to achieve the output with approach 1. Attaching a screenshot for your reference.
Ankur
