我是新的 Camel Kafka组件。我使用camel-kafka组件从kafka服务器发送和接收消息。我使用类似的代码,如下所述:
from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
.process(new Processor() {
@Override
public void process(Exchange exchange)
throws Exception {
String messageKey = "";
if (exchange.getIn() != null) {
Message message = exchange.getIn();
Integer partitionId = (Integer) message
.getHeader(KafkaConstants.PARTITION);
String topicName = (String) message
.getHeader(KafkaConstants.TOPIC);
if (message.getHeader(KafkaConstants.KEY) != null)
messageKey = (String) message
.getHeader(KafkaConstants.KEY);
Object data = message.getBody();
System.out.println("topicName :: "
+ topicName + " partitionId :: "
+ partitionId + " messageKey :: "
+ messageKey + " message :: "
+ data + "\n");
/// I perform many other operations here like persist the object in DB etc.
}
}
});
这里的问题是,因为我没有发送任何确认回Kafka它是从服务器收到相同的消息三次。我的问题是,我怎样才能手动将应答发送回Kafka?我在camel kafka组件中没有找到任何合适的文档。
1条答案
按热度按时间of1yzvn41#
你不需要给Kafka回信。您还没有指定自动提交启用,它默认为true,这意味着自动提交设置为true。这就需要确认了。
您能否指定camel kafka组件使用哪个版本的kafka?你的 Camel Kafka设置很好。您可能会收到重复的消息,因为您使用的是Kafka的版本。