我正在运行一个从kafka读取的akka流,我想在文件序列化成功时将消息提交回kafka。但我不知道如何通知上游阶段下游的故障。
现在,我已经用 FanOutShape2[ConsumerMessage.CommittableMessage[Array[Byte], Array[Byte]], ConsumerRecord[Array[Byte], Array[Byte]], ConsumerMessage.CommittableOffsetBatch]
.
我想推到出口时,下游Flume完成消耗所有的时间 ConsumerRecords
,但我想在文件无法正确完成时保留这些提交。
在这个简化的场景中,假设我有下面的流
Source(List("one", "two"))
.map(ByteString(_))
.runWith(FileIO.toPath(Paths.get("/file-in-root-will-fail.txt")))
那将以一个 IOResult(0,Failure(java.nio.file.AccessDeniedException: /file-in-root-will-fail.txt))
.
我如何通知上游阶段发生了这种情况?
暂无答案!
目前还没有任何答案,快来回答吧!