scala未来-java.util.concurrent.rejectedexecutionexception

vsmadaxz  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(355)

我有一个spark应用程序,它使用scala futures同时处理许多spark作业,我以前有一个包含5个线程的池,但必须将该池增加到15个线程,然后我开始出现错误:java.util.concurrent.rejectedexecutionexception
我的代码:

object TableProcessorWrapper extends SparkSessionWrapper {

  def main(args: Array[String]): List[Unit] = {

    implicit val ec = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(15))

    val dynamodb = DynamodbOperations()

    val tables = dynamodb.getTablesToProcess(args(0), "table")

    val processors = for {
      table <- tables
    } yield Future {
      TableProcessor(table).start(spark)
    }

    for {
      processor <- processors
    } {
      processor.onComplete {
        case Success(value) => println("Finished Processing")
        case Failure(e) => {
          System.err.println(e.getMessage)
          e.printStackTrace()
        }
      }
    }

    Await.result(Future.sequence(processors), Duration.Inf)
  }
}

你能帮我解决这个问题吗?我不知道怎么开始。
太谢谢你了!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题