如何处理akka流中的流故障?

rn0zuynd  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(299)

我正在运行一个从kafka读取的akka流,我想在文件序列化成功时将消息提交回kafka。但我不知道如何通知上游阶段下游的故障。
现在,我已经用 FanOutShape2[ConsumerMessage.CommittableMessage[Array[Byte], Array[Byte]], ConsumerRecord[Array[Byte], Array[Byte]], ConsumerMessage.CommittableOffsetBatch] .
我想推到出口时,下游Flume完成消耗所有的时间 ConsumerRecords ,但我想在文件无法正确完成时保留这些提交。
在这个简化的场景中,假设我有下面的流

Source(List("one", "two"))
    .map(ByteString(_))
    .runWith(FileIO.toPath(Paths.get("/file-in-root-will-fail.txt")))

那将以一个 IOResult(0,Failure(java.nio.file.AccessDeniedException: /file-in-root-will-fail.txt)) .
我如何通知上游阶段发生了这种情况?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题