boot-kafka消费者应用程序实现heartbeat

7cjasjjr  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(301)

下面是我的SpringBootKafka消费应用程序从kafka读取数据的主题。在这个应用程序中,我们计划实现heartbeat函数,使用@schduling注解将它的heartbeat发布到url,以了解它的活动和运行(它将我的json输入数据加载到db)。此post请求的目的是更新应用程序监视工具的状态。
为了实现这一点,我将心跳代码放在应用程序的许多地方,但我无法实现这一点,因为@postconstuct或consumer.poll()不允许运行心跳代码段。
我们使用的是ApacheKafka2.12,在我的SpringBoot应用程序中实现这种行为的正确方法是什么?他们有没有其他api来做这样的发帖请求到url,每隔几分钟就通过应用程序。?写后台线程可以解决这个问题,请大家分享一下?为什么postconstuct()或poll()会阻止其他重复代码运行。请帮帮我。提前谢谢。

@SpringBootApplication
@EnableScheduling 
public class KafkaApp {

    @Autowired
    ConsumerService kcService;

    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }

    @PostConstruct
    public void init(){
        kcService.getMessagesFromKafka();
    }   
}

和2@服务定义:

import org.apache.kafka.clients.consumer.Consumer;

@Service public class ConsumerService { 
    final Consumer<Long, String> consumer = createConsumer();
    final int giveUp = 100; 
    int noRecordsCount = 0;

    while (true) {
        final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
        if (consumerRecords.count()==0) { 
            noRecordsCount++; 
            if (noRecordsCount > giveUp) break; 
            else continue;
            }     
            consumerRecords.forEach(record -> { 
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
              record.key(), record.value(),
              record.partition(), record.offset()); 
              }); 
              consumer.commitAsync();
              } 
    }
@Scheduled(fixedDelay = 180000)
        public void heartbeat() {
            RestTemplate restTemplate = new RestTemplate();
            String url = "endpoint url";
            String requestJson = "{\"I am alive\":\"App name?\"}";
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);    
            HttpEntity<String> entity = new HttpEntity<String>(requestJson,headers);
            String answer = restTemplate.postForObject(url, entity, String.class);
            System.out.println(answer);
    }
rlcwz9us

rlcwz9us1#

向主类添加注解,如:

@SpringBootApplication
@EnableScheduling 
public class KafkaApp {
    @Autowired
    ConsumerService kcService;
    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }
    @PostConstruct
    public void init(){
        kcService.getMessagesFromKafka();
    }   

}

有关更多详细信息,请访问以下链接:
如果您想为此目的编写cron作业,那么 application.properties 添加以下内容:

cron.expression=5 0 0 ? * * *   //Its means it'll execute every 5 sec

你可以让cron表达式在线这里是一个link:cron-expression-generator-quartz.
在你的心跳函数中写下上面的函数:

@Scheduled(cron = "${cron.expression}")
public void heartbeat() {
    //Your code here.
}

相关问题