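I am using several Kafka handlers in one class: the class is annotated with @KafkaListener, and the methods are annotated with @KafkaHandler and @RetryableTopic. However, the @DltHandler method does not work in this setup. How can I fix this? If I understand correctly, the application falls back to the default DefaultErrorHandler. How can I switch to the @RetryableTopic approach?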
The RetryTopicConfiguration bean is below:
@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, byte[]> template) {
    List<String> topics = new ArrayList<>();
    topics.add("UserMicroServiceEvents");
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(2)
            .includeTopics(topics)
            .concurrency(1)
            .autoCreateTopics(true, 1, (short) 1)
            .notRetryOn(NullPointerException.class)
            .create(template);
}
The Kafka listener is:
@Service
@Log4j2
@KafkaListener(groupId = "multiple", topics = "UserMicroServiceEvents")
public class KafkaMainListenerService {

    private final IKafkaProcessorEvent kafkaProcessorEvent;
    private final MeterRegistry meterRegistry;
    private final FailedMessageRepository repository;
    private final ObjectMapper mapper;
    private final Counter successfullyProcessedEventCounter;

    public KafkaMainListenerService(IKafkaProcessorEvent kafkaProcessorEvent,
                                    MeterRegistry meterRegistry,
                                    FailedMessageRepository repository,
                                    ObjectMapper mapper) {
        this.kafkaProcessorEvent = kafkaProcessorEvent;
        this.meterRegistry = meterRegistry;
        this.repository = repository;
        this.mapper = mapper;
        successfullyProcessedEventCounter = Counter.builder("SPUMSE")
                .description("amount successfully processed events from microservice")
                .register(meterRegistry);
    }

    @RetryableTopic
    @KafkaHandler(isDefault = false)
    public void listenForEvent(Cat record) {
        //kafkaProcessorEvent.process(record);
        log.info("event was received");
        log.info("Event type:" + record.getClass());
        log.info(record);
        log.info(record.getName());
        log.info(record.getAge());
        successfullyProcessedEventCounter.increment();
    }

    @RetryableTopic
    @KafkaHandler(isDefault = false)
    public void listenForEvent2(UserServiceEventDto record) {
        kafkaProcessorEvent.process(record);
        successfullyProcessedEventCounter.increment();
    }

    // Persists the failed record together with the exception message from the headers.
    @DltHandler
    public void dltProcessor(ConsumerRecord<String, ?> record,
                             @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {
        try {
            FailedMessageEntity failedMessage = new FailedMessageEntity();
            failedMessage.setUuid(UUID.randomUUID());
            failedMessage.setMessage(mapper.writeValueAsString(record.value()));
            failedMessage.setOffset(record.offset());
            failedMessage.setTopic(record.topic());
            failedMessage.setException(errorMessage);
            repository.save(failedMessage);
            log.warn("Information about fail was added into database!");
        } catch (Exception e) {
            // Pass the exception itself so the cause is logged without calling
            // e.getCause().getMessage(), which fails when there is no cause.
            log.error("Failed to process DLT message. Error message is " + e.getMessage(), e);
        }
    }
}
The exception is:
2023-09-03T01:18:50.197+03:00 ERROR 15392 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:174) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2959) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$58(KafkaMessageListenerContainer.java:2833) ~[spring-kafka-3.0.9.jar:3.0.9]
at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544) ~[micrometer-observation-1.11.2.jar:1.11.2]
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.2.jar:1.11.2]
at io.micrometer.observation.Observation.observe(Observation.java:544) ~[micrometer-observation-1.11.2.jar:1.11.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2820) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2672) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2558) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2200) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1555) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1519) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1394) ~[spring-kafka-3.0.9.jar:3.0.9]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void by.ITAcademy.AuditMicroService.service.kafkaListeners.KafkaMainListenerService.listenForEvent2(by.ITAcademy.AuditMicroService.models.dto.UserServiceEventDto)' threw exception
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2992) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2937) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2904) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$58(KafkaMessageListenerContainer.java:2822) ~[spring-kafka-3.0.9.jar:3.0.9]
... 12 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:391) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2923) ~[spring-kafka-3.0.9.jar:3.0.9]
Caused by: java.lang.NullPointerException: Cannot invoke "by.ITAcademy.AuditMicroService.models.dto.UserAudit.getUuid()" because the return value of "by.ITAcademy.AuditMicroService.models.dto.UserServiceEventDto.getUserAudit()" is null
at by.ITAcademy.AuditMicroService.service.impl.KafkaUserMicroServiceEventProcessor.process(KafkaUserMicroServiceEventProcessor.java:30) ~[classes/:na]
at by.ITAcademy.AuditMicroService.service.impl.KafkaUserMicroServiceEventProcessor.process(KafkaUserMicroServiceEventProcessor.java:14) ~[classes/:na]
at by.ITAcademy.AuditMicroService.service.kafkaListeners.KafkaMainListenerService.listenForEvent2(KafkaMainListenerService.java:65) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.0.11.jar:6.0.11]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.0.11.jar:6.0.11]
at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:167) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:66) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2923) ~[spring-kafka-3.0.9.jar:3.0.9]
... 14 common frames omitted
1 answer
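That logic is wrong. @KafkaHandler has nothing to do with topics. It is really just a simple routing mechanism that applies once the data has already been consumed from the topic via @KafkaListener. @RetryableTopic is therefore tied entirely to the @KafkaListener, not to its internal logic. So you need to revise your configuration and move @RetryableTopic away from the @KafkaHandler methods.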
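
To make the distinction concrete, here is a minimal plain-Java model of the idea. It is a conceptual sketch only, not Spring Kafka's implementation: ListenerRetryModel, addHandler, onMessage and the two simplified payload records are hypothetical names introduced for illustration. The dispatch step plays the role of @KafkaHandler (pure routing by payload type), while the retry loop and the dead-letter step wrap the listener as a whole, which is the level at which @RetryableTopic and @DltHandler operate.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Conceptual model only; all names here are hypothetical, not Spring Kafka API. */
public class ListenerRetryModel {

    // Simplified stand-ins for the payload types in the question.
    record Cat(String name) {}
    record UserEvent(String userUuid) {}

    private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();
    private final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;
    private int invocations = 0;

    ListenerRetryModel(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Registers a handler for one payload type (the @KafkaHandler role). */
    <T> void addHandler(Class<T> type, Consumer<T> handler) {
        handlers.put(type, payload -> handler.accept(type.cast(payload)));
    }

    /** Routing step: knows nothing about topics, retries, or dead letters. */
    private void dispatch(Object payload) {
        Consumer<Object> handler = handlers.get(payload.getClass());
        if (handler == null) {
            throw new IllegalStateException("No handler for " + payload.getClass().getName());
        }
        handler.accept(payload);
    }

    /** Listener step: retry and dead-lettering wrap the whole dispatch. */
    void onMessage(Object payload) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            invocations++;
            try {
                dispatch(payload);
                return; // processed successfully, nothing more to do
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // All attempts failed: hand the record to the dead-letter step.
        deadLetters.add(payload + " failed: " + last.getMessage());
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    int invocations() {
        return invocations;
    }

    public static void main(String[] args) {
        ListenerRetryModel listener = new ListenerRetryModel(2);
        listener.addHandler(Cat.class, cat -> System.out.println("cat: " + cat.name()));
        listener.addHandler(UserEvent.class, event -> {
            if (event.userUuid() == null) {
                throw new IllegalArgumentException("userUuid is null");
            }
        });

        listener.onMessage(new Cat("Tom"));
        listener.onMessage(new UserEvent(null));
        System.out.println("dead letters: " + listener.deadLetters());
    }
}
```

In this model a handler cannot opt in or out of retries on its own, because it only sees a payload that the listener has already received. Applied to the question, that suggests declaring @RetryableTopic where the @KafkaListener subscription is declared. Whether the annotation may be placed on a class, rather than on a listener method, depends on the spring-kafka version, so check the documentation for the version in use.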