我有一个pyspark流作业,它从s3(使用 textFileStream
). 每一行输入都被解析并输出为hdfs上的parquet格式。
这在正常情况下非常有效。但是,当出现以下错误情况之一时,我有什么样的选项来恢复丢失的数据批?
调用中的驱动程序发生异常 foreachRDD
,其中发生输出操作(可能是 HdfsError
,或在诸如partitionby或 dataframe.write.parquet()
). 据我所知,这在spark(vs.“transformation”)中被归类为“动作”。
异常发生在执行器中,可能是因为解析行时map()lambda中发生异常。
我正在建立的系统必须是一个记录系统。我的所有输出语义都符合spark streaming文档中的一次输出语义(如果必须重新计算批处理/rdd,则输出数据将被覆盖,而不是重复)。
如何处理输出操作中的失败(内部) foreachRDD
)? afaict,内部发生的异常 foreachRDD
不要导致流作业停止。实际上,我已经尝试确定如何在内部生成未处理的异常 foreachRDD
停止工作,并一直无法做到这一点。
假设驱动程序中发生未处理的异常。如果需要更改代码来解决异常,我的理解是需要在恢复之前删除检查点。在这个场景中,是否有方法从流作业停止的时间戳开始流作业?
1条答案
按热度按时间iovurdzv1#
一般来说,函数中抛出的每个异常都像操作一样传递给mappartitions(
map
,filter
,flatMap
)应该可以恢复。对于一个格式错误的输入,整个操作/转换失败是没有充分理由的。具体策略将取决于您的需求(忽略、记录、保留以供进一步处理)。您可以在pyspark中找到与scala.util.try等效的方法?处理操作范围内的故障肯定更难。由于一般情况下,由于传入的通信量,它可能无法恢复或等待不是一个选项,因此我会乐观地重试,以防失败,如果失败,则将原始数据推送到外部备份系统(例如s3)。