下面是我的SpringBootKafka消费应用程序从kafka读取数据的主题。在这个应用程序中,我们计划实现heartbeat函数,使用@schduling注解将它的heartbeat发布到url,以了解它的活动和运行(它将我的json输入数据加载到db)。此post请求的目的是更新应用程序监视工具的状态。
为了实现这一点,我将心跳代码放在应用程序的许多地方,但我无法实现这一点,因为@postconstuct或consumer.poll()不允许运行心跳代码段。
我们使用的是ApacheKafka2.12,在我的SpringBoot应用程序中实现这种行为的正确方法是什么?他们有没有其他api来做这样的发帖请求到url,每隔几分钟就通过应用程序。?写后台线程可以解决这个问题,请大家分享一下?为什么postconstuct()或poll()会阻止其他重复代码运行。请帮帮我。提前谢谢。
@SpringBootApplication
@EnableScheduling
public class KafkaApp {
@Autowired
ConsumerService kcService;
public static void main(String[] args) {
SpringApplication.run(KafkaApp.class, args);
}
@PostConstruct
public void init(){
kcService.getMessagesFromKafka();
}
}
和2@服务定义:
import org.apache.kafka.clients.consumer.Consumer;
@Service public class ConsumerService {
final Consumer<Long, String> consumer = createConsumer();
final int giveUp = 100;
int noRecordsCount = 0;
while (true) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
record.key(), record.value(),
record.partition(), record.offset());
});
consumer.commitAsync();
}
}
@Scheduled(fixedDelay = 180000)
public void heartbeat() {
RestTemplate restTemplate = new RestTemplate();
String url = "endpoint url";
String requestJson = "{\"I am alive\":\"App name?\"}";
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<String>(requestJson,headers);
String answer = restTemplate.postForObject(url, entity, String.class);
System.out.println(answer);
}
1条答案
按热度按时间rlcwz9us1#
向主类添加注解,如:
有关更多详细信息,请访问以下链接:
如果您想为此目的编写cron作业,那么
application.properties
添加以下内容:你可以让cron表达式在线这里是一个link:cron-expression-generator-quartz.
在你的心跳函数中写下上面的函数: