我将一个json文件推入一个kafka主题,在presto中连接该主题,并将json数据结构化到一个可查询的表中。
我面临的问题是,presto不获取数据,因为“currentnode”为null,所以它的错误不能调用“com.fasterxml.jackson.databind.jsonnode.has(string)”。
将数据推入Kafka主题的代码:
object Producer extends App{
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")
val producer = new KafkaProducer[String,JsonNode](props)
println("inside prducer")
val mapper = (new ObjectMapper() with ScalaObjectMapper).
registerModule(DefaultScalaModule).
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).
findAndRegisterModules(). // register joda and java-time modules automatically
asInstanceOf[ObjectMapper with ScalaObjectMapper]
val filename = "/Users/rishunigam/Documents/devicd.json"
val jsonNode: JsonNode= mapper.readTree(new File(filename))
val s = jsonNode.size()
for(i <- 0 to jsonNode.size()) {
val js = jsonNode.get(i)
println(js)
val record = new ProducerRecord[String, JsonNode]( "tpch.devicelog", js)
println(record)
producer.send( record )
}
println("producer complete")
producer.close()
}
暂无答案!
目前还没有任何答案,快来回答吧!