我要为消息实现一个断路器模式。基本要求是,如果microservice不能将消息发布到发布主题,它应该停止接受来自其他kafka主题的消息。当发布主题可用时,microservice应该开始接受来自其他kafka主题的消息。有没有一种方法可以实现这一点在 Spring 启动Kafka流?
wvyml7n51#
我可以通过使用 BindingsEndpoint .
BindingsEndpoint
private final BindingsEndpoint binding; @Override public void stop() { List<?> objects = binding.queryStates(); if (!objects.isEmpty()) { log.info("Stopping Kafka topics "); List<Binding> bindings = getBindings(objects); bindings.forEach(Binding::stop); log.info("Stopped Kafka topics "); } } @Override public void start() { List<?> objects = binding.queryStates(); if (!objects.isEmpty()) { log.info("Starting Kafka topics "); List<Binding> bindings = getBindings(objects); bindings.forEach(Binding::start); log.info("Started Kafka topics "); } } protected List<Binding> getBindings(List<?> objects) { return objects.stream().filter(object -> object instanceof Binding) .map(object -> (Binding) object).collect(Collectors.toList()); }
1条答案
按热度按时间wvyml7n51#
我可以通过使用
BindingsEndpoint
.