我想知道为什么下面的代码会打印一个
val consumerProperties = ConsumerProperties(
bootstrapServers = "localhost:9092",
topic = "test",
groupId = "group",
valueDeserializer = new StringDeserializer
).commitInterval(1200 milliseconds)
consumerWithOffSetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffSetSink.publisher)
.map{ msg =>
println(msg.value())
msg
}
.to(consumerWithOffSetSink.offsetCommitSink).run()
但是,下面的代码并没有达到我所期望的效果:
val consumerProperties = ConsumerProperties(
bootstrapServers = "localhost:9092",
topic = "test",
groupId = "group",
valueDeserializer = new StringDeserializer
).commitInterval(1200 milliseconds)
consumerWithOffSetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffSetSink.publisher)
.runForeach(msg => println(msg.value()))
.foreach(msg => println(msg))
我想了解的是如何构建我的流媒体应用程序,我还在理解过程中 Akka Streams
总体而言,以及它们如何在内部工作。
暂无答案!
目前还没有任何答案,快来回答吧!