我查看了以下文档:https://github.com/akka/reactive-kafka 我看到了以下代码片段:
implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()
val kafka = new ReactiveKafka()
val publisher: Publisher[StringConsumerRecord] = kafka.consume(ConsumerProperties(
bootstrapServers = "localhost:9092",
topic = "lowercaseStrings",
groupId = "groupName",
valueDeserializer = new StringDeserializer()
))
我知道“出版商”应该给Kafka写信。然而,消费者在Kafka的意思恰恰相反,这意味着消费者阅读Kafka的信息。如果是,那么“publisher”与kafka.consume(consumerproperties…)有什么关系?
1条答案
按热度按时间zujrkrfu1#
这是一个令人困惑的术语冲突案例。reactivekafka使用akka流,这是reactivestreams规范的一个实现。
在本规范中,
Publisher
发布到流并Subscriber
从流接收结果。如您所见,当您定义用于处理kafka消息的流时,kafka使用者充当Publisher
因为它是信息的来源(Source
在 akka 河的术语)。同样,Kafka的制作人也将是Subscriber
,因为它在一条小溪的尽头(Sink
在 akka 河)。所以在代码中定义
Publisher
到你的Kafka消费流。