@RetryableTopic 与 @KafkaHandler 一起使用

huwehgph  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(125)

我使用了多个 Kafka 处理方法(类上标注了 @KafkaListener),方法上同时标注了 @KafkaHandler 和 @RetryableTopic。但是在这种情况下,@DltHandler 方法不起作用。我该如何解决?如果我没有理解错,应用程序使用的是默认的 DefaultErrorHandler。我怎样才能切换到 @RetryableTopic 的处理方式?
下面是 RetryTopicConfiguration bean:

/**
 * Declares the retry-topic configuration applied to the
 * {@code UserMicroServiceEvents} topic.
 *
 * <p>Settings passed to the builder: a fixed back-off of 3000, a maximum of
 * 2 attempts, a concurrency of 1, automatic topic creation with 1 partition
 * and a replication factor of 1, and no retry for
 * {@link NullPointerException}.
 *
 * @param template the Kafka template handed to the builder's {@code create} call
 * @return the retry-topic configuration built from the settings above
 */
@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, byte[]> template) {
    RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance();

    // Back-off and attempt limits.
    builder.fixedBackOff(3000).maxAttempts(2);

    // Restrict the configuration to the single topic this service consumes.
    builder.includeTopics(List.of("UserMicroServiceEvents"));

    builder.concurrency(1)
            .autoCreateTopics(true, 1, (short) 1)
            .notRetryOn(NullPointerException.class);

    return builder.create(template);
}

Kafka 监听器如下:

@Service
@Log4j2
@KafkaListener(groupId = "multiple", topics = "UserMicroServiceEvents")
public class KafkaMainListenerService {
    private final IKafkaProcessorEvent kafkaProcessorEvent;
    private final MeterRegistry meterRegistry;
    private final FailedMessageRepository repository;
    private final ObjectMapper mapper;
    private final Counter successfullyProcessedEventCounter;

    public KafkaMainListenerService(IKafkaProcessorEvent kafkaProcessorEvent,
                                    MeterRegistry meterRegistry,
                                    FailedMessageRepository repository,
                                    ObjectMapper mapper) {
        this.kafkaProcessorEvent = kafkaProcessorEvent;
        this.meterRegistry = meterRegistry;
        this.repository = repository;
        this.mapper = mapper;

        successfullyProcessedEventCounter = Counter.builder("SPUMSE")
                .description("amount successfully processed events from microservice")
                .register(meterRegistry);
    }

    @RetryableTopic
    @KafkaHandler(isDefault = false)
    public void listenForEvent(Cat record) {
        //kafkaProcessorEvent.process(record);
        log.info("event was received");
        log.info("Event type:" + record.getClass());
        log.info(record);
        log.info(record.getName());
        log.info(record.getAge());
        successfullyProcessedEventCounter.increment();

    }

    @RetryableTopic
    @KafkaHandler(isDefault = false)
    public void listenForEvent2(UserServiceEventDto record) {
        kafkaProcessorEvent.process(record);
        successfullyProcessedEventCounter.increment();
    }

    @DltHandler
    public void dltProcessor(ConsumerRecord<String, ?> record,
                             @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {
        {
            try {
                FailedMessageEntity failedMessage = new FailedMessageEntity();
                failedMessage.setUuid(UUID.randomUUID());
                failedMessage.setMessage(mapper.writeValueAsString(record.value()));
                failedMessage.setOffset(record.offset());
                failedMessage.setTopic(record.topic());
                failedMessage.setException(errorMessage);
                repository.save(failedMessage);
                log.warn("Information about fail was added into database!");

            } catch (Exception e) {
                log.error(
                        "Failed to process DLT message. Error message is " + e.getMessage()
                                + "; Error is " + e.getMessage() + ";\n "
                                + " Cause is: " + e.getCause().getMessage() + "\n"
                );
            }
        }
    }
}

异常信息如下:

2023-09-03T01:18:50.197+03:00 ERROR 15392 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:174) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2959) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$58(KafkaMessageListenerContainer.java:2833) ~[spring-kafka-3.0.9.jar:3.0.9]
    at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544) ~[micrometer-observation-1.11.2.jar:1.11.2]
    at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.2.jar:1.11.2]
    at io.micrometer.observation.Observation.observe(Observation.java:544) ~[micrometer-observation-1.11.2.jar:1.11.2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2820) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2672) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2558) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2200) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1555) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1519) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1394) ~[spring-kafka-3.0.9.jar:3.0.9]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void by.ITAcademy.AuditMicroService.service.kafkaListeners.KafkaMainListenerService.listenForEvent2(by.ITAcademy.AuditMicroService.models.dto.UserServiceEventDto)' threw exception
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2992) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2937) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2904) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$58(KafkaMessageListenerContainer.java:2822) ~[spring-kafka-3.0.9.jar:3.0.9]
    ... 12 common frames omitted
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:391) ~[spring-kafka-3.0.9.jar:3.0.9]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.0.9.jar:3.0.9]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.0.9.jar:3.0.9]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2923) ~[spring-kafka-3.0.9.jar:3.0.9]
Caused by: java.lang.NullPointerException: Cannot invoke "by.ITAcademy.AuditMicroService.models.dto.UserAudit.getUuid()" because the return value of "by.ITAcademy.AuditMicroService.models.dto.UserServiceEventDto.getUserAudit()" is null
    at by.ITAcademy.AuditMicroService.service.impl.KafkaUserMicroServiceEventProcessor.process(KafkaUserMicroServiceEventProcessor.java:30) ~[classes/:na]
    at by.ITAcademy.AuditMicroService.service.impl.KafkaUserMicroServiceEventProcessor.process(KafkaUserMicroServiceEventProcessor.java:14) ~[classes/:na]
    at by.ITAcademy.AuditMicroService.service.kafkaListeners.KafkaMainListenerService.listenForEvent2(KafkaMainListenerService.java:65) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.0.11.jar:6.0.11]
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:167) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:66) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.0.9.jar:3.0.9]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2923) ~[spring-kafka-3.0.9.jar:3.0.9]
    ... 14 common frames omitted
ldioqlga

ldioqlga1#

这种逻辑是错误的。@KafkaHandler 与主题无关:它只是在 @KafkaListener 已经从主题中消费到数据之后的一种简单路由机制。因此,@RetryableTopic 是与 @KafkaListener 绑定的,而不是与其内部的处理逻辑绑定。所以,您需要修改配置,将 @RetryableTopic 从 @KafkaHandler 方法上移除。

相关问题