spring云流kafka暂停/恢复绑定

8hhllhi2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(401)

我们使用springcloudestream2.0和kafka作为消息代理。
我们已经实现了一个断路器,在目标系统(db或第三方api)不可用的情况下停止应用程序上下文,正如这里所建议的:在目标系统关闭时停止spring cloud stream@streamlistener侦听
现在在SpringCloudStream2.0中,有一种方法可以使用启动器管理绑定器的生命周期:绑定可视化和控制
有没有可能从代码控制活页夹的生命周期,也就是在目标服务器关闭的情况下,控制活页夹的生命周期 pause 活页夹,当它起来的时候 resume ?

daupos2t

daupos2t1#

对不起,我误解了你的问题。
你可以自动接线 BindingsEndpoint 但不幸的是 State 枚举是私有的,因此您不能调用 changeState() 以编程方式。
我已经为此打开了一个问题。
编辑
你可以用反射来做,但是有点难看。。。

@SpringBootApplication
@EnableBinding(Sink.class)
public class So53476384Application {

    public static void main(String[] args) {
        SpringApplication.run(So53476384Application.class, args);
    }

    @Autowired
    BindingsEndpoint binding;

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Class<?> clazz = ClassUtils.forName("org.springframework.cloud.stream.endpoint.BindingsEndpoint$State",
                    So53476384Application.class.getClassLoader());
            ReflectionUtils.doWithMethods(BindingsEndpoint.class, method -> {
                try {
                    method.invoke(this.binding, "input", clazz.getEnumConstants()[2]); // PAUSE
                }
                catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }, method -> method.getName().equals("changeState"));
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {

    }

}

相关问题