How to convert a JSON string to a DataFrame in Spark

omtl5h9j · posted 2023-02-26 in Spark
Follow (0) | Answers (8) | Views (495)

I want to convert the following string variable into a DataFrame on Spark:

val jsonStr = """{ "metadata": { "key": 84896, "value": 54 }}"""

I know how to create a DataFrame from a JSON file:

sqlContext.read.json("file.json")

But I don't know how to create a DataFrame from a string variable.
How can I convert the JSON string variable into a DataFrame?


qco9c6ql #1

For Spark 2.2+:

import spark.implicits._
val jsonStr = """{ "metadata": { "key": 84896, "value": 54 }}"""
val df = spark.read.json(Seq(jsonStr).toDS)

For Spark 2.1.x:

val events = sc.parallelize("""{"action":"create","timestamp":"2016-01-07T00:01:17Z"}""" :: Nil)    
val df = sqlContext.read.json(events)

Tip: this uses the sqlContext.read.json(jsonRDD: RDD[String]) overload. There is also sqlContext.read.json(path: String), which reads a JSON file directly.
For older versions:

val jsonStr = """{ "metadata": { "key": 84896, "value": 54 }}"""
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)
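
Whichever variant you use, the nested fields can then be inspected and flattened. A minimal sketch continuing from the df above (the $"..." column syntax needs import spark.implicits._, already in scope in the 2.2+ snippet; JSON integers are inferred as long):

df.printSchema()
// root
//  |-- metadata: struct (nullable = true)
//  |    |-- key: long (nullable = true)
//  |    |-- value: long (nullable = true)

// Promote the nested struct fields to top-level columns
df.select($"metadata.key", $"metadata.value").show()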

qmb5sa22 #2

Since the functions for reading JSON from an RDD were deprecated in Spark 2.2, this is an alternative:

val jsonStr = """{ "metadata": { "key": 84896, "value": 54 }}"""
import spark.implicits._ // spark is your SparkSession object
val df = spark.read.json(Seq(jsonStr).toDS)
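
Schema inference makes an extra pass over the data. If the structure is known up front, it can be declared explicitly instead; a minimal sketch for the same jsonStr (the field names and LongType are assumptions based on the example document):

import org.apache.spark.sql.types._

// Supplying the schema explicitly skips the inference pass
val schema = new StructType()
  .add("metadata", new StructType()
    .add("key", LongType)
    .add("value", LongType))

val dfTyped = spark.read.schema(schema).json(Seq(jsonStr).toDS)
dfTyped.show()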

3ks5zfa0 #3

Here is an example of how to convert JSON strings to a DataFrame in Java (Spark 2.2+):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

String str1 = "{\"_id\":\"123\",\"ITEM\":\"Item 1\",\"CUSTOMER\":\"Billy\",\"AMOUNT\":285.2}";
String str2 = "{\"_id\":\"124\",\"ITEM\":\"Item 2\",\"CUSTOMER\":\"Sam\",\"AMOUNT\":245.85}";
List<String> jsonList = new ArrayList<>();
jsonList.add(str1);
jsonList.add(str2);
SparkContext sparkContext = new SparkContext(new SparkConf()
        .setAppName("myApp").setMaster("local"));
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
SQLContext sqlContext = new SQLContext(sparkContext);
// Parallelize the JSON strings into a JavaRDD and let Spark infer the schema
JavaRDD<String> javaRdd = javaSparkContext.parallelize(jsonList);
Dataset<Row> data = sqlContext.read().json(javaRdd);
data.show();

The result looks like this:

+------+--------+------+---+
|AMOUNT|CUSTOMER|  ITEM|_id|
+------+--------+------+---+
| 285.2|   Billy|Item 1|123|
|245.85|     Sam|Item 2|124|
+------+--------+------+---+

nwwlzxa7 #4

simple_json = '{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}'
rddjson = sc.parallelize([simple_json])
df = sqlContext.read.json(rddjson)

Reference: https://stackoverflow.com/a/49399359/2187751
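
Note that this document nests a JSON array under results, so the DataFrame comes back as a single row holding an array column. To get one row per array element, the column can be exploded; a sketch of the same idea in Scala, the language used by the other answers:

import org.apache.spark.sql.functions.explode
import spark.implicits._

val simpleJson = """{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}"""
val nested = spark.read.json(Seq(simpleJson).toDS)

// explode produces one output row per element of the results array
nested.select(explode($"results").as("r")).select($"r.a", $"r.b", $"r.c").show()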


8e2ybdfx #5

Converting a list of JSON strings to a DataFrame in Spark 2.2:

val spark = SparkSession
          .builder()
          .master("local")
          .appName("Test")
          .getOrCreate()

var strList = List.empty[String]
var jsonString1 = """{"ID" : "111","NAME":"Arkay","LOC":"Pune"}"""
var jsonString2 = """{"ID" : "222","NAME":"DineshS","LOC":"PCMC"}"""
strList = strList :+ jsonString1
strList = strList :+ jsonString2

val rddData = spark.sparkContext.parallelize(strList)
val resultDF = spark.read.json(rddData)
resultDF.show()

Result:

+---+----+-------+
| ID| LOC|   NAME|
+---+----+-------+
|111|Pune|  Arkay|
|222|PCMC|DineshS|
+---+----+-------+

ttisahbt #6

JSON can now be read directly from a Dataset[String]; see https://spark.apache.org/docs/latest/sql-data-sources-json.html:

val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
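
As in the other answers, the nested address struct can then be flattened by selecting its fields (the $ syntax needs import spark.implicits._):

otherPeople.select($"name", $"address.city", $"address.state").show()
// +----+--------+-----+
// |name|    city|state|
// +----+--------+-----+
// | Yin|Columbus| Ohio|
// +----+--------+-----+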

92dk7w1h #7

In some cases you may get errors such as Illegal pattern component: XXX; you then need to add a timestampFormat option to spark.read and update the code as follows:

val spark = SparkSession
          .builder()
          .master("local")
          .appName("Test")
          .getOrCreate()
import spark.implicits._
val jsonStr = """{ "metadata": { "key": 84896, "value": 54 }}"""
val df = spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").json(Seq(jsonStr).toDS)
df.show()
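
The timestampFormat option only takes effect when a field is actually read as a timestamp. A minimal sketch under that assumption, with a hypothetical ts field (the field name and sample value are illustrative, not from the original answer; it reuses the implicits import above):

import org.apache.spark.sql.types._

// Hypothetical input whose value matches the yyyy/MM/dd HH:mm:ss ZZ pattern
val tsJson = """{ "ts": "2016/01/07 00:01:17 +00:00" }"""

// Declaring ts as TimestampType makes spark.read apply the timestampFormat while parsing
val tsDf = spark.read
  .schema(new StructType().add("ts", TimestampType))
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .json(Seq(tsJson).toDS)
tsDf.show(false)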

vqlkdk9b #8

import spark.implicits._  // needed for createDataset's encoder and the $ column syntax

val jsonStr = """{ "metadata": { "key": 84896, "value": 54 }}"""
val df = spark.read.json(spark.createDataset(jsonStr :: Nil))
// display() is the Databricks notebook helper; use .show() outside notebooks
display(df.withColumn("key", $"metadata.key").withColumn("value", $"metadata.value"))
