当我们处理同步方法时,我们可以很容易地从Kafka接收一个消息列表:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> testContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
...
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)),
new StringDeserializer(),
new StringDeserializer());
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
return factory;
}
@KafkaListener(topics = {"topic"}, containerFactory = "testContainerFactory", groupId = "group-id")
public void receiveMessages(List<String> messages) {
processorService.process(messages); // imagine that processorService works with lists more efficiently than with multiple calls by one message
}
如果我理解正确的话,可以保证列表不会超过500条消息。
我需要一个类似的行为与我的应用程序的React式解决方案。我知道我们可以使用reactor-kafka
:
@Bean
public ApplicationRunner runner() {
return args -> {
ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
...
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500))
.withKeyDeserializer(new StringDeserializer())
.withValueDeserializer(new StringDeserializer())
.subscription(Collections.singletonList("topic"));
KafkaReceiver.create(ro)
.receive()
.doOnNext(message -> processorService.process(List.of(message)))
.subscribe();
};
}
由于.receive()
返回一个Flux
,我需要将它 Package 到一个列表中,以向processorService.process
提供一个真实的列表(而不是只有一个值)。从Kafka效率的Angular 来看,将flux收集到这样的列表中可以吗?
KafkaReceiver.create(ro)
.receive()
.collectToList()
.doOnNext(processorService::process))
.subscribe();
在这种情况下,列表中会有多少项?如果我们在ReceiverOptions
中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG=500,它不会超过500,对吗?
1条答案
按热度按时间c9x0cxw01#
.receiveBatch()
最近添加;它在1.3.21中可用。https://github.com/reactor/reactor-kafka/commit/9597c9388f33b03817c439159a81807ab6300123