如何在一个spark作业中调用多个writestream操作?

mfuanj7w  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(530)

我正在尝试编写一个spark结构的流式作业,它通过 writeStream 操作。但是,当我运行以下代码时,只有第一个 writeStream 被执行,第二个被忽略。

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() \
  .awaitTermination()

// transform df to df2

write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start() \
  .awaitTermination()

我最初认为我的问题与这篇文章有关,然而,在将我的代码更改为以下内容之后:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() 

// transform df to df2 

write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start()

write_one.awaitTermination()
write_two.awaitTermination()

我收到以下错误:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我不知道为什么 start() 以及 awaitTermination() 会导致上面的错误(但我认为这可能是一个单独的问题,这是在这个答案中提到的同一职位以上)。什么是调用多个 writeStream 同一工作中的操作?最好将这两个写操作都放在由调用的函数中 foreachBatch 还是有更好的方法来实现这一点?

nhjlsmyf

nhjlsmyf1#

你就是不打电话 awaiTermination() 对于每个流查询,但只有一个通过spark会话,例如: spark.streams.awaitAnyTermination()

huwehgph

huwehgph2#

spark文档说,如果您需要在多个位置执行写操作,您需要使用 foreachBatch 方法。
您的代码应该如下所示:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

注: persist 为了防止重新计算,需要使用。
您可以查看更多:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach和foreachbatch

相关问题