kafkaavrodecoder对象到Dataframe的转换

ykejflvf  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(247)

在带有kafka和schema registry的spark流中,在接收到dstream之后,如何将dstream批处理转换为spark中的dataframe?
从confluent使用kafkaavrodecoder后的数据流类型是dstream(string,object)。当我使用下面的代码时,它会将avro列中的模式数据类型(如int)更改为long。

val kafkaStream: DStream[(String, Object)] =
      KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
    ssc, kafkaParams, Set(topic)
      )

  // Load JSON strings into DataFrame
  kafkaStream.foreachRDD { rdd =>
    // Get the singleton instance of SQLContext
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._

val topicValueStrings = rdd.map(_._2.toString)
    val df = sqlContext.read.json(topicValueStrings)

代码参考
object.tosting和读取为json释放int的模式。有没有其他方法代替在dataframe列中强制转换类型?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题