由于kafka-streams2.8.0,您可以使用KafkaStreams方法void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);将失败的流线程(由未捕获的异常引起)自动替换为StreamThreadExceptionResponse.REPLACE_THREAD。有关详细信息,请查看Kafka Streams Specific Uncaught Exception Handler
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
4条答案
按热度按时间x7yiwoj41#
这取决于你想对生产者端的异常做什么。如果生产者抛出异常(例如,由于网络故障或Kafka broker已死亡),默认情况下流将死亡。并且使用kafka-streams版本1.1.0,你可以通过实现
ProductionExceptionHandler
覆盖默认行为,如下所示:从handle方法中,如果不希望流在异常时死亡,则可以返回
CONTINUE
,如果希望流停止,则可以返回FAIL
(默认为FAIL)。并且您需要在流配置中指定此类:还要注意的是
ProductionExceptionHandler
只处理生产者的异常,并且它不会处理使用流方法mapValues(..)
,filter(..)
,branch(..)
等处理消息期间的异常,您需要使用try / catch块 Package 这些方法逻辑(将所有方法逻辑放入try块中,以确保您将处理所有异常情况):据我所知,我们不需要显式地处理消费者端的异常,因为Kafka流将在稍后重试自动消费(因为偏移量将不会改变,直到消息被消费和处理);例如,如果Kafka broker在一段时间内无法到达,您将从kafka流中获得异常,并且当breaked将被打开时,kafka流将消耗所有消息。因此,在这种情况下,我们将仅具有延迟,并且没有损坏/丢失。
使用
setUncaughtExceptionHandler
,您将无法像使用ProductionExceptionHandler
那样更改默认行为,使用它,您只能记录错误或将消息发送到失败主题。自
kafka-streams
2.8.0
以来的更新由于
kafka-streams
2.8.0
,您可以使用KafkaStreams
方法void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);
将失败的流线程(由未捕获的异常引起)自动替换为StreamThreadExceptionResponse.REPLACE_THREAD
。有关详细信息,请查看Kafka Streams Specific Uncaught Exception Handler6jygbczu2#
对于处理消费者端的异常,
1)可以使用以下属性在producer中添加默认异常处理程序。
apache基本上提供了三个异常处理程序类
1)LogAndContiuneExceptionHandler,您可以将其作为
2)日志和失败异常处理程序
3)无效时间戳时的日志和跳过
对于自定义异常处理,
1)您可以实现DeserializationExceptionHandler接口并重写handle()方法。
2)或者您可以扩展上述类。
xzlaal3s3#
setUncaughtExceptionHandler无助于处理异常,它在流由于未捕获的某些异常而终止后工作。
Kafka提供了几种处理异常的方法。简单的 try-catch{} 将有助于捕获处理器代码中的异常,但Kafka反序列化异常(可能由于数据问题)和生产异常(在与代理通信期间发生)分别需要DeserializationExceptionHandler和ProductionExceptionHandler。默认情况下,如果遇到其中任何一种异常,Kafka应用程序将失败。
您可以在此post上找到
rpppsulh4#
在Spring Cloud Stream中,您可以使用以下命令配置自定义反序列化处理程序:
*spring.cloud.stream.Kafka.streams.binder.配置.默认.反序列化.异常.处理程序=您的包名称.CustomLogAndContinueExceptionHandler