我正在使用 project-reactor
KafkaAPI的React连接到 Kafka-brokers
. 用例是有一个输入主题,其中包含要处理的文件的文件路径。应用程序读取每个文件,对其进行处理,创建经过处理的消息流,并将其推送到输出主题。要求是文件处理后必须删除,处理后的消息应推送到输出主题。因此,删除操作必须在处理完每个文件并将消息流推送到输出主题之后执行。
public Flux<?> flux() {
return KafkaReceiver
.create(receiverOptions(Collections.singleton(sourceTopic)))
.receive()
.flatMap(m -> transform(m.value()).map(x -> SenderRecord.create(x,
m.receiverOffset())))
.as(sender::send)
.doOnNext(m -> {
m.correlationMetadata().acknowledge();
deleteFile(path);
}).doOnCancel(() -> close());
}
- transform()方法在文件路径(m.value())中启动文件处理并返回消息流。
问题是,即使在将所有消息推送到输出主题之前,文件也会被删除。因此,在失败的情况下,重新尝试时原始文件不可用。
1条答案
按热度按时间c8ib6hqw1#
因为看起来
path
变量可以在整个管道中访问(方法输入参数?),您可以在单独的doFinally
. 你需要过滤onComplete
或者cancel
SignalType
,因为您不希望在出现故障时删除该文件。另一个选择是
doOnComplete
如果您不想在取消时删除文件。