我正在尝试编写一个spark结构的流式作业,它通过 writeStream
操作。但是,当我运行以下代码时,只有第一个 writeStream
被执行,第二个被忽略。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start() \
.awaitTermination()
// transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start() \
.awaitTermination()
我最初认为我的问题与这篇文章有关,然而,在将我的代码更改为以下内容之后:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start()
// transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start()
write_one.awaitTermination()
write_two.awaitTermination()
我收到以下错误:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我不知道为什么 start()
以及 awaitTermination()
会导致上面的错误(但我认为这可能是一个单独的问题,这是在这个答案中提到的同一职位以上)。什么是调用多个 writeStream
同一工作中的操作?最好将这两个写操作都放在由调用的函数中 foreachBatch
还是有更好的方法来实现这一点?
2条答案
按热度按时间nhjlsmyf1#
你就是不打电话
awaiTermination()
对于每个流查询,但只有一个通过spark会话,例如:spark.streams.awaitAnyTermination()
huwehgph2#
spark文档说,如果您需要在多个位置执行写操作,您需要使用
foreachBatch
方法。您的代码应该如下所示:
注:
persist
为了防止重新计算,需要使用。您可以查看更多:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach和foreachbatch