我是新的Kafka,springboot和试图集成Kafka和ElasticSearch在我的springboot应用程序。
当我尝试运行springboot应用程序时,我看到以下错误:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory]: Factory method 'kafkaListenerContainerFactory' threw exception; nested exception is java.lang.NoSuchMethodError:org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties;
我的pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.1.0</version>
</dependency>
Application.yml
security:
enabled: true
spring:
resources:
chain:
enabled: true
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group-id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
生成器类
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String topic = "users";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(History t){
logger.info("Inside send message to topic");
this.kafkaTemplate.send(topic,"HelloWorld");
}
}
Consumer.java
package com.springboot.kafka;
import com.springboot.model.History;
import com.springboot.repository.HistoryRepository;
import org.apache.kafka.common.protocol.types.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private static final String topic = "users";
@KafkaListener(topics = topic,groupId = "group-id")
public void consume (String t){
logger.info("Message read as " + t);
}
}
Application.properties:
logging.level.sql=info
logging.file = /var/tmp/SpringBootAppLog.log
spring.datasource.driver=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/test
spring.datasource.username=postgres
spring.datasource.password=postgres
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.properties.hibernate.show_sql=true
spring.jpa.hibernate.ddl-auto=update
spring.data.elasticsearch.cluster-name=my-application
spring.data.elasticsearch.cluster-nodes=localhost:9200
任何关于我错过了什么的想法。任何线索都将不胜感激。
2条答案
按热度按时间vohkndzv1#
你应该检查你的完整错误日志(stacktrace)。在我的例子中,问题是由下面的一个
java.io.FileNotFoundException
引起的。通常,您会在堆栈跟踪的末尾发现“真实的的”问题。
olqngx592#
你的
value-serializer
使用了一个StringDeserializer
。我假设这应该是StringSerializer
。因此,缺少一个值的序列化方法。