React性Kafka不打印消息

xt0899hw  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(257)

我想知道为什么下面的代码会打印一个

val consumerProperties = ConsumerProperties(
  bootstrapServers = "localhost:9092",
  topic = "test",
  groupId = "group",
  valueDeserializer = new StringDeserializer
).commitInterval(1200 milliseconds)

consumerWithOffSetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffSetSink.publisher)
    .map{ msg =>
      println(msg.value())
      msg
    }
  .to(consumerWithOffSetSink.offsetCommitSink).run()

但是,下面的代码并没有达到我所期望的效果:

val consumerProperties = ConsumerProperties(
  bootstrapServers = "localhost:9092",
  topic = "test",
  groupId = "group",
  valueDeserializer = new StringDeserializer
).commitInterval(1200 milliseconds)

consumerWithOffSetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffSetSink.publisher)
  .runForeach(msg => println(msg.value()))
  .foreach(msg => println(msg))

我想了解的是如何构建我的流媒体应用程序,我还在理解过程中 Akka Streams 总体而言,以及它们如何在内部工作。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题