presto给出了这个错误:无法调用“com.fasterxml.jackson.databind.jsonnode.has(string)”,因为“currentnode”为null

r6hnlfcb  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(255)

我将一个json文件推入一个kafka主题,在presto中连接该主题,并将json数据结构化到一个可查询的表中。
我面临的问题是,presto不获取数据,因为“currentnode”为null,所以它的错误不能调用“com.fasterxml.jackson.databind.jsonnode.has(string)”。
将数据推入Kafka主题的代码:

object Producer extends App{
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")
    val producer = new KafkaProducer[String,JsonNode](props)
    println("inside prducer")
    val mapper = (new ObjectMapper() with ScalaObjectMapper).
        registerModule(DefaultScalaModule).
        configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).
        findAndRegisterModules(). // register joda and java-time modules automatically
        asInstanceOf[ObjectMapper with ScalaObjectMapper] 
     val filename = "/Users/rishunigam/Documents/devicd.json"
     val jsonNode: JsonNode=  mapper.readTree(new File(filename))
     val s = jsonNode.size()
     for(i <- 0 to jsonNode.size()) {
    val js = jsonNode.get(i)
   println(js)
  val record = new ProducerRecord[String, JsonNode]( "tpch.devicelog", js)
   println(record)
  producer.send( record )
}
    println("producer complete")
    producer.close()

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题