java—无限流量和批量写入数据库

wmtdaxz3  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(351)

在继续实际的事件处理之前,我有一个无限的事件流(来自kafka使用reactor kafka),我正试图将其批量写入数据库。我的问题是让这个工作与适当的背压。 windowTimeout 以及 bufferTimeout 似乎是很好的候选人,因为他们允许我指定两个最大大小,但也限制了等待时间,以防低“流量”。
首先是 windowTimeout ,从中对数据库进行批量写入。不过,这很快就出现了问题:reactor.core.exceptions$overflowexception:接收器的信号超出预期(有界队列…)。
然后我切换到 bufferTimeout ,但失败,出现错误reactor.core.exceptions$overflowexception:由于缺少请求,无法发出缓冲区。
我希望下面能说明我所追求的流程:

flux.groupBy(envelope -> envelope.partition)
  .flatMap(partitionFlux -> {
    final Flux<ConsumedEnvelope> elasticFlux = partitionFlux.publishOn(Schedulers.elastic());
    final Flux<List<ConsumedEnvelope>> batchFlux = partitionFlux.bufferTimeout(100, Duration.ofMillis(500))
      .concatMap(batch -> {
        final ConsumedEnvelope last = batch.get(batch.size() - 1);

        return repository.persist(batch) // a)
          .then(last.acknowledge()) // b)
          .thenReturn(batch);
      });

    return processing(batchFlux);
  })
  .subscribe(result -> {
      // ...
  });

(一) repository.persist 只在内部迭代批处理以创建插入操作,然后返回 Mono<Void> .
b) acknowledge()用于kafka补偿,我只想在成功持久化批处理之后执行。都包在里面了 concatMap 对每个分区一次只处理一个批。
如上所述,这将导致溢出异常。有什么惯用的方法来达到我所描述的目的吗?在我看来,这应该不是一个太不寻常的任务,但我是新的React堆,并希望得到一些建议。
/d级
我意识到 onBackpressureBuffer 实际上,这对我来说没问题。但总的来说,有没有更好的方法?
编辑2…以上当然造成了问题,由于未绑定的需求,我不知怎的错过了。所以,回到最初的问题,或者也许是某种方式,让onbackpressurebuffer不请求一个未绑定的需求,而是只转发从下游请求的内容。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题