我应该从Kafka的消费者那里得到一张Map[string,string],但我真的不知道怎么做。我设法配置了消费者,它工作正常,但我不明白如何才能得到Map。
implicit val system: ActorSystem = ActorSystem()
val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")
val = kafkaConsumerSettings =
ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(localhost:9094)
.withGroupId(group1)
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics(entity.entity_name))
.toMat(Sink.foreach(println))(DrainingControl.apply)
.run()
1条答案
按热度按时间lkaoscv71#
lightbend的建议是在反序列化来自kafka的传入数据时处理字节数组
消息反序列化的一般建议是使用字节数组(或字符串)作为值,并在akka流的map操作中执行反序列化,而不是直接在kafka反序列化器中实现。当反序列化在akka流中显式处理时,更容易实现所需的错误处理策略,如下例所示。
为此,您可以使用以下设置设置使用者:
并通过调用
.value()
方法。要反序列化它,我建议使用circe+jawn。这段代码应该能做到。