在camel kafka组件中向kafka发送确认

ldxq2e6h  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(510)

我是新的 Camel Kafka组件。我使用camel-kafka组件从kafka服务器发送和接收消息。我使用类似的代码,如下所述:

  1. from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
  2. .process(new Processor() {
  3. @Override
  4. public void process(Exchange exchange)
  5. throws Exception {
  6. String messageKey = "";
  7. if (exchange.getIn() != null) {
  8. Message message = exchange.getIn();
  9. Integer partitionId = (Integer) message
  10. .getHeader(KafkaConstants.PARTITION);
  11. String topicName = (String) message
  12. .getHeader(KafkaConstants.TOPIC);
  13. if (message.getHeader(KafkaConstants.KEY) != null)
  14. messageKey = (String) message
  15. .getHeader(KafkaConstants.KEY);
  16. Object data = message.getBody();
  17. System.out.println("topicName :: "
  18. + topicName + " partitionId :: "
  19. + partitionId + " messageKey :: "
  20. + messageKey + " message :: "
  21. + data + "\n");
  22. /// I perform many other operations here like persist the object in DB etc.
  23. }
  24. }
  25. });

这里的问题是,因为我没有发送任何确认回Kafka它是从服务器收到相同的消息三次。我的问题是,我怎样才能手动将应答发送回Kafka?我在camel kafka组件中没有找到任何合适的文档。

of1yzvn4

of1yzvn41#

你不需要给Kafka回信。您还没有指定自动提交启用,它默认为true,这意味着自动提交设置为true。这就需要确认了。
您能否指定camel kafka组件使用哪个版本的kafka?你的 Camel Kafka设置很好。您可能会收到重复的消息,因为您使用的是Kafka的版本。

相关问题