Possible race condition in Spark driver

icnyk63a · posted 2021-07-09 in Java

I have a Spark job that processes several folders on S3 on each run and stores its state in DynamoDB. In other words, we run the job once a day; it looks for new folders added by another job, transforms them one by one, and writes the state into DynamoDB. Here's rough pseudocode:

  import org.apache.spark.SparkContext

  object App {
    def main(args: Array[String]): Unit = {
      val allFolders = S3Folders.list()
      val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)
      Transformer.run(foldersToProcess)
    }
  }

  object Transformer {
    def run(folders: List[String]): Unit = {
      val sc = new SparkContext()
      folders.foreach(process(sc, _))
    }

    def process(sc: SparkContext, folder: String): Unit = ??? // transform and write to S3
  }

If S3Folders.list() returns a relatively small number of folders (up to a couple of thousand), everything works; if it returns more (4-8k), we very often see the following error (which at first glance has nothing to do with Spark):

  17/10/31 08:38:20 ERROR ApplicationMaster: User class threw exception: shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
  shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
    at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214)
    at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315)
    at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88)
    at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
    at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
    at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
    at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4247)
    at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
    at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4188)
    at shadeaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865)
    at me.chuwy.transform.S3Folders$.com$chuwy$transform$S3Folders$$isGlacierified(S3Folders.scala:136)
    at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
    at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
    at me.chuwy.transform.S3Folders$.list(S3Folders.scala:112)
    at me.chuwy.transform.Main$.main(Main.scala:22)
    at me.chuwy.transform.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
  Caused by: shadeaws.AbortedException:
    at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:53)
    at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:81)
    at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.read1(BufferedReader.java:210)
    at java.io.BufferedReader.read(BufferedReader.java:286)
    at java.io.Reader.read(Reader.java:140)
    at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:186)
    ... 36 more

For a large number of folders (~20k) this happens all the time and the job cannot start.
Previously we had a very similar, but much more frequent, error when getFoldersToProcess did a GetItem for every folder in allFolders and therefore took far longer:

  17/09/30 14:46:07 ERROR ApplicationMaster: User class threw exception: shadeaws.AbortedException:
  shadeaws.AbortedException:
    at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:51)
    at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)
    at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
    at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:489)
    at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:126)
    at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:215)
    at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1240)
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:802)
    at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:109)
    at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
    at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1503)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1226)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at shadeaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2089)
    at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2065)
    at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.executeGetItem(AmazonDynamoDBClient.java:1173)
    at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.getItem(AmazonDynamoDBClient.java:1149)
    at me.chuwy.tranform.sdk.Manifest$.contains(Manifest.scala:179)
    at me.chuwy.tranform.DynamoDBState$$anonfun$getUnprocessed$1.apply(ProcessManifest.scala:44)
    at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
    at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
    at me.chuwy.transform.DynamoDBState$.getFoldersToProcess(DynamoDBState.scala:44)
    at me.chuwy.transform.Main$.main(Main.scala:19)
    at me.chuwy.transform.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)

I believe the current error has nothing to do with XML parsing or an invalid response, but stems from some race condition inside Spark, because:

- There is a clear connection between how long the "state fetching" takes and the chance of failure.
- The tracebacks contain an underlying AbortedException, which as far as I know is caused by a swallowed InterruptedException. That could mean something inside the JVM (spark-submit or even YARN) calls Thread.sleep on the main thread; this mechanism is sketched below.
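As a hedged illustration of that hypothesis (my own sketch, not confirmed Spark or YARN behavior): a swallowed InterruptedException leaves the thread's interrupt flag set, and the shaded SDK's SdkFilterInputStream.abortIfNeeded checks that flag before reading a response stream, which is exactly where both traces above fail:

  object AbortedExceptionSketch {
    def main(args: Array[String]): Unit = {
      // "Swallowing" an interrupt usually looks like this: the catch block
      // re-sets the interrupt flag instead of propagating the exception.
      Thread.currentThread().interrupt()
      try Thread.sleep(1000) // throws immediately because the flag is already set
      catch { case _: InterruptedException => Thread.currentThread().interrupt() }

      // The SDK performs roughly this check before each read of the
      // S3/DynamoDB response stream, turning the leftover flag into an error:
      if (Thread.currentThread().isInterrupted)
        throw new RuntimeException("request aborted") // stand-in for shadeaws.AbortedException
    }
  }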
Right now I'm using EMR AMI 5.5.0, Spark 2.1.0, and shaded AWS SDK 1.11.208, but we saw similar errors with AWS SDK 1.10.75.
I'm deploying the job on EMR via command-runner.jar with spark-submit --deploy-mode cluster --class ... .
Does anyone have an idea where this exception originates and how to fix it?

gmol1639 (answer 1)

foreach does not guarantee ordered computation; it applies the operation to every element of the RDD, which means it is instantiated for every element, and that in turn can overwhelm the executor.
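For context (my own illustration, not part of this answer): foreach on a plain Scala List, as in the pseudocode above, runs sequentially on the driver, while foreach on an RDD ships the function to executors with no ordering guarantee. A minimal sketch, assuming a local-mode SparkContext:

  import org.apache.spark.{SparkConf, SparkContext}

  object ForeachComparison {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("foreach-comparison").setMaster("local[2]"))

      // Driver-side foreach: sequential and ordered, runs in the driver JVM.
      List("a", "b", "c").foreach(folder => println(s"driver processes $folder"))

      // RDD foreach: the closure runs on executors, partitions in parallel,
      // with no guaranteed order across elements.
      sc.parallelize(Seq("a", "b", "c")).foreach(folder => println(s"executor processes $folder"))

      sc.stop()
    }
  }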

mmvthczy (answer 2)

The problem was that getFoldersToProcess is a blocking (and very long) operation that prevents the SparkContext from being instantiated. The SparkContext is supposed to signal its own instantiation to YARN; if that signal doesn't arrive within a certain amount of time, YARN assumes the driver node has fallen off and kills the whole cluster. A minimal sketch of the resulting restructuring follows.
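This sketch is an assumed restructuring of the pseudocode from the question, not the author's exact code (Transformer.run is assumed to accept the existing context instead of creating one): instantiate the SparkContext before any blocking S3/DynamoDB work, so the driver registers with YARN right away.

  import org.apache.spark.SparkContext

  object App {
    def main(args: Array[String]): Unit = {
      // Create the SparkContext first: YARN sees the driver register
      // before the long-running listing and state fetch begin.
      val sc = new SparkContext()
      val allFolders = S3Folders.list()
      val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)
      Transformer.run(sc, foldersToProcess)
    }
  }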
