我试图通过spark streaming从一个kafka主题中获取信息,然后解析主题中得到的json。为了在数据流中获取主题,我使用stringreader,然后使用foreach从数据流中获取每个rdd:
myRDD.collect().foreach(println)
为了将myrdd转换成json(当我打印myrdd时,json的格式是正确的)并提取我需要的两个字段,我尝试使用json4s并尝试了以下方法:
val jsonEvent = Json.parse(myRDD)
val srcIp = (jsonEvent / "src_ip")
val dstIp = (jsonEvent / "dst_ip")
我也尝试过这样使用json4s:
val jsonEvent = parse(myRDD).asInstanceOf[JObject]
val srcIp = jsonEvent / "src_ip"
但它也不能正常工作。
这是输出:
java.lang.NoSuchMethodError: rg.json4s.jackson.JsonMethods$.parse$default$3()Z
以下是我正在使用的版本:
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.10</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.10</artifactId>
<version>3.5.1</version>
</dependency>
我认为问题是我不明白如何将rdd中的每条记录转换成json对象来解析它。有没有人能更深入地向我解释一下,让我明白它是怎么工作的?我的代码正确吗?
谢谢您。
1条答案
按热度按时间ycggw6v21#
spark为您提供了将输入json读取到数据集/Dataframe的api。
如果您正在从文件中读取json。你可以使用
SparkSession
read().json()
方法如果你在读Kafka的作品。您可以通过将每个rdd转换为
List
的String
然后将它们转换为数据集/Dataframe对于查询数据集/Dataframe,可以使用
Select()
函数,然后将其转换为所需的数据类型。