Consuming the same record again after a manual commit fails

Posted by eh57zj3b on 2021-06-07 in Kafka

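I have a KafkaConsumer that consumes messages and does some processing, after which a KafkaProducer sends the message to another topic. I currently commit offsets manually by calling Acknowledgment.acknowledge() when the KafkaProducer successfully sends the message to the other topic, and I do not call Acknowledgment.acknowledge() when the send fails. I have set the ackMode to AckMode.MANUAL_IMMEDIATE. When I do not commit the offset, I expect the KafkaConsumer to pick up the same record that failed to process (i.e. failed to be sent to the other topic), but the offset advances even on failure and the next record is processed. Can someone tell me why this happens, and how I can get the behavior I want?
KafkaConsumer.java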

@Autowired
private KafkaProducer kafkaProducer;    

@KafkaListener(id = "workerListener", topics = "${kafka.topic.name}",
        containerFactory = "workerKafkaListenerContainerFactory")
public void workerListener(ConsumerRecord<?,?> consumerRecord, Acknowledgment ack) {

    // Do something! Process consumer record

    // Now producer will send to another topic
    kafkaProducer.sendNotification(notification, ack);
}

KafkaProducer.java

...
...

public void sendNotification(String notification, Acknowledgment ack) {

    ListenableFuture<SendResult<String, String>> future = 
            notificationKafkaTemplate.send(topicName, notification);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            handleNotificationSuccess();
            ack.acknowledge();
        }

        @Override
        public void onFailure(Throwable ex) {
            handleNotificationFailure(ex);
        }

    });
}

public void handleNotificationSuccess() {
    // handle notification success
}

public void handleNotificationFailure(Throwable ex) {
    // handle notification failure
}

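Let me know if more information is needed. Thanks.
Edit 1:
I started implementing a seek to a specific offset, but ran into a problem. The code is below: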

@Component
public class KafkaConsumer implements ConsumerSeekAware {

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @KafkaListener(id = "workerListener", topics = "${kafka.topic.name}",
        containerFactory = "workerKafkaListenerContainerFactory")
    public void workerListener(ConsumerRecord<?,?> consumerRecord, Acknowledgment ack) {

        this.seekCallBack.get().seek(consumerRecord.topic(), consumerRecord.partition(), 0);

        // Now producer will send to another topic
        kafkaProducer.sendNotification(notification, ack);
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallBack.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback csc) {
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback csc) {
    }

}

I can't figure out where the problem is. Here is the stack trace:

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2017-12-04 23:15:18,670 ERROR o.s.b.SpringApplication - Application startup failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaConsumer' defined in class path resource [com/tgss/mdm/worker/consumer/kafkaods/config/AppConfig.class]: Initialization of bean failed; nested exception is java.lang.IllegalStateException: @KafkaListener method 'workerListener' found on bean target class 'KafkaConsumer', but not found in any interface(s) for bean JDK proxy. Either pull the method up to an interface or switch to subclass (CGLIB) proxies by setting proxy-target-class/proxyTargetClass attribute to 'true'
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:564)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:866)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151)
    at com.tgss.mdm.worker.consumer.kafkaods.KafkaApplication.main(KafkaApplication.java:10)
Caused by: java.lang.IllegalStateException: @KafkaListener method 'workerListener' found on bean target class 'KafkaConsumer', but not found in any interface(s) for bean JDK proxy. Either pull the method up to an interface or switch to subclass (CGLIB) proxies by setting proxy-target-class/proxyTargetClass attribute to 'true'
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.checkProxy(KafkaListenerAnnotationBeanPostProcessor.java:373)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:341)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:279)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:423)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1633)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
    ... 15 common frames omitted
Caused by: java.lang.NoSuchMethodException: com.sun.proxy.$Proxy79.workerListener(org.apache.kafka.clients.consumer.ConsumerRecord, org.springframework.kafka.support.Acknowledgment)
    at java.lang.Class.getMethod(Class.java:1786)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.checkProxy(KafkaListenerAnnotationBeanPostProcessor.java:358)
    ... 20 common frames omitted

Answer #1, by mkh04yzy

Caused by: java.lang.IllegalStateException: @KafkaListener method 'workerListener' found on bean target class 'KafkaConsumer', but not found in any interface(s) for bean JDK proxy. Either pull the method up to an interface or switch to subclass (CGLIB) proxies by setting proxy-target-class/proxyTargetClass attribute to 'true'
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.checkProxy(KafkaListenerAnnotationBeanPostProcessor.java:373)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:341)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:279)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:423)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1633)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
    ... 15 common frames omitted
Caused by: java.lang.NoSuchMethodException: com.sun.proxy.$Proxy79.workerListener(org.apache.kafka.clients.consumer.ConsumerRecord, org.springframework.kafka.support.Acknowledgment)

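I suggest following the advice in the message: either use CGLIB proxies with proxyTargetClass = true, or extract an interface that declares the listener method so that the JDK proxy works.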
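
To see why the JDK proxy cannot find workerListener, here is a minimal sketch that uses only java.lang.reflect.Proxy and no Spring at all. It assumes the bean ended up behind an interface-based proxy once it implemented ConsumerSeekAware. SeekAware is a simplified stand-in for that interface, and WorkerListener, PlainConsumer and InterfaceConsumer are hypothetical names that do not appear in the question. The lookup in proxyExposesListener is the same kind of Class.getMethod call that throws NoSuchMethodException in the stack trace.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class JdkProxyDemo {

    // Stand-in for ConsumerSeekAware: it does NOT declare the listener method.
    interface SeekAware {
        void registerSeekCallback(Runnable callback);
    }

    // Hypothetical interface that "pulls the listener method up".
    interface WorkerListener {
        void workerListener(String record);
    }

    // Listener method exists only on the class, as in the question.
    static class PlainConsumer implements SeekAware {
        public void registerSeekCallback(Runnable callback) { }
        public void workerListener(String record) { }
    }

    // Listener method is also declared on an interface.
    static class InterfaceConsumer implements SeekAware, WorkerListener {
        public void registerSeekCallback(Runnable callback) { }
        public void workerListener(String record) { }
    }

    // Builds a JDK dynamic proxy over the target's interfaces only.
    static Object jdkProxy(Object target) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return Proxy.newProxyInstance(
                target.getClass().getClassLoader(),
                target.getClass().getInterfaces(),
                handler);
    }

    // Looks the listener method up on the proxy class via Class.getMethod.
    static boolean proxyExposesListener(Object proxy) {
        try {
            proxy.getClass().getMethod("workerListener", String.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // prints "false": the proxy only implements SeekAware
        System.out.println(proxyExposesListener(jdkProxy(new PlainConsumer())));
        // prints "true": the method is reachable through WorkerListener
        System.out.println(proxyExposesListener(jdkProxy(new InterfaceConsumer())));
    }
}
```

A JDK proxy implements the target's interfaces rather than extending its class, so a method declared only on the class is invisible through the proxy. A CGLIB proxy subclasses the target instead, which is why setting proxyTargetClass to true also resolves the error.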
