如何在pyspark的结构化流作业中运行map转换

dgtucam1  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(322)

我正在尝试使用map()转换设置一个结构化流式处理作业,该转换可以进行restapi调用。详情如下:

(1)
df=spark.readStream.format('delta') \
.option("maxFilesPerTrigger", 1000) \
.load(f'{file_location}') 

(2)
respData=df.select("resource", "payload").rdd.map(lambda row: put_resource(row[0], row[1])).collect()
respDf=spark.createDataFrame(respData, ["resource", "status_code", "reason"])

(3)
respDf.writeStream \
.trigger(once=True) \
.outputMode("append") \
.format("delta") \
.option("path", f'{file_location}/Response') \
.option("checkpointLocation", f'{file_location}/Response/Checkpoints') \
.start()

但是,我得到了一个错误:带有流源的查询必须在步骤(2)上使用writestream.start()执行。
任何帮助都将不胜感激。谢谢您。

uqxowvwt

uqxowvwt1#

您必须在df上执行流,也就是df.writestream.start()。。
这里有一个类似的线索:
具有流源的查询必须使用writestream.start()执行;

相关问题