我试图做的是从我的事件中心读取一些数据,并将其保存在azure data lake中。然而,问题是,流不会停止,并且 writeStream 步骤未触发。我找不到任何设置来识别输入速率何时达到0以停止流。
writeStream
rnmwe5a21#
apache spark中有一个特殊的触发器,通常称为trigger.once,它将处理所有可用数据,然后关闭流。只需添加 .trigger(once=True) 之后 .writeStream 要启用它。唯一的问题是,在spark 3.x(dbr>=7.x)中,它完全忽略了以下选项 maxFilesPerTrigger ,等等,这限制了为处理而提取的数据量—在这种情况下,它将尝试一次性处理所有数据,有时可能会导致性能问题。要解决此问题,您可以执行以下操作-分配 raw_data.writeStream.....start() 喜欢 query = raw_data.writeStream.... 设置为一个变量,并定期检查 query.get('numInputRows') ,如果它在一段时间内等于0,则发出 query.stop()
.trigger(once=True)
.writeStream
maxFilesPerTrigger
raw_data.writeStream.....start()
query = raw_data.writeStream....
query.get('numInputRows')
query.stop()
1条答案
按热度按时间rnmwe5a21#
apache spark中有一个特殊的触发器,通常称为trigger.once,它将处理所有可用数据,然后关闭流。只需添加
.trigger(once=True)
之后.writeStream
要启用它。唯一的问题是,在spark 3.x(dbr>=7.x)中,它完全忽略了以下选项
maxFilesPerTrigger
,等等,这限制了为处理而提取的数据量—在这种情况下,它将尝试一次性处理所有数据,有时可能会导致性能问题。要解决此问题,您可以执行以下操作-分配raw_data.writeStream.....start()
喜欢query = raw_data.writeStream....
设置为一个变量,并定期检查query.get('numInputRows')
,如果它在一段时间内等于0,则发出query.stop()