如何将kafka中的字节转换为原始对象?

2hh7jdfx  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(667)

我正在从Kafka获取数据,然后反序列化 Array[Byte] 使用默认解码器,然后我的rdd元素 (null,[B@406fa9b2) , (null,[B@21a9fe0) 但是我想要我的原始数据有一个模式,那么我怎样才能做到这一点呢?
我以avro格式序列化消息。

eqoofvh9

eqoofvh91#

必须使用适当的反序列化程序(比如字符串或自定义对象)对字节进行解码。
如果你不做解码你会得到 [B@406fa9b2 这只是java中字节数组的文本表示。
kafka对消息的内容一无所知,因此它将字节数组从生产者传递给消费者。
在spark流媒体中,必须对键和值使用序列化程序(引用kafkawordcount示例):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

使用上面的序列化程序 DStream[String] 所以你和 RDD[String] .
但是,如果您想直接将字节数组反序列化到您的自定义类中,就必须编写一个自定义序列化程序(这是kafka特有的,与spark无关)。
我建议将json与固定模式或avro结合使用(使用kafka、spark和avro第3部分“生成和使用avro消息”中描述的解决方案)。
然而,在结构化流媒体中,管道可以如下所示:

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string") // <-- conversion here

相关问题