Spark Structured Streaming: recovering from query exceptions

Asked by l2osamch on 2021-05-27, tagged Spark

Is it possible to automatically recover from an exception thrown during query execution?
Context: I have a Spark application that reads from a Kafka topic, processes the data, and writes the output to S3. After running in production for a few days, however, the application hits occasional network failures against S3 that cause an exception to be thrown and the application to stop. It is worth mentioning that the application runs on Kubernetes using GCP's Spark on Kubernetes operator (spark-on-k8s-operator).
From what I have seen so far, these exceptions are minor, and simply restarting the application fixes the problem. Can we handle these exceptions and automatically restart the structured streaming query?
Here is an example of the exception that is thrown:

  Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
  === Streaming Query ===
  Identifier: ...
  Current Committed Offsets: ...
  Current Available Offsets: ...
  Current State: ACTIVE
  Thread State: RUNNABLE
  Logical Plan: ...
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
  Caused by: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at io.blahblahView$$anonfun$11$$anonfun$apply$2.apply(View.scala:90)
    at io.blahblahView$$anonfun$11$$anonfun$apply$2.apply(View.scala:82)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at io.blahblahView$$anonfun$11.apply(View.scala:82)
    at io.blahblahView$$anonfun$11.apply(View.scala:79)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    ... 1 more
  Caused by: java.io.FileNotFoundException: No such file or directory: s3a://.../view/v1/_temporary/0
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:291)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:361)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
    at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
    ... 47 more

What is the simplest way to handle this automatically?


ars1skjm1#

No, there is no reliable way to do this. By the way, "no" is also an answer.
The logic for catching exceptions is normally implemented with a try/catch running on the driver.
For Structured Streaming, the Spark framework itself already handles unexpected executor-level situations as standard, and if an error is unrecoverable the application/job simply crashes after signaling the error back to the driver, unless you write try/catch code inside the various foreach constructs.
That said, with the foreach constructs it is not clear whether a micro-batch can actually be recovered this way, since parts of the micro-batch may well be lost. It is hard to test, though.
Given that Spark caters as standard for things you cannot hook into, why should one be able to insert loops or try/catch blocks into the program source? Likewise, broadcast variables are an issue, although some people have techniques around that. But it is not in the spirit of the framework.
So, good question, as I have wondered about this (in the past) myself.
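
To make the idea of a try/catch inside a foreach construct concrete, here is a minimal sketch built around foreachBatch, which the ForeachBatchSink frames in the stack trace above suggest is already in use. The rate source, the s3a://my-bucket/view/v1 output path, and the ForeachBatchWithCatch object name are illustrative placeholders, not details from the original application:

  import org.apache.spark.sql.{DataFrame, SparkSession}

  object ForeachBatchWithCatch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("foreachBatch-try-catch-sketch")
        .master("local[2]")
        .getOrCreate()

      // Stand-in streaming source for the Kafka topic in the original post.
      val df = spark.readStream.format("rate").load()

      // Explicitly typed function value; catching inside the batch body keeps a
      // failed write from tearing down the whole streaming query.
      val writeBatch = (batchDf: DataFrame, batchId: Long) => {
        try {
          // Hypothetical output path, not taken from the post.
          batchDf.write.mode("append").parquet("s3a://my-bucket/view/v1")
        } catch {
          case e: Exception =>
            // Swallowing the error means this micro-batch's output may be lost,
            // which is exactly the caveat raised in the answer above.
            println(s"Batch $batchId failed, skipping: " + e)
        }
      }

      val query = df.writeStream.foreachBatch(writeBatch).start()
      query.awaitTermination()
    }
  }

Whether skipping a failed micro-batch is acceptable depends on the delivery guarantees the application needs.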


omjgkv6w2#

After spending far too much time trying to find an elegant solution and coming up with nothing, here is what I ended up with.
Some may call it a hack, but it is simple, it works, and it solves a complex problem. I tested it in production and it solves the problem of automatically recovering from failures caused by occasional minor exceptions.
I call it a query watchdog. Here is the simplest version, in which the watchdog keeps retrying the query indefinitely:

  import org.apache.spark.sql.streaming.StreamingQueryException

  val writer = df.writeStream...

  while (true) {
    val query = writer.start()
    try {
      // Block until the query terminates or fails with an exception.
      query.awaitTermination()
    } catch {
      case e: StreamingQueryException =>
        println("Streaming Query Exception caught!: " + e)
    }
  }

Some may want to replace while(true) with some kind of counter to limit the number of retries; a bounded variant is sketched below. Others may extend this code to send a notification via Slack or email whenever a retry happens, or simply collect the retry count in Prometheus.
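
For instance, a bounded watchdog could look like the following sketch, where writer is the same DataStreamWriter as in the snippet above; the maxRetries limit and the alerting comment are illustrative assumptions, not part of the original answer:

  import org.apache.spark.sql.streaming.StreamingQueryException

  // Sketch: retry the query at most maxRetries times instead of looping forever.
  val maxRetries = 5 // hypothetical limit
  var attempts = 0
  var keepRunning = true

  while (keepRunning && attempts < maxRetries) {
    val query = writer.start()
    try {
      query.awaitTermination()
      keepRunning = false // the query stopped cleanly, so stop retrying
    } catch {
      case e: StreamingQueryException =>
        attempts += 1
        println(s"Streaming query failed (attempt $attempts of $maxRetries): " + e)
        // This is where one could send a Slack/email alert or increment a
        // Prometheus counter, as suggested above.
    }
  }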
Hope it helps,
Cheers
