spring kafka:如何在执行seek()之后丢弃poll()已经检索到的消息?

u3r8eeie  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(431)

这是一个后续问题-阅读Kafka的同一信息多次。如果有更好的方式问这个问题而不张贴新的问题,让我知道。在这篇文章中,加里提到
“但如果以后的邮件已经被检索到,您仍将首先看到这些邮件,因此您也必须丢弃这些邮件。”
调用seek()后,有没有一种干净的方法来丢弃poll()已经读取的消息?我通过保存初始偏移量(在recordoffset中)并在成功时增加它来实现这个逻辑。失败时,我调用seek()并将recordoffset的值设置为record.offset()。然后,对于每个新消息,我检查record.offset()是否大于recordoffset。如果是,我只需调用acknowledge(),从而“丢弃”所有先前读取的消息。这是密码-

// in onMessage()...
    if (record.offset() > recordOffset){
        acknowledgment.acknowledge();
        return;
    }

    try {
        processRecord(record);
        recordOffset = record.offset()+1;
        acknowledgment.acknowledge();
    } catch (Exception e) {
        recordOffset = record.offset();
        consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());
    }

这种方法的问题是,多个分区会使它变得复杂。有更简单/更干净的方法吗?
编辑1基于gary的建议,我尝试添加一个类似这样的错误处理程序-

@KafkaListener(topicPartitions =
        {@org.springframework.kafka.annotation.TopicPartition(topic = "${kafka.consumer.topic}", partitions = { "1" })},
        errorHandler = "SeekToCurrentErrorHandler")

当我得到“cannot resolve method'errorhandler'”时,这个语法有什么问题吗?
Edit2在gary解释了这两个错误处理程序之后,我删除了上面的错误处理程序,并将下面的内容添加到配置文件中-

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProps()));
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}

当我启动应用程序,我得到这个错误现在。。。
java.lang.nosuchmethoderror:org.springframework.util.assert.state(zljava/util/function/supplier;)v位于org.springframework.kafka.listener.adapter.messagingmessagelisteneradapter.determineinferredtype(messagingmessagelisteneradapter。java:396)
这是396号线-

Assert.state(!this.isConsumerRecordList || validParametersForBatch,
            () -> String.format(stateMessage, "ConsumerRecord"));
Assert.state(!this.isMessageList || validParametersForBatch,
            () -> String.format(stateMessage, "Message<?>"));
x9ybnkn6

x9ybnkn61#

你走的是正确的道路,是的,你必须处理不同的分区以及。有一个 FilteringMessageListenerAdapter ,但你还是要写逻辑。

xpcnnkqh

xpcnnkqh2#

从版本2.0.1开始,如果容器 ErrorHandler 是一个 RemainingRecordsErrorHandler ,例如 SeekToCurrentErrorHandler ,其余的记录(包括失败的记录)将被发送到错误处理程序而不是侦听器。
这允许 SeekToCurrentErrorHandler 重新定位每个分区,以便下次轮询将返回未处理的记录。

/**
 * An error handler that seeks to the current offset for each topic in the remaining
 * records. Used to rewind partitions after a message failure so that it can be
 * replayed.
 *
 * @author Gary Russell
 * @since 2.0.1
 *
 */
public class SeekToCurrentErrorHandler implements RemainingRecordsErrorHandler

编辑
有两种类型的错误处理程序。这个 KafkaListenerErrorHandler (在注解中指定)在侦听器级别工作;它连接到调用 @KafkaListener 注解,因此只能访问当前记录。
第二个错误处理程序(在侦听器容器上配置)在容器级别工作,因此可以访问其余的记录。这个 SeekToCurrentErrorHandler 是容器级错误处理程序。
它是在容器工厂的容器属性上配置的。。。

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(this.consumerFactory);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    return factory;
}

相关问题