Does Akka Streams implement join semantics like Kafka Streams?

Posted by vyu0f0g1 on 2022-11-06 in Kafka

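I'm new to Akka Streams, although I have some experience with Kafka Streams. One thing Akka Streams seems to be missing is the ability to join two different streams together.
Kafka Streams lets you join information from two different streams (or tables) using the messages' keys.
Is there anything similar in Akka Streams?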

Answer #1, by g6ll5ycj

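Unfortunately, the short answer is no. I think Akka Streams is lower-level than Kafka Streams, Spark Streaming, or Flink. In exchange, you get more control over what you are doing, which basically means you can build your own join operator. Take a look at this discussion on Lightbend.
Basically, you have to take the data from the two Sources, Merge them, send them to a window based on time or on the number of tuples, compute the join, and then send the data to a Sink. This is still a work in progress, but the operator below follows the approach described here, and it compiles and runs. What I still need to do is join the data inside the window; for now, I just emit it as a mini-batch.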
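The missing "compute the join" step could look roughly like the following minimal, pure-Scala sketch. It assumes, unlike the code below (where the first tuple field is only the input index), that every element carries both the index of the input it came from and a separate join key; the names `Tagged`, `WindowJoin`, and `joinWindow` are hypothetical. It is an inner join inside a single window only, so elements that land in different windows are never matched.

```scala
// Hypothetical element type: `side` is the input index (0 = left, 1 = right),
// `key` is the join key, `value` is the payload.
final case class Tagged[K, V](side: Int, key: K, value: V)

object WindowJoin {
  /** Inner-joins the elements of ONE window by key: every left element (side 0)
    * is paired with every right element (side 1) that has the same key. */
  def joinWindow[K, V](window: Seq[Tagged[K, V]]): Seq[(K, V, V)] = {
    val (left, right) = window.partition(_.side == 0)
    // index the right side by key so each left element only scans its matches
    val rightByKey: Map[K, Seq[V]] =
      right.groupBy(_.key).map { case (k, ts) => k -> ts.map(_.value) }
    for {
      l <- left
      r <- rightByKey.getOrElse(l.key, Seq.empty)
    } yield (l.key, l.value, r)
  }

  def main(args: Array[String]): Unit = {
    val window = Seq(
      Tagged(0, "a", 1), Tagged(1, "a", 10), Tagged(1, "a", 11),
      Tagged(0, "b", 2), Tagged(1, "c", 30)
    )
    joinWindow(window).foreach(println)
  }
}
```

Running `main` prints `(a,1,10)` and `(a,1,11)`; key `b` has no right-hand partner and key `c` no left-hand partner in this window, so neither appears in the output.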

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Attributes, ClosedShape, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}

import scala.collection.mutable
import scala.concurrent.duration._

object StreamOpenGraphJoin {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("StreamOpenGraphJoin")
    // Two inputs, each throttled to one element per second.
    // `1.second` avoids postfix syntax (`1 second`), which needs scala.language.postfixOps.
    val incrementSource: Source[Int, NotUsed] = Source(1 to 10).throttle(1, 1.second)
    val decrementSource: Source[Int, NotUsed] = Source(10 to 20).throttle(1, 1.second)

    // Tags every element with the index of the input it came from: (inputIndex, value).
    def tokenizerSource(key: Int): Flow[Int, (Int, Int), NotUsed] =
      Flow[Int].map(value => (key, value))

    // Step 1 - set up the fundamentals of a stream graph
    val switchJoinStrategies = RunnableGraph.fromGraph(
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        // Step 2 - add the tagging flows, the merge, the batching stage and the sink
        val tokenizerShape00 = builder.add(tokenizerSource(0))
        val tokenizerShape01 = builder.add(tokenizerSource(1))

        val mergeTupleShape = builder.add(Merge[(Int, Int)](2))
        val batchFlow = Flow.fromGraph(new BatchTimerFlow[(Int, Int)](5.seconds))
        val sinkShape = builder.add(Sink.foreach[(Int, Int)](x => println(s" > sink: $x")))

        // Step 3 - tie the components together
        incrementSource ~> tokenizerShape00 ~> mergeTupleShape.in(0)
        decrementSource ~> tokenizerShape01 ~> mergeTupleShape.in(1)
        mergeTupleShape.out ~> batchFlow ~> sinkShape

        // Step 4 - return the shape
        ClosedShape
      }
    )
    // Run (materialize) the graph. The ActorSystem keeps running afterwards;
    // call system.terminate() once the output is no longer needed.
    switchJoinStrategies.run()
  }

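  // Step 0: define the stage.
  // When an element arrives and no silence period is running, everything buffered so far
  // is emitted downstream and a new silence period of `silencePeriod` starts; elements
  // that arrive during the silence period are only buffered.
  class BatchTimerFlow[T](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
    // Step 1: define the ports
    val in = Inlet[T]("BatchTimerFlow.in")
    val out = Outlet[T]("BatchTimerFlow.out")

    // Step 2: construct the shape once (a val, not a def that builds a new one per call)
    override val shape: FlowShape[T, T] = FlowShape(in, out)

    // Step 3: create the logic
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
      // mutable state, private to this materialization of the stage
      val batch = new mutable.Queue[T]
      var open = false // true while a silence period is running (elements are only buffered)

      // Step 4: implement the logic
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          try {
            batch.enqueue(grab(in))
            Thread.sleep(50) // simulate an expensive computation (blocks the stage; demo only)
            if (open) pull(in) // silence period running: buffer and ask upstream for another element
            else {
              // flush the buffer downstream, then start a new silence period
              // (.toList works on both Scala 2.12 and 2.13, unlike .to[immutable.Iterable])
              emitMultiple(out, batch.dequeueAll(_ => true).toList)
              open = true
              scheduleOnce(None, silencePeriod)
            }
          } catch {
            case scala.util.control.NonFatal(e) => failStage(e)
          }
        }

        // Emit whatever is still buffered before completing, instead of silently dropping it.
        override def onUpstreamFinish(): Unit =
          emitMultiple(out, batch.dequeueAll(_ => true).toList, () => completeStage())
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })

      override protected def onTimer(timerKey: Any): Unit = {
        open = false // silence period over: the next element triggers a flush
      }
    }
  }
}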
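For the windowing step, Akka Streams already ships a built-in operator, `groupedWithin(n, d)`, which emits a group as soon as either `n` elements have been collected or the duration `d` has passed, whichever happens first; that matches the "window based on time or on the number of tuples" described above. The following is a sketch, not a drop-in replacement for `BatchTimerFlow`, assuming Akka 2.6+ (where an implicit `ActorSystem` is enough to run a stream) and that both inputs are `(key, value)` pairs; `GroupedWithinJoin` and `windowedJoin` are hypothetical names. The windows are based on processing time, not event time, so this is a simplified inner join rather than a reproduction of Kafka Streams' join semantics.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

object GroupedWithinJoin {
  // Inner join of two keyed sources inside windows that close after `maxElems`
  // elements or after `maxWait`, whichever comes first (groupedWithin semantics).
  def windowedJoin[K, A, B](left: Source[(K, A), NotUsed],
                            right: Source[(K, B), NotUsed],
                            maxElems: Int,
                            maxWait: FiniteDuration): Source[(K, A, B), NotUsed] =
    left.map(kv => Left(kv): Either[(K, A), (K, B)])          // tag left-side elements
      .merge(right.map(kv => Right(kv): Either[(K, A), (K, B)])) // tag right-side elements
      .groupedWithin(maxElems, maxWait)                         // one window per group
      .mapConcat { window =>
        val ls = window.collect { case Left(kv) => kv }
        val rs = window.collect { case Right(kv) => kv }
        // pair every left element with every right element that shares its key;
        // elements that end up in different windows are never joined
        for {
          (lk, a) <- ls
          (rk, b) <- rs
          if lk == rk
        } yield (lk, a, b)
      }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("GroupedWithinJoin")
    import system.dispatcher // ExecutionContext for onComplete

    val left  = Source(1 to 10).map(i => (i % 3, s"L$i")).throttle(1, 1.second)
    val right = Source(1 to 10).map(i => (i % 3, s"R$i")).throttle(1, 1.second)

    windowedJoin(left, right, maxElems = 100, maxWait = 5.seconds)
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate()) // shut down once both inputs are exhausted
  }
}
```

Tagging with `Either` lets each window be split back into its two sides without relying on a numeric input index. Each window buffers at most `maxElems` elements, although the join output of one window can be as large as the product of its two sides.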
