阿克卡流Kafka,完成流时,到达日志结束

zengzsys  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(292)

我正在使用akka-streams-kafka,我正在寻找一种方法来执行以下操作:
来自偏移量的起始流 x 按顺序消费项目 x , x+1 , x+2 .. 直到最后一项
消费完最后一项后,完成流。
代码看起来像

Consumer
  .plainSource(consumerSettings, subscription)
  .runForeach(println("got record!"))
  .onComplete {
    case Success(_) => // all items read
    case Failure(error) => // error
  }

它将在最后一个元素被读取后完成。也许这不是这个库的用途。我怎样才能做到这一点?

1cosmwyk

1cosmwyk1#

akka消费者以一种“拉动式”的方式工作,除非发生与经纪人有关的错误,否则它将永远活着。但是,你认为这条河什么时候结束?。kafka可以被看作是一个分布式日志,从中可以读取给定偏移量的消息。只要你的客户是连接到经纪人你的客户将启动和运行。。。例如,如果在某个时间间隔内没有来自kafka的事件时考虑终止流,则可以使用idletimeout:

Consumer
    .plainSource(consumerSettings, subscription)
    .idleTimeout(10 seconds)
    .runForeach(e => println("E"))
    .onComplete {
      case Success(_) => // all items read
      case Failure(error) =>
      // TimeoutException if no element in ten seconds the stream stops throwing this exception
    }

另一种可能是使用扇入式舞台,尤其是在舞台上。我们可以创建另一个在一个时间间隔内发射事件的滴答声源。Kafka的来源将有优先权,所以只要元素来自Kafka的舞台总是会从这个来源拉元素。如果某个时间间隔内没有元素,则会将“timeout”字符串推送到下游。比如:

implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val ec = actorSystem.dispatcher

  val consumer =
  Consumer
    .plainSource(consumerSettings, subscription)
    .map(_.value())

  val tick = Source.tick(50 millis, 30 seconds, "Timeout")

  val source = GraphDSL.create(consumer, tick)(Keep.both) { implicit b ⇒
    (r1, r2) ⇒
      val merge = b.add(MergePreferred[String](1, false))
      r2 ~> merge.in(0)
      r1 ~> merge.preferred
      SourceShape(merge.out)
  }

  Source
    .fromGraph(source)
    .takeWhile(el => el != "Timeout")
    .runForeach(msg => println(msg))
  .onComplete{
    case Success(_) => println("Stream ended")
    case Failure(error) => println("There was an error")
  }

与takewhile流将是活跃的,同时有来自Kafka的元素。
这只是一种方法。akka流有许多不同的阶段,graph api可能以一种更优雅的方式来面对这些情况。

相关问题