I'm new to Akka Streams, but I have some experience with Kafka Streams. One thing Akka Streams seems to be missing is the ability to join two different streams: Kafka Streams lets you combine data from two different streams (or tables) using the key of the message. Is there anything similar in Akka Streams?
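For context, this is roughly the kind of keyed, windowed join the question is referring to in Kafka Streams: records from two topics that share a key and arrive within the join window are combined. This is only an illustrative sketch added here, not part of the original question; the topic names and the 5-second window are invented, and the exact imports (in particular the Serdes package) depend on the Kafka version (a fairly recent release with the Scala DSL is assumed).

import java.time.Duration
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.kstream.JoinWindows

object KafkaStreamsKeyedJoinSketch {
  def main(args: Array[String]): Unit = {
    val builder = new StreamsBuilder

    // two keyed streams read from hypothetical topics
    val left  = builder.stream[String, String]("left-topic")
    val right = builder.stream[String, String]("right-topic")

    // records with the same key that arrive within 5 seconds of each other are joined
    val joined = left.join(right)(
      (l, r) => s"$l|$r",
      JoinWindows.of(Duration.ofSeconds(5))
    )

    joined.to("joined-topic")
    // builder.build() would then be wrapped in a KafkaStreams instance and started
  }
}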
1 answer

g6ll5ycj1#
Unfortunately, the short answer is no. I think Akka Streams is lower-level than Kafka Streams, Spark Streaming, or Flink. However, you get much finer control over what you are doing, which basically means you can build your own join operator. See this discussion on Lightbend.

Essentially, you have to consume from the two Sources, Merge them, feed the merged stream into a window based on time or on the number of tuples, compute the join, and then send the result to a Sink. This is still unfinished, but I built the operator I just described, and it compiles and works. I still need to actually join the data inside the window; for now I just emit it in small batches.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Attributes, ClosedShape, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}
import scala.collection.mutable
import scala.concurrent.duration._

object StreamOpenGraphJoin {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("StreamOpenGraphJoin")

    val incrementSource: Source[Int, NotUsed] = Source(1 to 10).throttle(1, 1 second)
    val decrementSource: Source[Int, NotUsed] = Source(10 to 20).throttle(1, 1 second)

    def tokenizerSource(key: Int) = {
      Flow[Int].map { value => (key, value) }
    }

    // Step 1 - setting up the fundamental for a stream graph
    val switchJoinStrategies = RunnableGraph.fromGraph(
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        // Step 2 - add partition and merge strategy
        val tokenizerShape00 = builder.add(tokenizerSource(0))
        val tokenizerShape01 = builder.add(tokenizerSource(1))
        val mergeTupleShape = builder.add(Merge[(Int, Int)](2))

        val batchFlow = Flow.fromGraph(new BatchTimerFlow[(Int, Int)](5 seconds))
        val sinkShape = builder.add(Sink.foreach[(Int, Int)](x => println(s" > sink: $x")))

        // Step 3 - tying up the components
        incrementSource ~> tokenizerShape00 ~> mergeTupleShape.in(0)
        decrementSource ~> tokenizerShape01 ~> mergeTupleShape.in(1)
        mergeTupleShape.out ~> batchFlow ~> sinkShape

        // Step 4 - return the shape
        ClosedShape
      }
    )

    // run the graph and materialize it
    val graph = switchJoinStrategies.run()
  }

  // step 0: define the shape
  class BatchTimerFlow[T](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
    // step 1: define the ports and the component-specific members
    val in = Inlet[T]("BatchTimerFlow.in")
    val out = Outlet[T]("BatchTimerFlow.out")

    // step 3: create the logic
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new TimerGraphStageLogic(shape) {
        // mutable state
        val batch = new mutable.Queue[T]
        var open = false

        // step 4: define mutable state implement my logic here
        setHandler(in, new InHandler {
          override def onPush(): Unit = {
            try {
              val nextElement = grab(in)
              batch.enqueue(nextElement)
              Thread.sleep(50) // simulate an expensive computation
              if (open) pull(in) // send demand upstream signal, asking for another element
              else {
                // forward the element to the downstream operator
                emitMultiple(out, batch.dequeueAll(_ => true).to[collection.immutable.Iterable])
                open = true
                scheduleOnce(None, silencePeriod)
              }
            } catch {
              case e: Throwable => failStage(e)
            }
          }
        })

        setHandler(out, new OutHandler {
          override def onPull(): Unit = {
            pull(in)
          }
        })

        override protected def onTimer(timerKey: Any): Unit = {
          open = false
        }
      }

    // step 2: construct a new shape
    override def shape: FlowShape[T, T] = FlowShape[T, T](in, out)
  }
}
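What the operator above does not do yet is the actual join inside each window. Purely as an illustration (my own sketch, not part of the original answer), the missing step could look like the following with plain Akka Streams combinators, keeping the same (tag, value) tuple representation: merge the two tagged streams, collect everything that arrives within a window using groupedWithin, then pair up the elements that came from each side. Pairing every left element with every right element is an assumption made just to keep the example small; a real keyed join would group by an actual message key rather than the 0/1 tag.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

object WindowedJoinSketch {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("WindowedJoinSketch")

    // two streams tagged with 0 and 1, merged into one stream of (tag, value)
    val left  = Source(1 to 10).throttle(1, 1.second).map(v => (0, v))
    val right = Source(10 to 20).throttle(1, 1.second).map(v => (1, v))

    left.merge(right)
      // collect everything that arrives within a 5-second window (or up to 100 elements)
      .groupedWithin(100, 5.seconds)
      // "join" inside the window: pair up values coming from the two tagged streams
      .mapConcat { window =>
        val lefts  = window.collect { case (0, v) => v }
        val rights = window.collect { case (1, v) => v }
        for (l <- lefts; r <- rights) yield (l, r) // cartesian pairing per window
      }
      .runWith(Sink.foreach(pair => println(s" > joined: $pair")))
  }
}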