I have been running some tests on Spark 3, but I've hit a strange problem: a stage will randomly get stuck, after which the whole application stops launching new tasks and the driver logs an NPE.
Spark version: 3.0.1
The problematic code, for example:
val df = spark.sql("select a.*, b.name from a join (select * from b) b on a.id = b.id")
df.createOrReplaceTempView("c")
val df1 = spark.sql("select * from c union (select * from d) union (select * from e)")
df1.createOrReplaceTempView("f")
spark.sql("insert overwrite table test select * from f")
Driver log:
20/11/20 19:24:46 ERROR netty.Inbox: Ignoring error
java.lang.NullPointerException
at java.lang.String.length(String.java:623)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:420)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$5(TaskSetManager.scala:459)
at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
at org.apache.spark.scheduler.TaskSetManager.logInfo(TaskSetManager.scala:54)
at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:459)
at scala.Option.map(Option.scala:230)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:417)
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:351)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:345)
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:477)
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:472)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:472)
at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:448)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:448)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$5(CoarseGrainedSchedulerBackend.scala:328)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:817)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:320)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:161)
at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The problem shows up randomly, in roughly one out of every five runs. So I dug into the Spark core source and found the suspicious spot in org.apache.spark.scheduler.TaskSetManager: at line 459, the interpolated $host is the problem. I tried println(host) and got Some(null) in the log. The logging call at that line is:
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")