我有一个spark应用程序,它使用scala futures同时处理许多spark作业,我以前有一个包含5个线程的池,但必须将该池增加到15个线程,然后我开始出现错误:java.util.concurrent.rejectedexecutionexception
我的代码:
object TableProcessorWrapper extends SparkSessionWrapper {
def main(args: Array[String]): List[Unit] = {
implicit val ec = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(15))
val dynamodb = DynamodbOperations()
val tables = dynamodb.getTablesToProcess(args(0), "table")
val processors = for {
table <- tables
} yield Future {
TableProcessor(table).start(spark)
}
for {
processor <- processors
} {
processor.onComplete {
case Success(value) => println("Finished Processing")
case Failure(e) => {
System.err.println(e.getMessage)
e.printStackTrace()
}
}
}
Await.result(Future.sequence(processors), Duration.Inf)
}
}
你能帮我解决这个问题吗?我不知道怎么开始。
太谢谢你了!
暂无答案!
目前还没有任何答案,快来回答吧!