java项目React堆Kafka:在通量结束时执行动作而不阻塞

ax6ht2ek  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(332)

我正在使用 project-reactor KafkaAPI的React连接到 Kafka-brokers . 用例是有一个输入主题,其中包含要处理的文件的文件路径。应用程序读取每个文件,对其进行处理,创建经过处理的消息流,并将其推送到输出主题。要求是文件处理后必须删除,处理后的消息应推送到输出主题。因此,删除操作必须在处理完每个文件并将消息流推送到输出主题之后执行。

public Flux<?> flux() {
   return KafkaReceiver
   .create(receiverOptions(Collections.singleton(sourceTopic)))
   .receive()
   .flatMap(m -> transform(m.value()).map(x -> SenderRecord.create(x, 
   m.receiverOffset())))
   .as(sender::send)
   .doOnNext(m -> {
   m.correlationMetadata().acknowledge();
   deleteFile(path);
}).doOnCancel(() -> close());

}

  • transform()方法在文件路径(m.value())中启动文件处理并返回消息流。

问题是,即使在将所有消息推送到输出主题之前,文件也会被删除。因此,在失败的情况下,重新尝试时原始文件不可用。

c8ib6hqw

c8ib6hqw1#

因为看起来 path 变量可以在整个管道中访问(方法输入参数?),您可以在单独的 doFinally . 你需要过滤 onComplete 或者
cancel SignalType ,因为您不希望在出现故障时删除该文件。
另一个选择是 doOnComplete 如果您不想在取消时删除文件。

相关问题