我正在开发一个共享的ApacheZeppelin服务器。几乎每天,我都会尝试运行一个命令,并出现以下错误: Job 65 cancelled because SparkContext was shut down
我很想了解更多导致sparkcontext关闭的原因。我的理解是齐柏林飞艇是一个kube应用程序,它向机器发送命令进行处理。
当sparkcontext关闭时,这是否意味着我与spark cluster之间的桥梁关闭了?如果是这样的话,我怎样才能让通往星火团的桥塌下来呢?
在这个例子中,它发生在我试图上传数据到s3的时候。
这是密码
val myfiles = readParquet(
startDate=ew LocalDate(2020, 4, 1),
endDate=ew LocalDate(2020, 4, 7)
)
log_events.createOrReplaceTempView("log_events")
val mySQLDF = spark.sql(s"""
select [6 columns]
from myfiles
join [other table]
on [join_condition]
"""
)
mySQLDF.write.option("maxRecordsPerFile", 1000000).parquet(path)
// mySQLDF has 3M rows and they're all strings or dates
这是stacktrace错误
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
... 48 elided
Caused by: org.apache.spark.SparkException: Job 44 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:972)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:970)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:970)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2286)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2193)
at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:121)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
... 70 more
1条答案
按热度按时间0vvn1miw1#
您的作业在写入步骤被中止。
Job aborted.
是导致spark上下文关闭的异常消息。研究优化写入步骤,
maxRecordsPerFile
可能是罪魁祸首;也许试一个低一点的号码。。您当前在一个文件中有1百万条记录!一般来说,
Job ${job.jobId} cancelled because SparkContext was shut down
只是意味着这是一个异常,因为dag无法继续,需要出错。它是spark调度器在遇到异常时抛出此错误的原因,它可能是代码中未处理的异常,也可能是由于任何其他原因导致的作业失败。当dag调度程序停止时,整个应用程序将停止(此消息是清理的一部分)。回答你的问题-
当sparkcontext关闭时,这是否意味着我与spark cluster之间的桥梁关闭了?
sparkcontext表示到spark集群的连接,因此,如果它死了,则意味着您无法对它运行run job,因为您丢失了链接!在zepplin上,您只需重新启动sparkcontext(菜单->解释器->spark解释器->重新启动)
如果是这样的话,我怎样才能让通往星火团的桥塌下来呢?
作业中出现sparkexception/error或手动使用
sc.stop()