java 处理Kafka流中的异常

xggvc2p6  于 2023-01-24  发布在  Java
关注(0)|答案(4)|浏览(281)

已经经历了多个职位,但他们中的大多数是相关的处理坏消息,而不是关于异常处理,而处理他们。
我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因,如网络故障、运行时异常等,

  • 有人能建议什么是正确的方法吗?我应该使用setUncaughtExceptionHandler吗?或者有更好的方法吗?
  • 如何处理重试?
x7yiwoj4

x7yiwoj41#

这取决于你想对生产者端的异常做什么。如果生产者抛出异常(例如,由于网络故障或Kafka broker已死亡),默认情况下流将死亡。并且使用kafka-streams版本1.1.0,你可以通过实现ProductionExceptionHandler覆盖默认行为,如下所示:

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]",  new String(record.value()), record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }

}

从handle方法中,如果不希望流在异常时死亡,则可以返回CONTINUE,如果希望流停止,则可以返回FAIL(默认为FAIL)。并且您需要在流配置中指定此类:

default.production.exception.handler=com.example.CustomProductionExceptionHandler

还要注意的是ProductionExceptionHandler只处理生产者的异常,并且它不会处理使用流方法mapValues(..)filter(..)branch(..)等处理消息期间的异常,您需要使用try / catch块 Package 这些方法逻辑(将所有方法逻辑放入try块中,以确保您将处理所有异常情况):

.filter((key, value) -> { try {..} catch (Exception e) {..} })

据我所知,我们不需要显式地处理消费者端的异常,因为Kafka流将在稍后重试自动消费(因为偏移量将不会改变,直到消息被消费和处理);例如,如果Kafka broker在一段时间内无法到达,您将从kafka流中获得异常,并且当breaked将被打开时,kafka流将消耗所有消息。因此,在这种情况下,我们将仅具有延迟,并且没有损坏/丢失。
使用setUncaughtExceptionHandler,您将无法像使用ProductionExceptionHandler那样更改默认行为,使用它,您只能记录错误或将消息发送到失败主题。

kafka-streams2.8.0以来的更新

由于kafka-streams2.8.0,您可以使用KafkaStreams方法void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);将失败的流线程(由未捕获的异常引起)自动替换为StreamThreadExceptionResponse.REPLACE_THREAD。有关详细信息,请查看Kafka Streams Specific Uncaught Exception Handler

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
6jygbczu

6jygbczu2#

对于处理消费者端的异常,
1)可以使用以下属性在producer中添加默认异常处理程序。

"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

apache基本上提供了三个异常处理程序类
1)LogAndContiuneExceptionHandler,您可以将其作为

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndContinueExceptionHandler.class);

2)日志和失败异常处理程序

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndFailExceptionHandler.class);

3)无效时间戳时的日志和跳过

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndSkipOnInvalidTimestamp.class);

对于自定义异常处理,
1)您可以实现DeserializationExceptionHandler接口并重写handle()方法。
2)或者您可以扩展上述类。

xzlaal3s

xzlaal3s3#

setUncaughtExceptionHandler无助于处理异常,它在流由于未捕获的某些异常而终止后工作。

Kafka提供了几种处理异常的方法。简单的 try-catch{} 将有助于捕获处理器代码中的异常,但Kafka反序列化异常(可能由于数据问题)和生产异常(在与代理通信期间发生)分别需要DeserializationExceptionHandlerProductionExceptionHandler。默认情况下,如果遇到其中任何一种异常,Kafka应用程序将失败。
您可以在此post上找到

rpppsulh

rpppsulh4#

在Spring Cloud Stream中,您可以使用以下命令配置自定义反序列化处理程序:

*spring.cloud.stream.Kafka.streams.binder.配置.默认.反序列化.异常.处理程序=您的包名称.CustomLogAndContinueExceptionHandler

  • 自定义日志和连续异常处理程序扩展日志和连续异常处理程序或实现反序列化异常处理程序
  • 自定义日志和继续异常处理程序反序列化处理程序响应。继续或失败取决于您的使用情况
@Slf4j
public class CustomLogAndContinueExceptionHandler extends LogAndContinueExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record,
            Exception exception) {
.... some business logic here ....
        log.error("Message failed: taskId: {}, topic: {}, partition: {}, offset: {}, , detailerror : {}",
                context.taskId(), record.topic(), record.partition(), record.offset(), exception.getMessage());
        return DeserializationHandlerResponse.CONTINUE;
    }
}

相关问题