我正在尝试使用@enablekafka和@kafkalistener注解为React式kafka消费者编写spring引导应用程序。我已经在不同的机器上配置了我的Kafka代理。当我将引导服务器提供给kafka代理的播发主机时,它总是将播发主机的ip地址覆盖到localhost。下面是我的代码。
pom.xml文件file:-
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
config:-
@Configuration
@EnableKafka
public class AppConfig {
@Bean
Map consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.12.12.24:9092,192.14.14.28:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
ReceiverOptions receiverOptions() {
ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps()).subscription(Arrays.asList("hellochange"));
return receiverOptions;
}
@Bean
public KafkaReceiver kafkaReceiver() {
return KafkaReceiver.create(receiverOptions());
}
}
consumer:-
@Service
public class ChangeListener {
@Autowired
KafkaReceiver kafkaReceiver;
@KafkaListener(topics="hellochange",groupId="example-group")
public void receiver() {
kafkaReceiver.receive().subscribe(System.out::println);
}
}
console:-
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
2018-06-07 19:59:17.640 WARN 23536 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=example-group] Connection to node -1 could not be established. Broker may not be available.
我已经在一个简单的消费者和非React性SpringKafka中验证了没有Spring配置的情况,对于这两种情况,它都工作得很好。只有React堆Kafka与启用Kafka和Kafka利塞纳注解我得到这个问题。
我是不是错过了什么/做错了什么?我们可以在springboot中对reactor kafka使用enablekafka和kafkalistener注解吗?
p、 我明白了, @EnableKafka
以及 @KafkaListener
如果我移除 spring-kafka
在pom.xml中,两个注解都不可用。
就像 @EnableKafka
以及 @KafkaListener
对于非React性kafka,是否有任何注解可用于配置spring引导应用程序的React性kafka消费者?
1条答案
按热度按时间pieyvz9o1#
不能将kafkalistener注解用于reactor kafka;
@KafkaListener
不是被动的。