在spring boot中暂停/启动kafka流处理器

k10s72fa  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(762)

我要为消息实现一个断路器模式。基本要求是,如果microservice不能将消息发布到发布主题,它应该停止接受来自其他kafka主题的消息。当发布主题可用时,microservice应该开始接受来自其他kafka主题的消息。
有没有一种方法可以实现这一点在 Spring 启动Kafka流?

wvyml7n5

wvyml7n51#

我可以通过使用 BindingsEndpoint .

private final BindingsEndpoint binding;

@Override
public void stop() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Stopping Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::stop);
        log.info("Stopped Kafka topics ");
    }
}

@Override
public void start() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Starting Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::start);
        log.info("Started Kafka topics ");
    }
}

protected List<Binding> getBindings(List<?> objects) {
    return objects.stream().filter(object -> object instanceof Binding)
            .map(object -> (Binding) object).collect(Collectors.toList());
}

相关问题