如何获得kafka消费者(alpakka)返回的map[string,string]?

0ve6wy6x  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(442)

我应该从Kafka的消费者那里得到一张Map[string,string],但我真的不知道怎么做。我设法配置了消费者,它工作正常,但我不明白如何才能得到Map。

implicit val system: ActorSystem = ActorSystem()

    val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")

    val = kafkaConsumerSettings =
      ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(localhost:9094)
        .withGroupId(group1)
    Consumer
      .plainSource(kafkaConsumerSettings, Subscriptions.topics(entity.entity_name))
      .toMat(Sink.foreach(println))(DrainingControl.apply)
      .run()
lkaoscv7

lkaoscv71#

lightbend的建议是在反序列化来自kafka的传入数据时处理字节数组
消息反序列化的一般建议是使用字节数组(或字符串)作为值,并在akka流的map操作中执行反序列化,而不是直接在kafka反序列化器中实现。当反序列化在akka流中显式处理时,更容易实现所需的错误处理策略,如下例所示。
为此,您可以使用以下设置设置使用者:

val consumerSettings = ConsumerSettings(consumerConfig, new StringDeserializer, new ByteArrayDeserializer)

并通过调用 .value() 方法。要反序列化它,我建议使用circe+jawn。这段代码应该能做到。

import io.circe.jawn
import io.circe.generic.auto._

val bytes = record.value()

val data = jawn.parseByteBuffer(ByteBuffer.wrap(bytes)).flatMap(_.as[Map[String, String]])

相关问题