如何使spark流按顺序执行

hc8w905p  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(576)

发行

我有一个作业,总共执行两个流,但我希望最后一个在第一个流完成后启动,因为第一个流将readstream中的事件保存在deltatable中,作为第二个流的输入。问题是,第一个流中添加的内容在当前笔记本运行的第二个流中不可用,因为它们同时启动。
在同一个笔记本上运行命令时,有没有办法执行命令?
我试过了 awaitTermination 但发现这并不能解决我的问题。一些伪代码:

  1. def main():
  2. # Read eventhub
  3. metricbeat_df = spark \
  4. .readStream \
  5. .format("eventhubs") \
  6. .options(**eh_conf) \
  7. .load()
  8. # Save raw events
  9. metricbeat_df.writeStream \
  10. .trigger({"once": True}) \
  11. .format("delta") \
  12. .partitionBy("year", "month", "day") \
  13. .outputMode("append") \
  14. .option("checkpointLocation", "dbfs:/...") \
  15. .queryName("query1") \
  16. .table("my_db.raw_events")
  17. # Parse events
  18. metricbeat_df = spark.readStream \
  19. .format("delta") \
  20. .option("ignoreDeletes", True) \
  21. .table("my_db.raw_events")
  22. # *Do some transformations here*
  23. metricbeat_df.writeStream \
  24. .trigger({"once": True}) \
  25. .format("delta") \
  26. .partitionBy("year", "month", "day") \
  27. .outputMode("append") \
  28. .option("checkpointLocation", "dbfs:/...") \
  29. .queryName("query2") \
  30. .table("my_db.joined_bronze_events")

tldr公司

总结一下:当我运行上面的代码时, query1 以及 query2 同时开始导致 my_db.joined_bronze_events 有点落后 my_db.raw_events 因为在query1中添加的内容在当前运行的query2中不可用(当然它将在下一次运行中)。
有没有办法强制执行 query2 直到 query1 已完成,但仍在同一个笔记本中运行?

aor9mmx1

aor9mmx11#

当您使用该选项时 Trigger.once ,您可以使用 processAllAvailable 方法 StreamingQuery :

  1. def main():
  2. # Read eventhub
  3. # note that I have changed the variable name to metricbeat_df1
  4. metricbeat_df1 = spark \
  5. .readStream \
  6. .format("eventhubs") \
  7. .options(**eh_conf) \
  8. .load()
  9. # Save raw events
  10. metricbeat_df1.writeStream \
  11. .trigger({"once": True}) \
  12. .format("delta") \
  13. .partitionBy("year", "month", "day") \
  14. .outputMode("append") \
  15. .option("checkpointLocation", "dbfs:/...") \
  16. .queryName("query1") \
  17. .table("my_db.raw_events") \
  18. .processAllAvailable()
  19. # Parse events
  20. # note that I have changed the variable name to metricbeat_df2
  21. metricbeat_df2 = spark.readStream \
  22. .format("delta") \
  23. .option("ignoreDeletes", True) \
  24. .table("my_db.raw_events")
  25. # *Do some transformations here*
  26. metricbeat_df2.writeStream \
  27. .trigger({"once": True}) \
  28. .format("delta") \
  29. .partitionBy("year", "month", "day") \
  30. .outputMode("append") \
  31. .option("checkpointLocation", "dbfs:/...") \
  32. .queryName("query2") \
  33. .table("my_db.joined_bronze_events") \
  34. .processAllAvailable()

请注意,我已经更改了Dataframe名称,因为这两个流查询的Dataframe名称不应该相同。
可处理的方法描述如下:
“阻塞,直到源中的所有可用数据都已处理并提交到接收器。此方法用于测试。注意,在连续到达数据的情况下,此方法可能永远阻塞。此外,此方法仅保证在调用之前阻止已同步附加到org.apache.spark.sql.execution.streaming.source的数据(i、 e.getoffset必须立即反映添加的内容

展开查看全部

相关问题