apache flink joblistener无法正常工作

wswtfjt7  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(850)

我在flink1.11.1中编写了一个flink批处理作业。作业成功完成后,我想做一些事情,比如调用http服务。
我添加了一个简单的作业侦听器来钩住作业状态。问题是当kafka sink操作符抛出错误时,不会触发job listener。我希望当我的作业失败时,它会触发我的作业侦听器并打印失败日志。
我怎样才能确定这项工作成功与否?
任何帮助都将不胜感激。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
      override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }

      override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {

        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })

    env.createInput(input)
      .filter(r => Option(r.token).getOrElse("").nonEmpty)
      .addSink(kafkaProducer)
b1uwtaje

b1uwtaje1#

如果您尝试在群集上运行作业,可以在控制台中使用作业id查看日志记录消息和标准输出。请参阅所附的屏幕截图,
默认url可以是http://localhost:8081如果您在本地群集上运行。
同样,下面的方法是不正确的检查你的工作是否成功。

if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }

相关问题