在camel kafka组件中向kafka发送确认

ldxq2e6h  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(463)

我是新的 Camel Kafka组件。我使用camel-kafka组件从kafka服务器发送和接收消息。我使用类似的代码,如下所述:

from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange)
                                    throws Exception {
                                String messageKey = "";
                                if (exchange.getIn() != null) {
                                    Message message = exchange.getIn();
                                    Integer partitionId = (Integer) message
                                            .getHeader(KafkaConstants.PARTITION);
                                    String topicName = (String) message
                                            .getHeader(KafkaConstants.TOPIC);
                                    if (message.getHeader(KafkaConstants.KEY) != null)
                                        messageKey = (String) message
                                                .getHeader(KafkaConstants.KEY);
                                    Object data = message.getBody();

                                    System.out.println("topicName :: "
                                            + topicName + " partitionId :: "
                                            + partitionId + " messageKey :: "
                                            + messageKey + " message :: "
                                            + data + "\n");
   /// I perform many other operations here like persist the object in DB etc. 
                                }
                            }
                        });

这里的问题是,因为我没有发送任何确认回Kafka它是从服务器收到相同的消息三次。我的问题是,我怎样才能手动将应答发送回Kafka?我在camel kafka组件中没有找到任何合适的文档。

of1yzvn4

of1yzvn41#

你不需要给Kafka回信。您还没有指定自动提交启用,它默认为true,这意味着自动提交设置为true。这就需要确认了。
您能否指定camel kafka组件使用哪个版本的kafka?你的 Camel Kafka设置很好。您可能会收到重复的消息,因为您使用的是Kafka的版本。

相关问题