我想从流中引用物化值。下面是代码片段,但它没有编译,错误:
type mismatch;
found : (akka.NotUsed, scala.concurrent.Future[akka.Done])
required: (Playground.DomainObj, scala.concurrent.Future[akka.Done])
编码:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Future
import akka.NotUsed
import akka.Done
implicit val actorSystem = ActorSystem("example")
case class DomainObj(name: String, age: Int)
val customFlow1:Flow[String,DomainObj,NotUsed] = Flow[String].map(s => {
DomainObj(s, 50)
})
val customFlow2 = Flow[DomainObj].map(s => {
s.age + 10
})
val printAnySink: Sink[Any, Future[Done]] = Sink.foreach(println)
val c1 = Source.single("John").viaMat(customFlow1)(Keep.right).viaMat(customFlow2)(Keep.left).toMat(printAnySink)(Keep.both)
val res: (DomainObj, Future[Done]) = c1.run()
在操场上找到代码:https://scastie.scala-lang.org/P9iSx49cQcaOZfKtVCzTPA
我希望在流完成后引用DomainObj/
1条答案
按热度按时间jutyujz01#
一个
Flow[String, DomainObj, NotUsed]
的物化值是NotUsed
,而不是一个DomainObj
,因此c1
的物化值是(NotUsed, Future[Done])
。看起来这里的目的是捕获在
customFlow1
中创建的DomainObj
。请注意,
Sink.head
实际上要求customFlow1
只能在只发射一次的某个对象的下游使用。