在继续实际的事件处理之前,我有一个无限的事件流(来自kafka使用reactor kafka),我正试图将其批量写入数据库。我的问题是让这个工作与适当的背压。 windowTimeout
以及 bufferTimeout
似乎是很好的候选人,因为他们允许我指定两个最大大小,但也限制了等待时间,以防低“流量”。
首先是 windowTimeout
,从中对数据库进行批量写入。不过,这很快就出现了问题:reactor.core.exceptions$overflowexception:接收器的信号超出预期(有界队列…)。
然后我切换到 bufferTimeout
,但失败,出现错误reactor.core.exceptions$overflowexception:由于缺少请求,无法发出缓冲区。
我希望下面能说明我所追求的流程:
flux.groupBy(envelope -> envelope.partition)
.flatMap(partitionFlux -> {
final Flux<ConsumedEnvelope> elasticFlux = partitionFlux.publishOn(Schedulers.elastic());
final Flux<List<ConsumedEnvelope>> batchFlux = partitionFlux.bufferTimeout(100, Duration.ofMillis(500))
.concatMap(batch -> {
final ConsumedEnvelope last = batch.get(batch.size() - 1);
return repository.persist(batch) // a)
.then(last.acknowledge()) // b)
.thenReturn(batch);
});
return processing(batchFlux);
})
.subscribe(result -> {
// ...
});
(一) repository.persist
只在内部迭代批处理以创建插入操作,然后返回 Mono<Void>
.
b) acknowledge()用于kafka补偿,我只想在成功持久化批处理之后执行。都包在里面了 concatMap
对每个分区一次只处理一个批。
如上所述,这将导致溢出异常。有什么惯用的方法来达到我所描述的目的吗?在我看来,这应该不是一个太不寻常的任务,但我是新的React堆,并希望得到一些建议。
/d级
我意识到 onBackpressureBuffer
实际上,这对我来说没问题。但总的来说,有没有更好的方法?
编辑2…以上当然造成了问题,由于未绑定的需求,我不知怎的错过了。所以,回到最初的问题,或者也许是某种方式,让onbackpressurebuffer不请求一个未绑定的需求,而是只转发从下游请求的内容。
暂无答案!
目前还没有任何答案,快来回答吧!