发行
我有一个作业,总共执行两个流,但我希望最后一个在第一个流完成后启动,因为第一个流将readstream中的事件保存在deltatable中,作为第二个流的输入。问题是,第一个流中添加的内容在当前笔记本运行的第二个流中不可用,因为它们同时启动。
在同一个笔记本上运行命令时,有没有办法执行命令?
我试过了 awaitTermination
但发现这并不能解决我的问题。一些伪代码:
def main():
# Read eventhub
metricbeat_df = spark \
.readStream \
.format("eventhubs") \
.options(**eh_conf) \
.load()
# Save raw events
metricbeat_df.writeStream \
.trigger({"once": True}) \
.format("delta") \
.partitionBy("year", "month", "day") \
.outputMode("append") \
.option("checkpointLocation", "dbfs:/...") \
.queryName("query1") \
.table("my_db.raw_events")
# Parse events
metricbeat_df = spark.readStream \
.format("delta") \
.option("ignoreDeletes", True) \
.table("my_db.raw_events")
# *Do some transformations here*
metricbeat_df.writeStream \
.trigger({"once": True}) \
.format("delta") \
.partitionBy("year", "month", "day") \
.outputMode("append") \
.option("checkpointLocation", "dbfs:/...") \
.queryName("query2") \
.table("my_db.joined_bronze_events")
tldr公司
总结一下:当我运行上面的代码时, query1
以及 query2
同时开始导致 my_db.joined_bronze_events
有点落后 my_db.raw_events
因为在query1中添加的内容在当前运行的query2中不可用(当然它将在下一次运行中)。
有没有办法强制执行 query2
直到 query1
已完成,但仍在同一个笔记本中运行?
1条答案
按热度按时间aor9mmx11#
当您使用该选项时
Trigger.once
,您可以使用processAllAvailable
方法StreamingQuery
:请注意,我已经更改了Dataframe名称,因为这两个流查询的Dataframe名称不应该相同。
可处理的方法描述如下:
“阻塞,直到源中的所有可用数据都已处理并提交到接收器。此方法用于测试。注意,在连续到达数据的情况下,此方法可能永远阻塞。此外,此方法仅保证在调用之前阻止已同步附加到org.apache.spark.sql.execution.streaming.source的数据(i、 e.getoffset必须立即反映添加的内容