Scala: convert a Stream of Coproducts to a Stream of HLists (shapeless and fs2)

Posted by gcmastyq on 2022-11-23 in Scala

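I have an fs2 stream Stream[F, C], where C <: Coproduct. I want to convert it into a Stream[F, H], where H <: HList. The HList should contain one value for every member of the coproduct C.
So, essentially, a Pipe[F, C, H].
The pipe would wait until every member of the coproduct has been pulled at least once and, once it has at least one of each, combine them into an HList and emit it.
So it would be used roughly like this: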

type MyCoprod = A :+: B :+: C :+: CNil
type MyHList = A :: B :: C :: HNil

val stream: Stream[F, MyHList] = Stream
  .emits(List(A, B, C)) // my coproducts
  .through(pullAll) // wait until A, B and C have each been pulled at least once, then emit them
  .map { hlist => ... }

I am very, very new to shapeless, and this is what I could come up with before hitting a roadblock:

trait WaitFor[F[_], C <: Coproduct] {
  type Out <: HList

  def apply: Pipe[F, C, Out]
}

object WaitFor {
  type Aux[F[_], C <: Coproduct, Out0 <: HList] =
    WaitFor[F, C] { type Out = Out0 }

  implicit def make[F[_], C <: Coproduct, L <: HList](implicit
    toHList: ToHList.Aux[C, L]
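  ): Aux[F, C, L] = new WaitFor[F, C] { // `new` needs the trait itself, not the Aux refinement alias
    override type Out = L

    // a Pipe[F, C, Out] is a function Stream[F, C] => Stream[F, Out]
    override def apply: Pipe[F, C, Out] = { s1 =>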
      def go(s2: Stream[F, C], currHList: L): Pull[F, L, Unit] = {
        s2.pull.uncons1.flatMap {
          case Some((coproduct, s3)) => {
            // add or update coproduct member to currHList

            // if currHList is the same as L (our output type) then output it (Pull.output1(currHList)) and clear currHList

            // if not, keep iterating:

            go(s3, ???)
          }

          case None => Pull.done
        }
      }
      go(s1, ???).stream
    }
  }

  def pullAll[F[_], C <: Coproduct](
    stream: Stream[F, C]
  )(implicit ev: WaitFor[F, C]): Stream[F, ev.Out] = {
    stream.through(ev.apply)
  }
}

My roadblock starts here:

override def apply: Pipe[F, C, Out] = ???

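That is where my knowledge of shapeless runs out.
My idea was to keep track of all the coproduct members in a tuple (Option[C1], Option[C2], ...).
Once every element of the tuple is Some, I would convert them into an HList and emit it into the Stream.
(I would use an fs2 Pull to track the state recursively, so I am not worried about that part.)
But my problem is that, at the value level, I have no way of knowing how long the tuple should be, nor of constructing it.
Any pointers on how to solve this?
Thanks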


oalqel3c #1

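Let's take it step by step:

  • Your input will be A :+: B :+: C :+: CNil
  • You will store the latest A, the latest B and the latest C
  • Initially there will be no latest values
  • Once all of the values have been found, you should emit A :: B :: C :: HNil
  • When you emit a new HList value, you should also reset the intermediate value storage
  • This suggests that storing the intermediate values as Option[A] :: Option[B] :: Option[C] :: HNil is convenient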
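
For a fixed set of three types, the steps above can be written by hand without shapeless. The following is only a sketch to make the state transitions concrete: Event, S, I, D, Cache and collectAll are hypothetical names that do not appear in the question, and a plain tuple stands in for the HList.

```scala
object FixedArityDemo {
  // Hypothetical stand-in for the coproduct String :+: Int :+: Double :+: CNil
  sealed trait Event
  final case class S(value: String) extends Event
  final case class I(value: Int) extends Event
  final case class D(value: Double) extends Event

  // One Option slot per member; None means "not seen since the last emit"
  final case class Cache(s: Option[String], i: Option[Int], d: Option[Double]) {
    // last value wins for each slot
    def update(e: Event): Cache = e match {
      case S(v) => copy(s = Some(v))
      case I(v) => copy(i = Some(v))
      case D(v) => copy(d = Some(v))
    }
    // Some only when every slot is filled
    def complete: Option[(String, Int, Double)] =
      for { a <- s; b <- i; c <- d } yield (a, b, c)
  }
  object Cache {
    val empty: Cache = Cache(None, None, None)
  }

  // Fold over the inputs, emitting a tuple and resetting the cache
  // each time all three slots have been filled
  def collectAll(events: List[Event]): List[(String, Int, Double)] = {
    val (_, out) =
      events.foldLeft((Cache.empty, Vector.empty[(String, Int, Double)])) {
        case ((cache, acc), e) =>
          val updated = cache.update(e)
          updated.complete match {
            case Some(t) => (Cache.empty, acc :+ t)
            case None    => (updated, acc)
          }
      }
    out.toList
  }
}
```

The hand-written version has to be repeated for every combination of types, which is what deriving the same three operations (update, attempt to complete, empty cache) generically avoids.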

So let's write a type class that will help us with this:

import shapeless._

// A type class for collecting Coproduct elements (last-wins)
// until they could be combined into an HList element

// Path-dependent types and Aux for better DX, e.g. when one
// would want Collector[MyType] without manually entering HLists
trait Collector[Input] {

  type Cache
  type Result

  // pure computation of an updated cache
  def updateState(newInput: Input, currentState: Cache): Cache

  // returns Some if all elements of Cache are Some, None otherwise
  def attemptConverting(updatedState: Cache): Option[Result]

  // HLists of Nones
  def emptyCache: Cache
}
object Collector {

  type Aux[Input, Cache0, Result0] = Collector[Input] {
    type Cache = Cache0
    type Result = Result0
  }

  def apply[Input](implicit
      collector: Collector[Input]
  ): Collector.Aux[Input, collector.Cache, collector.Result] =
    collector

  // obligatory empty Coproduct/HList case to terminate recursion
  implicit val nilCollector: Collector.Aux[CNil, HNil, HNil] =
    new Collector[CNil] {

      type Cache = HNil
      type Result = HNil

      override def updateState(newInput: CNil, currentState: HNil): HNil = HNil

      override def attemptConverting(updatedState: HNil): (Option[HNil]) =
        Some(HNil)

      override def emptyCache: HNil = HNil
    }

  // here we define the actual recursive derivation
  implicit def consCollector[
      Head,
      InputTail <: Coproduct,
      CacheTail <: HList,
      ResultTail <: HList
  ](implicit
      tailCollector: Collector.Aux[InputTail, CacheTail, ResultTail]
  ): Collector.Aux[
      Head :+: InputTail,
      Option[Head] :: CacheTail,
      Head :: ResultTail
  ] = new Collector[Head :+: InputTail] {

      type Cache = Option[Head] :: CacheTail
      type Result = Head :: ResultTail

      override def updateState(
          newInput: Head :+: InputTail,
          currentState: Option[Head] :: CacheTail
      ): Option[Head] :: CacheTail = newInput match {
        case Inl(head) => Some(head) :: currentState.tail
        case Inr(tail) =>
          currentState.head :: tailCollector.updateState(
            tail,
            currentState.tail
          )
      }

      override def attemptConverting(
          updatedState: Option[Head] :: CacheTail
      ): Option[Head :: ResultTail] = for {
        head <- updatedState.head
        tail <- tailCollector.attemptConverting(updatedState.tail)
      } yield head :: tail

      override def emptyCache: Option[Head] :: CacheTail =
        None :: tailCollector.emptyCache
    }
}
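
The Inl/Inr pattern match in updateState mirrors how shapeless encodes a coproduct value: the position of a member in the type determines how many Inr wrappers surround the Inl holding the value. A small sketch of that encoding, assuming shapeless 2.3.x (CoproductEncodingDemo is a hypothetical name used only for this illustration):

```scala
import shapeless._

object CoproductEncodingDemo {
  type Input = String :+: Int :+: Double :+: CNil

  // Coproduct[Input](x) injects x at the position of its type
  val s: Input = Coproduct[Input]("a") // first member:  Inl(...)
  val i: Input = Coproduct[Input](1)   // second member: Inr(Inl(...))
  val d: Input = Coproduct[Input](2.0) // third member:  Inr(Inr(Inl(...)))

  // select returns Some only when the value sits at the requested member
  val asInt: Option[Int] = i.select[Int]
  val asString: Option[String] = i.select[String]
}
```

This is why consCollector only needs two cases: Inl means the input belongs to the head slot of the cache, and Inr means the update is delegated to the collector for the tail.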

This code makes no assumptions about how the cache is stored or how it gets updated, so we can test it with some impure code:

import shapeless.ops.coproduct.Inject

type Input = String :+: Int :+: Double :+: CNil
val collector = Collector[Input]

// dirty, but good enough for demo
var cache = collector.emptyCache

LazyList[Input](
  Inject[Input, String].apply("test1"),
  Inject[Input, String].apply("test2"),
  Inject[Input, String].apply("test3"),
  Inject[Input, Int].apply(1),
  Inject[Input, Int].apply(2),
  Inject[Input, Int].apply(3),
  Inject[Input, Double].apply(3),
  Inject[Input, Double].apply(4),
  Inject[Input, Double].apply(3),
  Inject[Input, String].apply("test4"),
  Inject[Input, Int].apply(4),
).foreach { input =>
  val newCache = collector.updateState(input, cache)
  collector.attemptConverting(newCache) match {
    case Some(value) =>
      println(s"Product computed: $value!")
      cache = collector.emptyCache
    case None =>
      cache = newCache
  }
  println(s"Current cache: $cache")
}

We can check that it prints what we expect:

Current cache: Some(test1) :: None :: None :: HNil
Current cache: Some(test2) :: None :: None :: HNil
Current cache: Some(test3) :: None :: None :: HNil
Current cache: Some(test3) :: Some(1) :: None :: HNil
Current cache: Some(test3) :: Some(2) :: None :: HNil
Current cache: Some(test3) :: Some(3) :: None :: HNil
Product computed: test3 :: 3 :: 3.0 :: HNil!
Current cache: None :: None :: None :: HNil
Current cache: None :: None :: Some(4.0) :: HNil
Current cache: None :: None :: Some(3.0) :: HNil
Current cache: Some(test4) :: None :: Some(3.0) :: HNil
Product computed: test4 :: 4 :: 3.0 :: HNil!
Current cache: None :: None :: None :: HNil

Now the question is how to thread this intermediate state through an fs2 Stream:

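// cats-effect 3 imports; in cats-effect 2, Ref lives in cats.effect.concurrent
import cats.effect.{IO, Ref}
import fs2.Stream

for {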
  // for easy passing of cache around
  cacheRef <- Stream.eval(Ref[IO].of(collector.emptyCache))
  // source of Coproducts
  input <- Stream[IO, Input](
    Inject[Input, String].apply("test1"),
    Inject[Input, String].apply("test2"),
    Inject[Input, String].apply("test3"),
    Inject[Input, Int].apply(1),
    Inject[Input, Int].apply(2),
    Inject[Input, Int].apply(3),
    Inject[Input, Double].apply(3)
  )
  updateCache = cacheRef.modify[Stream[IO, collector.Result]] { cache =>
    val newCache = collector.updateState(input, cache)
    collector.attemptConverting(newCache) match {
      case Some(value) => collector.emptyCache -> Stream(value)
      case None        => newCache -> Stream.empty
    }
  }
  // emits a new HList only once all of its elements have been gathered
  hlist <- Stream.eval(updateCache).flatten
} yield hlist

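You can adapt this code to your own taste: extract updateCache into a function, use a state monad, or whatever else. I guess turning it into a pipe would look something like this: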

import fs2.Pipe

// you might abstract cats.effect.IO to F[_] (Ref.of needs Sync or
// Concurrent, not just Monad), use something else instead of Ref, or whatever
def collectCoproductsToHList[Input](
    implicit collector: Collector[Input]
): IO[Pipe[IO, Input, collector.Result]] = 
  Ref[IO].of(collector.emptyCache).map { cacheRef =>
      
    val pipe: Pipe[IO, Input, collector.Result] = inputStream => for {
      input <- inputStream
      updateCache = cacheRef.modify[Stream[IO, collector.Result]] { cache =>
        val newCache = collector.updateState(input, cache)
        collector.attemptConverting(newCache) match {
          case Some(value) => collector.emptyCache -> Stream(value)
          case None        => newCache             -> Stream.empty
        }
      }
      hlist <- Stream.eval(updateCache).flatten
    } yield hlist
      
    pipe
  }
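
If the Ref is only there to carry state along the stream, a pure variant is possible with fs2's mapAccumulate. The sketch below is not from the answer: collectWith is a hypothetical helper written against generic empty/update/attempt functions so that it stands on its own, and CollectWithDemo is just a wrapper object for the example.

```scala
import fs2.{Pipe, Pure, Stream}

object CollectWithDemo {
  // Hypothetical helper: folds inputs into a state S and emits an O whenever
  // `attempt` succeeds, resetting the state to `empty` afterwards
  def collectWith[F[_], I, S, O](empty: S)(
      update: (I, S) => S
  )(attempt: S => Option[O]): Pipe[F, I, O] =
    in =>
      in.mapAccumulate(empty) { (state, input) =>
          val updated = update(input, state)
          val emitted: Option[O] = attempt(updated)
          // reset the state only when something is emitted
          val next = if (emitted.isDefined) empty else updated
          (next, emitted)
        }
        .map(_._2) // drop the carried state, keep the Option[O]
        .unNone    // keep only the completed values

  // Usage on a pure stream: group the inputs into pairs; an incomplete
  // trailing group is never emitted
  val pairs: List[List[Int]] =
    Stream(1, 2, 3, 4, 5)
      .through(
        collectWith[Pure, Int, List[Int], List[Int]](Nil)((i, s) => s :+ i)(s =>
          if (s.size == 2) Some(s) else None
        )
      )
      .toList
}
```

With the Collector from this answer, its emptyCache, updateState and attemptConverting members would play the roles of empty, update and attempt. Because the state lives inside the stream rather than in a shared Ref, the resulting pipe can be applied to several streams without them interfering with each other.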

x7rlezfr #2

Just to complement @Mateusz Kubuszok's amazing answer, this is how I decided to store the Collector cache (the fs2 Pull way):

import fs2.{Pipe, Pull, Stream}
import shapeless._

trait CollectorPipe[F[_], C <: Coproduct] {
  type Out <: HList

  def pipe: Pipe[F, C, Out]
}

object CollectorPipe {
  type Aux[F[_], C <: Coproduct, Out0 <: HList] =
    CollectorPipe[F, C] { type Out = Out0 }

  def instance[F[_], C <: Coproduct, Out0 <: HList](tubo: Pipe[F, C, Out0]): Aux[F, C, Out0] =
    new CollectorPipe[F, C] {
      override type Out = Out0
      override def pipe: Pipe[F, C, Out0] = tubo
    }

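  // Cache0 and Out0 are captured through Collector.Aux so that the result
  // type is known to be an HList, as CollectorPipe.Aux requires
  implicit def make[
    F[_],
    C <: Coproduct,
    Cache0,
    Out0 <: HList
  ](implicit
    collector: Collector.Aux[C, Cache0, Out0]
  ): Aux[F, C, Out0] = instance[F, C, Out0] { s1 =>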
    def go(s2: Stream[F, C], curr: collector.Cache): Pull[F, collector.Result, Unit] = {
      s2.pull.uncons1.flatMap {
        case Some((c, s3)) => {
          val newState = collector.updateState(c, curr)

          collector.attemptConverting(newState) match {
            case Some(value) => Pull.output1(value) >> go(s3, collector.emptyCache)
            case None        => go(s3, newState)
          }
        }
        case None          => Pull.done
      }
    }

    go(s1, collector.emptyCache).stream
  }

  implicit class CollectorPipeStreamOps[F[_], A <: Coproduct](private val s: Stream[F, A]) {
    def pullAll(implicit ev: CollectorPipe[F, A]): Stream[F, ev.Out] = s.through(ev.pipe)
  }
}
