我们使用的是spring-kafka-2.0.1.release,在生产中出现以下错误:
异常java.lang.nullpointerexception java.lang.nullpointerexception:org.springframework.kafka.listener.kafkamessagelistenercontainer$listenercummer.access$000(kafkamessagelistenercontainer。java:271)~[spring-kafka-2.0.1.发布。jar:2.0.1.release]在org.springframework.kafka.listener.kafkamessagelistenercontainer.getassignedpartitions(kafkamessagelistenercontainer)。java:152)~[spring-kafka-2.0.1.发布。jar:2.0.1.release]在
我们使用以下道具:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EspDLPResponseDeserializer.class);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectMs); // when esp server unreachable, default value too small
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff); // default value too small
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRec);
if (saslEnabled) {
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// jaas.conf with -D server start parameter
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaas_config);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ssl_keystore);
}
你能检查一下吗?
非常感谢你
1条答案
按热度按时间t30tvxxf1#
这绝对是个错误。我们确实没有检查
this.listenerConsumer != null
. 此属性是从doStart()
,但不能保证getAssignedPartitions()
在那之前不会打电话的。例如,我在toString()
:请就此事提出一个问题:https://github.com/spring-projects/spring-kafka/issues
作为解决方法,不要创建bean。