java 如何使用reactor-kafka接收器获取消息列表?

mw3dktmi  于 2023-09-29  发布在  Java
关注(0)|答案(1)|浏览(117)

当我们处理同步方法时,我们可以很容易地从Kafka接收一个消息列表:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> testContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(Map.of(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
    ...
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)),
            new StringDeserializer(),
            new StringDeserializer());
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    return factory;
}

@KafkaListener(topics = {"topic"}, containerFactory = "testContainerFactory", groupId = "group-id")
public void receiveMessages(List<String> messages) {
    processorService.process(messages); // imagine that processorService  works with lists more efficiently than with multiple calls by one message
}

如果我理解正确的话,可以保证列表不会超过500条消息。
我需要一个类似的行为与我的应用程序的React式解决方案。我知道我们可以使用reactor-kafka

@Bean
public ApplicationRunner runner() {
    return args -> {
        ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                       ...
                       ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500))
        .withKeyDeserializer(new StringDeserializer())
        .withValueDeserializer(new StringDeserializer())
        .subscription(Collections.singletonList("topic"));
        KafkaReceiver.create(ro)
                .receive()
                .doOnNext(message -> processorService.process(List.of(message)))
                .subscribe();
    };
}

由于.receive()返回一个Flux,我需要将它 Package 到一个列表中,以向processorService.process提供一个真实的列表(而不是只有一个值)。从Kafka效率的Angular 来看,将flux收集到这样的列表中可以吗?

KafkaReceiver.create(ro)
                .receive()
                .collectToList()
                .doOnNext(processorService::process))
                .subscribe();

在这种情况下,列表中会有多少项?如果我们在ReceiverOptions中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG=500,它不会超过500,对吗?

相关问题