We use Kafka consumers in a Spring Boot application. When the beans are created, the KafkaListenerEndpointRegistry is created and tries to fetch the topic metadata for the containers. The problem is that we sometimes get a TimeoutException, which aborts the application startup. See the full stack trace:
2019-08-28 07:49:46,053 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
at org.springframework.boot.web.servlet.support.SpringBootServletInitializer.run(SpringBootServletInitializer.java:151)
at org.springframework.boot.web.servlet.support.SpringBootServletInitializer.createRootApplicationContext(SpringBootServletInitializer.java:131)
at com.pack.MyApplication.createRootApplicationContext(MyApplication.java:45)
at org.springframework.boot.web.servlet.support.SpringBootServletInitializer.onStartup(SpringBootServletInitializer.java:91)
at org.springframework.web.SpringServletContainerInitializer.onStartup(SpringServletContainerInitializer.java:171)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5132)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:717)
at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:690)
at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:705)
at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:978)
at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1849)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.tomcat.util.threads.InlineExecutorService.execute(InlineExecutorService.java:75)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at org.apache.catalina.startup.HostConfig.deployWARs(HostConfig.java:773)
at org.apache.catalina.startup.HostConfig.deployApps(HostConfig.java:427)
at org.apache.catalina.startup.HostConfig.start(HostConfig.java:1576)
at org.apache.catalina.startup.HostConfig.lifecycleEvent(HostConfig.java:309)
at org.apache.catalina.util.LifecycleBase.fireLifecycleEvent(LifecycleBase.java:123)
at org.apache.catalina.util.LifecycleBase.setStateInternal(LifecycleBase.java:423)
at org.apache.catalina.util.LifecycleBase.setState(LifecycleBase.java:366)
at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:936)
at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:841)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1384)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1374)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.tomcat.util.threads.InlineExecutorService.execute(InlineExecutorService.java:75)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:909)
at org.apache.catalina.core.StandardEngine.startInternal(StandardEngine.java:262)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
at org.apache.catalina.core.StandardService.startInternal(StandardService.java:421)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
at org.apache.catalina.core.StandardServer.startInternal(StandardServer.java:932)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:183)
at org.apache.catalina.startup.Catalina.start(Catalina.java:633)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.catalina.startup.Bootstrap.start(Bootstrap.java:344)
at org.apache.catalina.startup.Bootstrap.main(Bootstrap.java:475)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Since I am new to Kafka, I don't know how to handle this properly so that the application still starts even when the metadata cannot be fetched.
I thought about setting an error handler on the container factory, but I am not sure whether that would solve my problem:
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(getRetryTemplate());
    factory.setErrorHandler(new LoggingErrorHandler());
    factory.setAutoStartup(false);
    return factory;
}

@Bean
public RetryPolicy getRetryPolicy() {
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    simpleRetryPolicy.setMaxAttempts(getMaxRetryAttempts());
    return simpleRetryPolicy;
}

@Bean
public FixedBackOffPolicy getBackOffPolicy() {
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(getRetryInterval());
    return backOffPolicy;
}

@Bean
public RetryTemplate getRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(getRetryPolicy());
    retryTemplate.setBackOffPolicy(getBackOffPolicy());
    return retryTemplate;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.consumer.bootstrap-servers"));
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, env.getProperty("spring.kafka.consumer.key-deserializer"));
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, env.getProperty("spring.kafka.consumer.value-deserializer"));
    props.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("spring.kafka.consumer.auto-offset-reset"));
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, env.getProperty("spring.kafka.consumer.client-id"));
    props.put(SaslConfigs.SASL_MECHANISM, env.getProperty("kafka.sasl.mechanism"));
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty("kafka.security.protocol.config"));
    props.put(SaslConfigs.SASL_JAAS_CONFIG, env.getProperty("kafka.sasl.jaas.configuration"));
    try {
        Resource res = new ClassPathResource(env.getProperty("spring.kafka.ssl.trust-store-location"));
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, res.getFile().getAbsolutePath());
    } catch (IOException e) {
        logger.error("SSL Truststore for Kafka connection could not be found!", e);
    }
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("avro.kafka.consumer.deserializer.schema-url"));
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
            env.getProperty("avro.kafka.consumer.deserializer.specific-avro-reader"));
    return props;
}

private int getMaxRetryAttempts() {
    return this.retryAttempts;
}

private int getRetryInterval() {
    return this.retryInterval;
}
Can anyone explain how the ContainerStoppingErrorHandler works in this situation and whether it could solve my problem? I have also considered creating the KafkaListenerEndpointRegistry bean myself, but I don't know how to configure it so that the exception is handled properly and the application still continues to start. Should I be looking at auto-startup here, and if so, what is the correct way to control and handle the container startup?
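To make the last question more concrete, this is roughly what I have in mind for controlling the startup myself: keep setAutoStartup(false) on the factory and start the containers through the injected KafkaListenerEndpointRegistry once the application is up, catching any exception so that a broker problem no longer aborts the startup. This is only a minimal sketch of the idea (the class name KafkaContainerStarter is made up, and I have not verified that the metadata timeout is actually thrown from container.start() here), not working code:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class KafkaContainerStarter {

    private static final Logger logger = LoggerFactory.getLogger(KafkaContainerStarter.class);

    private final KafkaListenerEndpointRegistry registry;

    public KafkaContainerStarter(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // Start the listener containers only after the context has fully started,
    // so a failing broker cannot abort the Spring Boot startup itself.
    @EventListener(ApplicationReadyEvent.class)
    public void startContainers() {
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            try {
                container.start();
            } catch (Exception ex) {
                // e.g. the TimeoutException while fetching topic metadata:
                // log it and keep the application running without this consumer.
                logger.error("Could not start Kafka listener container " + container, ex);
            }
        }
    }
}

As far as I understand, with auto-startup disabled the registry no longer starts the containers during context refresh, so a metadata fetch failure could only surface inside this method, where it is logged instead of failing the whole deployment.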
Versions used: Spring Kafka 2.2.7.RELEASE, Spring Boot starter parent 2.1.6.RELEASE.
Thanks in advance.