consumer committablesource和plainsource之间有什么区别?

798qvoo8  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(439)

我正在尝试使用消费者图书馆https://doc.akka.io/docs/alpakka-kafka/current/consumer.html 方法 committableSource 具体如下:

  1. Consumer
  2. .committableSource(consumerSettings, Subscriptions.topics("SAP-EVENT-BUS"))
  3. .map(_.committableOffset)
  4. .toMat(Committer.sink(committerSettings))(Keep.both)
  5. .mapMaterializedValue(DrainingControl.apply)
  6. .run()

这里的问题是,如何获得消费者从中收到的消息 Kafka ?
与以下代码段一起工作:

  1. Consumer
  2. .plainSource(
  3. consumerSettings,
  4. Subscriptions.topics("SAP-EVENT-BUS"))
  5. .to(Sink.foreach(println))
  6. .run()

整个代码片段:

  1. private implicit val materializer = ActorMaterializer()
  2. private val config = context.system.settings.config.getConfig("akka.kafka.consumer")
  3. private val consumerSettings =
  4. ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
  5. .withBootstrapServers("localhost:9092")
  6. .withGroupId("SAP-SENDER-GROUP")
  7. .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  8. private val committerSettings = CommitterSettings(context.system)
  9. Consumer
  10. .committableSource(consumerSettings, Subscriptions.topics("TOPIC"))
  11. .map(_.committableOffset)
  12. .toMat(Committer.sink(committerSettings))(Keep.both)
  13. .mapMaterializedValue(DrainingControl.apply)
  14. .run()
  15. Consumer
  16. .plainSource(
  17. consumerSettings,
  18. Subscriptions.topics("SAP-EVENT-BUS"))
  19. .to(Sink.foreach(println))
  20. .run()

或者我必须同时使用两者,一个用于提交,另一个用于消费。

stszievb

stszievb1#

而不是 Committer.sink ,终止流,使用 Committer.flow 它允许您继续流,直到您选择用另一个接收器终止它。

相关问题