在带有kafka和schema registry的spark流中,在接收到dstream之后,如何将dstream批处理转换为spark中的dataframe?
从confluent使用kafkaavrodecoder后的数据流类型是dstream(string,object)。当我使用下面的代码时,它会将avro列中的模式数据类型(如int)更改为long。
val kafkaStream: DStream[(String, Object)] =
KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
ssc, kafkaParams, Set(topic)
)
// Load JSON strings into DataFrame
kafkaStream.foreachRDD { rdd =>
// Get the singleton instance of SQLContext
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
val topicValueStrings = rdd.map(_._2.toString)
val df = sqlContext.read.json(topicValueStrings)
代码参考
object.tosting和读取为json释放int的模式。有没有其他方法代替在dataframe列中强制转换类型?
暂无答案!
目前还没有任何答案,快来回答吧!