ApacheKafka-出错时是否可能丢失消息?

mrwjdhj3  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(567)

我正在用spring cloud stream挖掘apache kafka,观察到一些行为,这些行为让我怀疑我是否做错了什么,或者它是否按预期工作——我几乎不怀疑:
出错时可能会丢失消息!?
我的设置尽可能简单。一个kafka代理和一个只有一个分区的主题。具有默认设置的代理、主题、生产者和使用者(auto ack为true)。
测试用例1
生产 message1 生产 message2 启动一个使用者,在接收到任何消息时抛出runtimeexception
消费 message1 ,重试
消费 message1 ,重试
消费 message1 ,重试
引发异常
消费 message2 ,重试
消费 message2 ,重试
消费 message2 ,重试
引发异常
停止并重新启动耗电元件
消费 message1 ,重试
消费 message1 ,重试
消费 message1 ,重试
引发异常
消费 message2 ,重试
消费 message2 ,重试
消费 message2 ,重试
引发异常
按预期工作。
测试用例2
生产 message1 生产 message2 启动一个使用者,该使用者将在接收时抛出runtimeexception message1 消费 message1 ,重试
消费 message1 ,重试
消费 message1 ,重试
引发异常
成功消费 message2 生产 message3 成功消费 message3 停止并重新启动耗电元件
什么都不发生,消费者等待新的消息来消费 message1 将被跳过,因为提交的偏移量已设置为 message3 . 这就是困扰我的。我不想消费者继续与消息,只要以前的消息没有成功处理。
有没有人经历过同样的行为和/或可以指导我如何改变这种状况?
提前谢谢!
更新:根据要求,一些代码片段
创建主题

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

连接生产商

kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

创建maven项目

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.7.RELEASE</version>
    <relativePath/>
</parent>

...

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

添加以下内容 application.yml ```
spring:
cloud:
stream:
bindings:
input:
destination: test-topic
contentType: text/plain
group: test-group
consumer:
header-mode: raw
kafka:
binder:
zkNodes: localhost:2181
brokers: localhost:9092

添加以下内容 `Application.java` ```
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    private void consume(Message<String> message) {
        log.info("Received: {}", message.getPayload());
        if ("message1".equals(message.getPayload())
            throw new RuntimeException();
        log.info("Successfully processed message {}", message.getPayload());
    }
}

应该是这样。运行应用程序并使用控制台生产者生成消息。

h9a6wy2h

h9a6wy2h1#

您应该为这种情况配置dlq。如果您的邮件在重试3次后无法被使用,则很可能根本不会被使用,或者需要特殊处理。设置一个dlq,在那里有毒的消息可以降落,你不会失去消息

s1ag04yj

s1ag04yj2#

Kafka给你一个运行时,但你有选择的权力。在某些情况下,msg可能会丢失/跳过,在某些情况下可能不会-您需要根据需要准备配置。在我看来,您应该进一步研究一些spring云流设置。您还可以使用禁用自动提交和“手动”提交偏移量。

nszi6y05

nszi6y053#

在kafka中,每条消息都有一个偏移量id。您的使用者应用程序可以检查偏移量,以及是否跳过或错过任何偏移量,而不是使用下一条消息。可以使用consumer.seek方法获取缺少的特定消息。
偏移量本质上是递增的,并且是连续的。
在你的情况下使用手动提交。
我可以说使用以下步骤。。
在poll方法之后,首先检查先前提交的偏移量并请求下一个偏移量值
消息消费和处理成功后,将成功处理的消息的偏移量值保存在某些内存或表中。下次投票时
下面的链接将不服务于您的用例,但您可以得到公平的想法
参考示例

相关问题