如何使spark流按顺序执行

hc8w905p  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(503)

发行

我有一个作业,总共执行两个流,但我希望最后一个在第一个流完成后启动,因为第一个流将readstream中的事件保存在deltatable中,作为第二个流的输入。问题是,第一个流中添加的内容在当前笔记本运行的第二个流中不可用,因为它们同时启动。
在同一个笔记本上运行命令时,有没有办法执行命令?
我试过了 awaitTermination 但发现这并不能解决我的问题。一些伪代码:

def main():
    # Read eventhub
    metricbeat_df = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events
    metricbeat_df.writeStream \
        .trigger({"once": True}) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events")

    # Parse events
    metricbeat_df = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")

    # *Do some transformations here*

    metricbeat_df.writeStream \
        .trigger({"once": True}) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events")

tldr公司

总结一下:当我运行上面的代码时, query1 以及 query2 同时开始导致 my_db.joined_bronze_events 有点落后 my_db.raw_events 因为在query1中添加的内容在当前运行的query2中不可用(当然它将在下一次运行中)。
有没有办法强制执行 query2 直到 query1 已完成,但仍在同一个笔记本中运行?

aor9mmx1

aor9mmx11#

当您使用该选项时 Trigger.once ,您可以使用 processAllAvailable 方法 StreamingQuery :

def main():
    # Read eventhub
    # note that I have changed the variable name to metricbeat_df1
    metricbeat_df1 = spark \
        .readStream \
        .format("eventhubs") \
        .options(**eh_conf) \
        .load()

    # Save raw events
    metricbeat_df1.writeStream \
        .trigger({"once": True}) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query1") \
        .table("my_db.raw_events") \
        .processAllAvailable()

    # Parse events
    # note that I have changed the variable name to metricbeat_df2
    metricbeat_df2 = spark.readStream \
        .format("delta") \
        .option("ignoreDeletes", True) \
        .table("my_db.raw_events")

    # *Do some transformations here*

    metricbeat_df2.writeStream \
        .trigger({"once": True}) \
        .format("delta") \
        .partitionBy("year", "month", "day") \
        .outputMode("append") \
        .option("checkpointLocation", "dbfs:/...") \
        .queryName("query2") \
        .table("my_db.joined_bronze_events") \
        .processAllAvailable()

请注意,我已经更改了Dataframe名称,因为这两个流查询的Dataframe名称不应该相同。
可处理的方法描述如下:
“阻塞,直到源中的所有可用数据都已处理并提交到接收器。此方法用于测试。注意,在连续到达数据的情况下,此方法可能永远阻塞。此外,此方法仅保证在调用之前阻止已同步附加到org.apache.spark.sql.execution.streaming.source的数据(i、 e.getoffset必须立即反映添加的内容

相关问题