我有一个fs2流Stream[F, C
],其中C <: Coproduct
。我想把它转换成Stream[F, H]
,其中H <: HList
。这个HList应该包含协积C
的所有成员。
所以,本质上,是Pipe[F, C, H]
。
fs2管道的工作方式是等待至少拉取一个联产品的成员,然后一旦至少拉取一个联产品的成员,最后将其组合到一个HList中并输出。
因此,它的用法大致如下:
type MyCoprod = A :+: B :+: C :+: CNil
type MyHList = A :: B :: C :: HNil
val stream: Stream[F, MyHList] = Stream
.emits(List(A, B, C)) // my coproducts
.through(pullAll) // i want to wait for A, B, C to pulled at least once and outputted
.map { hlist => ... }
我是非常非常新的无形,这是我能想到的之前,击中一个路障:
trait WaitFor[F[_], C <: Coproduct] {
type Out <: HList
def apply: Pipe[F, C, Out]
}
object WaitFor {
type Aux[F[_], C <: Coproduct, Out0 <: HList] =
WaitFor[F, C] { type Out = Out0 }
implicit def make[F[_], C <: Coproduct, L <: HList](implicit
toHList: ToHList.Aux[C, L]
): Aux[F, C, L] = new WaitFor.Aux[F, C, L] {
override type Out = L
override def apply: Pipe[F, C, Out] = {
def go(s2: Stream[F, C], currHList: L): Pull[F, L, Unit] = {
s2.pull.uncons1.flatMap {
case Some((coproduct, s3)) => {
// add or update coproduct member to currHList
// if currHList is the same as L (our output type) then output it (Pull.output1(currHList)) and clear currHList
// if not, keep iterating:
go(s3, ???)
}
case None => Pull.done
}
}
go(s1, ???).stream
}
}
def pullAll[F[_], C <: Coproduct](
stream: Stream[F, C]
)(implicit ev: WaitFor[F, C]): Stream[F, ev.Out] = {
stream.through(ev.apply)
}
}
我的路障从这里开始:
override def apply: Pipe[F, C, Out] = ???
那时我对无形的知识就耗尽了。
我的想法是跟踪元组中的所有共积成员(Option[C1],Option[C2],...)。
一旦元组中的每个元素都是Some
,我就将它们转换为一个HList,并将它们输出到Stream中。
(我将使用FS2Pull递归地跟踪状态,所以我不担心这个问题)。
但我的问题是,在值级别,我无法知道元组的长度,也无法构造元组。
有什么建议让我解决这个问题吗?
谢谢
2条答案
按热度按时间oalqel3c1#
让我们一步一步来:
A :+: B :+: C :+: CNil
A
、最新B
等A :: B :: C :: HNil
HList
值时,您还应该重置中间值存储Option[A] :: Option[B] :: Option[C] :: HNil
是方便因此,让我们编写一个类型类来帮助我们:
这段代码没有假设我们如何存储缓存,也没有假设我们如何更新缓存,所以我们可以用一些不纯的代码来测试它:
我们可以确保它打印出我们所期望的内容。
现在,问题是我们如何将这个中间结果线程化通过FS2流。
您可以修改此代码以符合自己的审美观:将
updateCache
提取到某个函数中,使用状态单子或其他什么。我猜将其转换为管道将是,例如:x7rlezfr2#
只是为了补充@Mateusz Kubuszok惊人的回答,这就是我决定如何存储
Collector
缓存(fs 2 Pull方式):