我正在从Kafka获取数据,然后反序列化 Array[Byte] 使用默认解码器,然后我的rdd元素 (null,[B@406fa9b2) , (null,[B@21a9fe0) 但是我想要我的原始数据有一个模式,那么我怎样才能做到这一点呢?我以avro格式序列化消息。
Array[Byte]
(null,[B@406fa9b2)
(null,[B@21a9fe0)
eqoofvh91#
必须使用适当的反序列化程序(比如字符串或自定义对象)对字节进行解码。如果你不做解码你会得到 [B@406fa9b2 这只是java中字节数组的文本表示。kafka对消息的内容一无所知,因此它将字节数组从生产者传递给消费者。在spark流媒体中,必须对键和值使用序列化程序(引用kafkawordcount示例):
[B@406fa9b2
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
使用上面的序列化程序 DStream[String] 所以你和 RDD[String] .但是,如果您想直接将字节数组反序列化到您的自定义类中,就必须编写一个自定义序列化程序(这是kafka特有的,与spark无关)。我建议将json与固定模式或avro结合使用(使用kafka、spark和avro第3部分“生成和使用avro消息”中描述的解决方案)。然而,在结构化流媒体中,管道可以如下所示:
DStream[String]
RDD[String]
val fromKafka = spark. readStream. format("kafka"). option("subscribe", "topic1"). option("kafka.bootstrap.servers", "localhost:9092"). load. select('value cast "string") // <-- conversion here
1条答案
按热度按时间eqoofvh91#
必须使用适当的反序列化程序(比如字符串或自定义对象)对字节进行解码。
如果你不做解码你会得到
[B@406fa9b2
这只是java中字节数组的文本表示。kafka对消息的内容一无所知,因此它将字节数组从生产者传递给消费者。
在spark流媒体中,必须对键和值使用序列化程序(引用kafkawordcount示例):
使用上面的序列化程序
DStream[String]
所以你和RDD[String]
.但是,如果您想直接将字节数组反序列化到您的自定义类中,就必须编写一个自定义序列化程序(这是kafka特有的,与spark无关)。
我建议将json与固定模式或avro结合使用(使用kafka、spark和avro第3部分“生成和使用avro消息”中描述的解决方案)。
然而,在结构化流媒体中,管道可以如下所示: