如何在Kafka中跟踪未使用的消息

f87krz0w  于 12个月前  发布在  Apache
关注(0)|答案(2)|浏览(128)

我有一个生产者,生产特定主题的消息:
制作人:

// Produce a message to a Kafka topic
        String topic = "dev.topic.proxy";
        String key = "some-key";
        String value = "Hello, Kafka!";

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        while (true) {
            producer.send(record);
            System.out.println("Message has been sent");
        }

字符串

消费者:

<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="readMsg">
        <description>Route to refresh the proxy routes</description>
       <from uri="appKafkaConsumer:dev.topic.proxy" />
        <log message="Received message with headers: ${headers}" loggingLevel="INFO" />
        <delay>
        <constant>5000</constant>
    </delay>
    </route>
</routes>


这是camel中的消费者。groupId是在我启动应用程序时随机自动生成的。我停止了应用程序,当我重新启动应用程序时,由于分配了新的groupId,许多消息丢失。我不想丢失消息。什么是最好的解决方法?

neskvpey

neskvpey1#

请确保将Kafka消费者配置为从最早的偏移量而不是最新的偏移量开始消费。
请注意,这将导致消费者重新处理它已经收到的消息。为了让特定消费者只处理一次消息,它必须保持相同的消费者组ID。
如果您希望所有消息都被处理一次,并且在所有消费者中只处理一次,那么我建议您使用ActiveMQ或类似的解决方案,而不是Kafka。

p8h8hvxi

p8h8hvxi2#

我认为你可以为消费者启用自动偏移提交,这意味着消费者将定期将其进度提交给Kafka。这确保了即使消费者崩溃或重新启动,它也可以从提交的偏移恢复其位置并恢复处理,而无需重新处理消息。

相关问题