Possible race condition in Spark driver

icnyk63a · posted 2021-07-09 in Java

I have a Spark job that processes several folders on S3 on each run and stores its state in DynamoDB. In other words, we run the job once a day; it looks for new folders added by another job, transforms them one by one, and writes the state to DynamoDB. Here is rough pseudocode:

import org.apache.spark.SparkContext

object App {
  val allFolders = S3Folders.list()                                    // list every folder on S3
  val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders) // filter out already-processed folders via DynamoDB
  Transformer.run(foldersToProcess)
}

object Transformer {
  def run(folders: List[String]): Unit = {
    val sc = new SparkContext()      // SparkContext is created only after both blocking calls above complete
    folders.foreach(process(sc, _))
  }

  def process(sc: SparkContext, folder: String): Unit = ???  // transform the folder and write results back to S3
}

If S3Folders.list() returns a relatively small number of folders (up to a few thousand), everything works; if it returns more (4-8K), we very often see the following error (which at first glance has nothing to do with Spark):

17/10/31 08:38:20 ERROR ApplicationMaster: User class threw exception: shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
shadeaws.SdkClientException: Failed to sanitize XML document destined for handler class shadeaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214)
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315)
        at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88)
        at shadeaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77)
        at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
        at shadeaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
        at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4247)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4194)
        at shadeaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4188)
        at shadeaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865)
        at me.chuwy.transform.S3Folders$.com$chuwy$transform$S3Folders$$isGlacierified(S3Folders.scala:136)
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
        at me.chuwy.transform.S3Folders$.list(S3Folders.scala:112)
        at me.chuwy.transform.Main$.main(Main.scala:22)
        at me.chuwy.transform.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
Caused by: shadeaws.AbortedException:
        at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:53)
        at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:81)
        at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.read1(BufferedReader.java:210)
        at java.io.BufferedReader.read(BufferedReader.java:286)
        at java.io.Reader.read(Reader.java:140)
        at shadeaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:186)
        ... 36 more

With a large number of folders (~20K) this happens so often that the job cannot even start.
Previously we had a very similar error, but it occurred much more often, back when getFoldersToProcess did a GetItem for each folder in allFolders and therefore took far longer:

17/09/30 14:46:07 ERROR ApplicationMaster: User class threw exception: shadeaws.AbortedException:
shadeaws.AbortedException:
        at shadeaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:51)
        at shadeaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)
        at shadeaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:489)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:126)
        at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:215)
        at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1240)
        at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:802)
        at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:109)
        at shadeaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
        at shadeaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1503)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1226)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at shadeaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at shadeaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at shadeaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2089)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2065)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.executeGetItem(AmazonDynamoDBClient.java:1173)
        at shadeaws.services.dynamodbv2.AmazonDynamoDBClient.getItem(AmazonDynamoDBClient.java:1149)
        at me.chuwy.tranform.sdk.Manifest$.contains(Manifest.scala:179)
        at me.chuwy.tranform.DynamoDBState$$anonfun$getUnprocessed$1.apply(ProcessManifest.scala:44)
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filterNot(TraversableLike.scala:267)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:104)
        at me.chuwy.transform.DynamoDBState$.getFoldersToProcess(DynamoDBState.scala:44)
        at me.chuwy.transform.Main$.main(Main.scala:19)
        at me.chuwy.transform.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)

I believe the current error has nothing to do with XML parsing or an invalid response, but comes from some race condition inside Spark, because:

- There is a clear correlation between how long the "state fetching" takes and the chance of failure.
- The traceback has an underlying AbortedException caused by a swallowed InterruptedException, which may mean that something inside the JVM (spark-submit or even YARN) calls Thread.sleep for the main thread (see the sketch below).
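
To illustrate the second point, here is a minimal sketch of that abort-on-interrupt pattern. It is an assumption about how shadeaws.internal.SdkFilterInputStream.abortIfNeeded behaves, not the SDK's actual code: the interrupt flag of the calling thread is polled on every read, so a pending interrupt surfaces as an unchecked AbortedException in the middle of response parsing.

import java.io.InputStream

// Stand-in for shadeaws.AbortedException (unchecked, like the real one).
class AbortedException(msg: String) extends RuntimeException(msg)

// Sketch of the pattern: every read first checks the thread's interrupt
// flag, so an interrupt delivered to the main thread shows up as an
// AbortedException inside XML/JSON parsing rather than as an
// InterruptedException.
class AbortingInputStream(underlying: InputStream) extends InputStream {
  private def abortIfNeeded(): Unit =
    if (Thread.currentThread().isInterrupted)
      throw new AbortedException("request aborted: calling thread interrupted")

  override def read(): Int = {
    abortIfNeeded()   // a pending interrupt aborts the read here
    underlying.read()
  }
}

Under this reading, whichever S3 or DynamoDB response happens to be streaming when the driver's main thread is interrupted is the one that aborts, which matches both tracebacks.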
Right now I'm using EMR AMI 5.5.0, Spark 2.1.0, and a shaded AWS SDK 1.11.208, but we saw similar errors with AWS SDK 1.10.75.
I'm deploying the job on EMR via command-runner.jar spark-submit --deploy-mode cluster --class ... .
Does anyone have an idea where this exception comes from and how to fix it?

gmol1639 (answer 1):

foreach does not guarantee ordered execution; it applies the operation to each element of an RDD, meaning it instantiates work for every element, which in turn can overwhelm the executor.
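
If the folder loop were moved onto the cluster along these lines, concurrency could be bounded explicitly. A hypothetical sketch (DistributedTransformer and the slice count of 32 are illustrative assumptions, not the question's code; parallelize and foreachPartition are standard Spark RDD APIs):

import org.apache.spark.SparkContext

object DistributedTransformer extends Serializable {
  // Distribute folder names as an RDD so Spark, not a driver-side loop,
  // decides how many folders are processed concurrently. The slice count
  // (32) is an arbitrary illustrative value.
  def run(sc: SparkContext, folders: List[String]): Unit =
    sc.parallelize(folders, numSlices = 32)
      .foreachPartition(_.foreach(process))

  // Counterpart of the question's Transformer.process; it now executes on
  // executors, so everything it references must be serializable.
  def process(folder: String): Unit = ???
}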

mmvthczy (answer 2):

The problem was that getFoldersToProcess is a blocking (and very long) operation that prevented the SparkContext from being instantiated. The SparkContext itself is supposed to signal to YARN that it has been instantiated; if that signal does not arrive within a certain amount of time, YARN assumes the driver node has fallen off and kills the whole cluster.
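
A minimal sketch of the fix this implies, assuming the same App/Transformer objects from the question (the two-argument Transformer.run is my variation): create the SparkContext before the slow state fetch, so the ApplicationMaster can register with YARN immediately.

import org.apache.spark.SparkContext

object App {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext first: the YARN ApplicationMaster can then
    // report a live driver before any long-running driver-side work starts.
    val sc = new SparkContext()

    // The blocking S3 listing and DynamoDB state fetch now run after the
    // context is up, so YARN no longer times out waiting for it.
    val allFolders = S3Folders.list()
    val foldersToProcess = DynamoDBState.getFoldersToProcess(allFolders)

    Transformer.run(sc, foldersToProcess)
  }
}

In cluster mode the relevant timeout is spark.yarn.am.waitTime (how long the YARN ApplicationMaster waits for the SparkContext to be initialized, 100s by default); raising it only papers over the problem, while creating the context first removes it.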
