java—在读取主题之后异步提交消息

oxcyiej7  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(393)

我正试图在读完主题后提交一条消息。我跟踪了这个链接(https://www.confluent.io/blog/apache-kafka-spring-boot-application)创造一个Kafka消费者与Spring。通常情况下,它工作得很好,消费者得到消息并等待另一个人进入队列。但问题是,当我处理这些消息时,需要花费大量的时间(大约10分钟),kafka队列认为消息没有被消费(提交),消费者会反复阅读。我不得不说,当我的处理时间少于5分钟时,它工作得很好,但当它持续的时间更长时,它不会提交消息。
我四处寻找了一些答案,但这对我没有帮助,因为我使用的源代码不一样(当然还有不同的结构)。我尝试过发送异步方法,也尝试过异步提交消息,但是失败了。其中一些来源是:
spring boot kafka:无法完成提交,因为组已重新平衡
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
kafka 0.10 java使用者未阅读主题中的消息
https://github.com/confluentinc/confluent-kafka-dotnet/issues/470
主要课程如下:

@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApp .class, args);
    }

consumer类(我需要在其中提交消息)

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        Properties  props=prope.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }

如何在从队列中读取消息之后提交它。
我想确保当我收到消息时,我会立即提交消息。现在,当我在(system.out.println)之后执行完方法时,消息被提交。有人能告诉我怎么做吗?
-----更新-------
很抱歉回复太晚,但正如@girishb所说,我一直在寻找girishb的配置,但我不知道在哪里可以定义我想从配置文件(applications.yml)读取/侦听的主题。我看到的所有示例都使用类似的结构(http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html). 是否有任何选项可以让我读取在其他服务器中声明的主题?使用类似于这个@kafkalistener的东西(topics=“${app.topic.pro}”,groupid=“group\u id”)
=============解决方案1========================================
我遵循了@victor gallet的建议,并在consume方法中包含了confumer属性的声明,以便容纳“acknowledgement”对象。我也关注过这个链接(https://www.programcreek.com/java-api-examples/?code=springoneplatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/commonconfiguration.java)以获取我用于声明和设置所有属性(consumerproperties、consumerfactory、,kafkalistenercontainerfactory)。我发现的唯一问题是“new seektocurrenterrorhandler()”声明,因为我遇到了一个错误,目前我还无法解决它(如果有人向我解释就好了)。

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;

   @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        //factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

     @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        Properties  propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
        //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer")); 
        return props;
    }

    @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
        acknowledgment.acknowledge();// commit immediately
        Properties  props=prop.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }
fivyi3re

fivyi3re1#

你可以使用 java.util.concurrent.BlockingQueue 在消费和提交kafka偏移量时推送消息。然后使用另一个线程从blockingqueue和进程获取消息。这样您就不必等到处理完成。

c8ib6hqw

c8ib6hqw2#

必须使用属性修改使用者配置 enable.auto.commit 设置为假:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后,您必须修改springkafka侦听器工厂并将ack模式设置为 MANUAL_IMMEDIATE . 下面是一个 ConcurrentKafkaListenerContainerFactory :

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}

如文件所述, MANUAL_IMMEDIATE 意思是:当侦听器调用acknowledgement.acknowledge()方法时,立即提交偏移量。
您可以在这里找到所有提交方法。
然后,在侦听器代码中,可以通过添加 Acknowledgment 对象,例如:

@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
   // commit immediately
    acknowledgment.acknowledge();
}
uttx8gqw

uttx8gqw3#

properties.put(consumerconfig.enable\u auto\u commit\u config,false);
在设置了上述属性之后,如果您想成批处理,那么您可以遵循以下配置。

factory.getContainerProperties().setAckMode(AckMode.MANUAL);

//您可以设置manual或manual\u immediate,因为//kafkamessagelistenercontainer为任何类型的手动确认模式调用//consumerbatchackknowledgement

factory.getContainerProperties().setAckOnError(true);
    //specifying batch error handler because i have enabled to listen records in batch
    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
    factory.setBatchListener(true);
    factory.getContainerProperties().setSyncCommits(false);

相关问题