kafka流关闭钩子和意外异常处理在同一个流应用程序中

y3bcpkx1  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(376)

我的任务是拆除一个dev环境,并从废弃的环境中重新设置它,以验证我们的cicd进程;唯一的问题是,我创建一个主题时出错,因此kafka streams应用程序退出时出错。
我深究了一下,发现了问题并改正了,但当我深究的时候,我又遇到了一个奇怪的小问题。
我实现了一个意外的异常处理程序,如下所示:

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    streams.close();

    // Maybe do some more exception handling.
    // Open a lock that is waiting after streams.start() call 
    // to let application exit normally
    shutdownLatch.countDown();
});

问题是,如果应用程序在调用kafkastreams::close时由于主题错误而引发异常,则在尝试调用kafkastreams::waitonstate后,应用程序似乎在windowsselectorimpl::poll中死锁。
我认为在异常处理程序中调用kafkastreams::close可能有问题,但我发现是这样的,并且matthias j。sax说应该可以在异常处理程序中调用kafkastreams::close,但要注意不要从多个线程调用kafkastreams::close。
问题是,我想实现一个shutdown钩子,以便在请求时优雅地终止steams应用程序,并实现unexpectedexception处理程序,以便在发生异常时进行清理和优雅地终止。
我提出了下面的解决方案,在调用close之前检查kafkastreams的状态,它确实可以工作,但是它似乎有点不确定,因为除了运行(可能是挂起)之外,我还可以看到其他情况,我们希望确保kafkastreams::close it调用。

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.fatal("Caught Shutdown request");
    // Do some shutdown cleanup.
    if (streams.state().isRunning())
    {
        If this hook is called due to the Main exiting after handling 
        an exception we don't want to call close again. It doesn't 
        cause any errors but logs that the application was closed 
        a second time.
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do a little bit more clean up before system exits.
    System.exit(0);

}));

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    if (streams.state().isRunning())
    {
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do some more exception handling.

    // Open the Gate to let application exit normally
    shutdownLatch.countDown();
    // Or Optionally call halt to immediately terminate and prevent call to Shutdown hook.
    Runtime.getRuntime().halt(0);
});

有什么建议为什么打电话给ka吗fkasteams:close in 异常处理程序会引起这样的麻烦,或者如果有更好的方法同时实现shutdownhook和异常处理程序,我们会非常感激吗?

ev7lccsx

ev7lccsx1#

打电话 close() 从一个异常处理程序和从关闭钩子是略有不同。 close() 如果从shutdown hook调用,可能会死锁(cf。https://issues.apache.org/jira/browse/kafka-4366)因此,您应该使用超时来调用它。
另外,这个问题与打电话有关 System.exit() 在jira中描述的未捕获异常处理程序中。一般来说,打电话 System.exit() 是相当严厉的,应该避免。
您的解决方案似乎也不是100%健壮的,因为 streams.state().isRunning() 可能导致比赛状态。
使用超时的另一种方法可能是只设置 AtomicBoolean 在shutdown钩子和异常处理程序中,如果布尔标志设置为true,则使用“main()”线程调用close:

private final static AtomicBoolean stopStreams = new AtomicBoolean(false);

public static void main(String[] args) {
  // do stuff

  KafkaStreams streams = ...
  stream.setUncaughtExceptionHandler((t, e) -> {
    stopStreams.set(true);
  });

  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    stopStreams.set(true);
  });

  while (!stopStreams.get()) {
    Thread.sleep(1000);
  }
  streams.close();
}

相关问题