我有一个springboot(v.1.57)应用程序,它使用springcloudstream(v1.3.0)和kafka(v1.1.6)。我希望能够优雅地关闭它,即在关闭时,所有流侦听器(即,用@streamlistener注解)应该:
停止轮询新邮件
完成他们的工作
将偏移量提交给Kafka
我注意到containerproperties中有一个名为“shutdowntimeout”的属性(设置为默认值10000ms),因此我尝试通过如下反射扩展ConcurrentKafkalListenerContainerFactoryConfigurer类(因为它有一个@conditionalonmissingbean注解)将其修改为30000:
@Slf4j
@Component
public class BehalfConcurrentKafkaListenerContainerFactoryConfigurer extends ConcurrentKafkaListenerContainerFactoryConfigurer {
@Autowired
private KafkaProperties kproperties;
@Override
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
PropertyAccessor myAccessor = PropertyAccessorFactory.forDirectFieldAccess(this);
myAccessor.setPropertyValue("properties", kproperties);
ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties();
super.configure(listenerContainerFactory, consumerFactory);
containerProperties.setShutdownTimeout(30000);
}
}
但没有成功。还尝试将它(shutdowntimeout:30000)放在application.yml的spring cloud stream binder设置下,但同样没有帮助。
有什么方法可以控制关闭过程并实现我的目标吗?
1条答案
按热度按时间64jmpszr1#
编辑
不再需要做这种反射黑客;只需添加一个
ListenerContainerCustomizer
@Bean
应用程序上下文。看这里。编辑\u结束
不再支持spring kafka 1.1.x;您应该在boot 1.5.x中使用1.3.9。
当前的boot 1.5.x版本是1.5.21。
你应该马上升级。
然而,所有这些项目都有一个更新的版本。
springcloudstream不使用工厂或引导属性来创建容器;它不公开在容器上配置该属性的机制。
Spring Cloud流2.1增加了
ListenerContainerCustomizer
它允许您通过设置绑定容器上的任何属性来自定义绑定容器。我建议您升级到boot2.1.6和springcloudstreamgermantown(2.2.0)。
编辑
这是一个有点黑客,但它应该工作,直到你可以升级到一个较新的流版本。。。