Flink -公开新的自定义REST API

vc9ivgsu  于 2023-11-15  发布在  Apache
关注(0)|答案(1)|浏览(168)

在Apache Flink REST API文档中,在开发部分中写道:
要添加新请求,需要

  • 添加一个新的MessageHeaders类作为新请求的接口,
  • 添加新的AbstractRestHandler类,其根据所添加的MessageHeaders类来处理请求,
  • 将处理程序添加到org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers()

我能完成前两点,但我不知道如何处理后一点。
这就是我的主类的样子:

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.
        getExecutionEnvironment();
    Source<String> source = ...;
    Sink<String> sink = ...;

    MyJob job = new MyJob(env, source, sink);
    env.execute();
}

字符串
我看到WebMonitorEndpoint的构造函数有很多我不知道如何获取的参数。
无论如何,我不认为创建一个WebMonitorEndpoint的新示例(或它的子类)是正确的方法,因为当应用程序启动时,Flink会创建一个新示例。
我希望你能帮助我。

mzsu5hc0

mzsu5hc01#

从官方文件来看,
一个很好的例子是org.apache.flink.runtime.rest.handler.job. JobExceptionsHeaders,它使用org.apache.flink.runtime.rest.messages.JobExceptionsHeaders。
关于第3点,您应该将CustomHandler添加到this initializeError()函数中。

JobExceptionsHandler jobExceptionsHandler =
    new JobExceptionsHandler(
            leaderRetriever,
            timeout,
            responseHeaders,
            JobExceptionsHeaders.getInstance(),
            executionGraphCache,
            executor);

字符串
在类初始化之后,add()到处理程序,如代码中所述,

handlers.add(Tuple2.of(jobExceptionsHandler.getMessageHeaders(), jobExceptionsHandler));

相关问题