如何在使用带有kafka的springcloudstream时执行优雅的应用程序关闭?

zour9fqk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(517)

我有一个springboot(v.1.57)应用程序,它使用springcloudstream(v1.3.0)和kafka(v1.1.6)。我希望能够优雅地关闭它,即在关闭时,所有流侦听器(即,用@streamlistener注解)应该:
停止轮询新邮件
完成他们的工作
将偏移量提交给Kafka
我注意到containerproperties中有一个名为“shutdowntimeout”的属性(设置为默认值10000ms),因此我尝试通过如下反射扩展ConcurrentKafkalListenerContainerFactoryConfigurer类(因为它有一个@conditionalonmissingbean注解)将其修改为30000:

@Slf4j
@Component
public class BehalfConcurrentKafkaListenerContainerFactoryConfigurer extends ConcurrentKafkaListenerContainerFactoryConfigurer {

    @Autowired
    private KafkaProperties kproperties;

    @Override
    public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
                          ConsumerFactory<Object, Object> consumerFactory) {
        PropertyAccessor myAccessor = PropertyAccessorFactory.forDirectFieldAccess(this);
        myAccessor.setPropertyValue("properties", kproperties);

        ContainerProperties containerProperties = listenerContainerFactory
                .getContainerProperties();
        super.configure(listenerContainerFactory, consumerFactory);
        containerProperties.setShutdownTimeout(30000);
    }
}

但没有成功。还尝试将它(shutdowntimeout:30000)放在application.yml的spring cloud stream binder设置下,但同样没有帮助。
有什么方法可以控制关闭过程并实现我的目标吗?

64jmpszr

64jmpszr1#

编辑
不再需要做这种反射黑客;只需添加一个
ListenerContainerCustomizer @Bean 应用程序上下文。看这里。
编辑\u结束
不再支持spring kafka 1.1.x;您应该在boot 1.5.x中使用1.3.9。
当前的boot 1.5.x版本是1.5.21。
你应该马上升级。
然而,所有这些项目都有一个更新的版本。
springcloudstream不使用工厂或引导属性来创建容器;它不公开在容器上配置该属性的机制。
Spring Cloud流2.1增加了 ListenerContainerCustomizer 它允许您通过设置绑定容器上的任何属性来自定义绑定容器。
我建议您升级到boot2.1.6和springcloudstreamgermantown(2.2.0)。
编辑
这是一个有点黑客,但它应该工作,直到你可以升级到一个较新的流版本。。。

@SpringBootApplication
@EnableBinding(Sink.class)
public class So56883620Application {

    public static void main(String[] args) {
        SpringApplication.run(So56883620Application.class, args).close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @StreamListener(Sink.INPUT)
    public void listen(String in) throws InterruptedException {
        this.latch.countDown();
        System.out.println(in);
        Thread.sleep(6_000);
        System.out.println("exiting");
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
            // wait for listener to start
            this.latch.await(10, TimeUnit.SECONDS);
            System.out.println("Shutting down");
        };
    }

    @Bean
    public SmartLifecycle bindingFixer(BindingService bindingService) {
        return new SmartLifecycle() {

            @Override
            public int getPhase() {
                return Integer.MAX_VALUE;
            }

            @Override
            public void stop() {
                // no op
            }

            @Override
            public void start() {
                @SuppressWarnings("unchecked")
                Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
                        .getPropertyValue("consumerBindings");
                @SuppressWarnings("unchecked")
                Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
                ((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
                        .getPropertyValue("lifecycle.messageListenerContainer"))
                                .getContainerProperties().setShutdownTimeout(30_000L);
            }

            @Override
            public boolean isRunning() {
                return false;
            }

            @Override
            public void stop(Runnable callback) {
                callback.run();
            }

            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };
    }

}

相关问题