reactor kafka异步保证有序消息消耗失败

kmynzznz  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(367)

reactor kafka文档概述了按顺序使用kafka分区的消息的示例代码,但是在示例代码中,处理方法是同步的。根据一些局部测试,顺序处理和背压在这个特殊的样品中工作得非常好

public Flux<?> flux() {
            Scheduler scheduler = Schedulers.newBoundedElastic(60, Integer.MAX_VALUE, "sample", 60, true);
            return KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
                            .receive()
                            .groupBy(m -> m.receiverOffset().topicPartition())
                            .flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
                                                                   .map(r -> processRecord(partitionFlux.key(), r))
                                                                   .sample(Duration.ofMillis(5000))
                                                                   .concatMap(offset -> offset.commit()))
                            .doOnCancel(() -> close());
        }
        public ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, Person> message) {
            log.info("Processing record {} from partition {} in thread{}",
                    message.value().id(), topicPartition, Thread.currentThread().getName());
            return message.receiverOffset();
        }

我们的用例将处理逻辑作为一个异步函数,例如下面所示的例子,所以我们的例子中的processrecord返回一个mono。processrecord方法大约需要3-4秒才能完成,在这种情况下,流不接受背压。越来越多的消息被拉取,而不使用先前的消息。这会导致系统逐渐变得不稳定,最后出现outofmemory异常而失败。订货是尊重,但背压不是。

public Mono<ReceiverOffset> processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, Person> message) {
            log.info("Processing record {} from partition {} in thread{}",
                    message.value().id(), topicPartition, Thread.currentThread().getName());
....
        }

我们是否有一个使用reactor从kafka异步消费消息的示例

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题