我有一个生产者,生产特定主题的消息:
制作人:
// Produce a message to a Kafka topic
String topic = "dev.topic.proxy";
String key = "some-key";
String value = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
while (true) {
producer.send(record);
System.out.println("Message has been sent");
}
字符串
消费者:
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="readMsg">
<description>Route to refresh the proxy routes</description>
<from uri="appKafkaConsumer:dev.topic.proxy" />
<log message="Received message with headers: ${headers}" loggingLevel="INFO" />
<delay>
<constant>5000</constant>
</delay>
</route>
</routes>
型
这是camel中的消费者。groupId是在我启动应用程序时随机自动生成的。我停止了应用程序,当我重新启动应用程序时,由于分配了新的groupId,许多消息丢失。我不想丢失消息。什么是最好的解决方法?
2条答案
按热度按时间neskvpey1#
请确保将Kafka消费者配置为从最早的偏移量而不是最新的偏移量开始消费。
请注意,这将导致消费者重新处理它已经收到的消息。为了让特定消费者只处理一次消息,它必须保持相同的消费者组ID。
如果您希望所有消息都被处理一次,并且在所有消费者中只处理一次,那么我建议您使用ActiveMQ或类似的解决方案,而不是Kafka。
p8h8hvxi2#
我认为你可以为消费者启用自动偏移提交,这意味着消费者将定期将其进度提交给Kafka。这确保了即使消费者崩溃或重新启动,它也可以从提交的偏移恢复其位置并恢复处理,而无需重新处理消息。