如何从程序中获取spark作业状态?

wixjitnu  于 2021-05-30  发布在  Hadoop
关注(0)|答案(5)|浏览(597)

我知道hadooprestapi通过程序提供了对作业状态的访问。
类似地,有没有办法在程序中获得spark作业状态?

velaa5lx

velaa5lx1#

spark ui上有一个(n)(几乎)未记录的restapi特性,它提供了有关作业和性能的度量。
您可以通过以下方式访问:

http://<driverHost>:<uiPort>/metrics/json/

(uiport默认为4040)

gojuced7

gojuced72#

为java提供答案。在scala中,仅仅使用sparkcontext而不是javasparkcontext几乎是相似的。
假设您有一个javasparkcontext:

private final JavaSparkContext sc;

以下代码允许从“作业”和“阶段”选项卡获取所有可用信息:

JavaSparkStatusTracker statusTracker = sc.statusTracker();
for(int jobId: statusTracker.getActiveJobIds()) {
     SparkJobInfo jobInfo = statusTracker.getJobInfo(jobId);
     log.info("Job " + jobId + " status is " + jobInfo.status().name());
     log.info("Stages status:");

     for(int stageId: jobInfo.stageIds()) {
         SparkStageInfo stageInfo = statusTracker.getStageInfo(stageId);

         log.info("Stage id=" + stageId + "; name = " + stageInfo.name()
                    + "; completed tasks:" + stageInfo.numCompletedTasks()
                    + "; active tasks: " + stageInfo.numActiveTasks()
                    + "; all tasks: " + stageInfo.numTasks()
                    + "; submission time: " + stageInfo.submissionTime());
    }
}

不幸的是,其他所有内容都只能从scalaspark上下文访问,因此使用java提供的结构可能会有一些困难。
池列表:sc.sc().getallpools()
执行器内存状态:sc.sc().getexecutormemorystatus()
执行器ID:sc.sc().getexecutorids()
存储信息:sc.sc().getrddstorageinfo()
... 你可以试着找到更多有用的信息。

yc0p9oo0

yc0p9oo03#

您也可以不使用spark job history server获取spark job状态。您可以使用sparklauncher 2.0.1(甚至spark 1.6版本也可以)从java程序启动spark作业:

SparkAppHandle appHandle = sparkLauncher.startApplication();

也可以将侦听器添加到startapplication()方法:

SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);

其中listener有两个方法可以通知您作业状态的变化和信息的变化。
我使用countdownlatch实现了,它按预期工作。这适用于sparklauncher版本2.0.1,也适用于Yarn簇模式。

...
final CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
Thread sparkAppListenerThread = new Thread(sparkAppListener);
sparkAppListenerThread.start();
long timeout = 120;
countDownLatch.await(timeout, TimeUnit.SECONDS);    
    ...

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {
    private static final Log log = LogFactory.getLog(SparkAppListener.class);
    private final CountDownLatch countDownLatch;
    public SparkAppListener(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void stateChanged(SparkAppHandle handle) {
        String sparkAppId = handle.getAppId();
        State appState = handle.getState();
        if (sparkAppId != null) {
            log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - "
                    + SPARK_STATE_MSG.get(appState));
        } else {
            log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
        }
        if (appState != null && appState.isFinal()) {
            countDownLatch.countDown();
        }
    }
    @Override
    public void infoChanged(SparkAppHandle handle) {}
    @Override
    public void run() {}
}
hwamh0ep

hwamh0ep4#

它与restapi不同,但是您可以通过注册 SparkListenerSparkContext.addSparkListener . 它是这样的:

sc.addSparkListener(new SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted) = {
    if (event.stageInfo.stageId == myStage) {
      println(s"Stage $myStage is done.")
    }
  }
})
pb3skfrl

pb3skfrl5#

有一个(n)(几乎)未记录的RESTAPI特性,它提供了您在spark ui上看到的几乎所有内容:

http://<sparkMasterHost>:<uiPort>/api/v1/...

对于本地安装,您可以从这里开始:

http://localhost:8080/api/v1/applications

您可以在这里找到可能的终点:https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/apirootresource.scala

相关问题