Loading JSON data into Hive with Spark SQL

k5hmc34c  posted 2021-06-29 in Hive

I am unable to push JSON data into Hive. Below is the sample JSON data and what I have tried so far; please point out what I am missing.

JSON data:

{
  "Employees": [
    {
      "userId": "rirani",
      "jobTitleName": "Developer",
      "firstName": "Romin",
      "lastName": "Irani",
      "preferredFullName": "Romin Irani",
      "employeeCode": "E1",
      "region": "CA",
      "phoneNumber": "408-1234567",
      "emailAddress": "romin.k.irani@gmail.com"
    },
    {
      "userId": "nirani",
      "jobTitleName": "Developer",
      "firstName": "Neil",
      "lastName": "Irani",
      "preferredFullName": "Neil Irani",
      "employeeCode": "E2",
      "region": "CA",
      "phoneNumber": "408-1111111",
      "emailAddress": "neilrirani@gmail.com"
    },
    {
      "userId": "thanks",
      "jobTitleName": "Program Directory",
      "firstName": "Tom",
      "lastName": "Hanks",
      "preferredFullName": "Tom Hanks",
      "employeeCode": "E3",
      "region": "CA",
      "phoneNumber": "408-2222222",
      "emailAddress": "tomhanks@gmail.com"
    }
  ]
}

I tried loading it with SQLContext's jsonFile method, but it failed to parse the JSON:

val f = sqlc.jsonFile("file:///home/vm/Downloads/emp.json")
f.show 

The error is: java.lang.RuntimeException: Failed to parse a value for data type StructType() (current token: VALUE_STRING)

I then tried a different approach and managed to hack my way to the schema:

val files = sc.wholeTextFiles("file:///home/vm/Downloads/emp.json")
val jsonData = files.map(x => x._2)
sqlc.jsonRDD(jsonData).registerTempTable("employee")
val emp = sqlc.sql("""
  select Employees[1].userId as ID, Employees[1].jobTitleName as Title,
         Employees[1].firstName as FirstName, Employees[1].lastName as LastName,
         Employees[1].preferredFullName as PreferredName, Employees[1].employeeCode as empCode,
         Employees[1].region as Region, Employees[1].phoneNumber as Phone, Employees[1].emailAddress as email
  from employee""")
emp.show // displays the fields of the second employee (array index 1)

I can get the data and the schema for each record individually, but I am missing how to pull all the records together and load them into Hive.
Any help or suggestions are much appreciated.


ngynwnxp #1

Here is the answer I hacked together:

val files = sc.wholeTextFiles("file:///home/vm/Downloads/emp.json")
val jsonData = files.map(x => x._2)

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, explode}

val hc = new HiveContext(sc)
val fuldf = hc.jsonRDD(jsonData)
fuldf.registerTempTable("employee")

// Explode the Employees array so that each element becomes its own row
val dfemp = fuldf.select(explode(col("Employees")))
dfemp.saveAsTable("empdummy")

// The exploded struct column comes out as _c0; project its fields into a flat table
val df = hc.sql("select * from empdummy")
df.select("_c0.userId", "_c0.jobTitleName", "_c0.firstName", "_c0.lastName",
  "_c0.preferredFullName", "_c0.employeeCode", "_c0.region",
  "_c0.phoneNumber", "_c0.emailAddress").saveAsTable("dummytab")

Any suggestions for optimizing the above code are welcome.
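
One possible tightening, sketched under the assumption of Spark 1.4+ (for org.apache.spark.sql.functions.explode and DataFrame.write): parse the JSON once, alias the exploded struct column so its fields get a readable prefix instead of _c0, and write the flattened rows in a single pass, skipping the intermediate empdummy table.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, explode}

val hc = new HiveContext(sc)
val jsonData = sc.wholeTextFiles("file:///home/vm/Downloads/emp.json").map(_._2)

// Parse once, explode the array, and give the struct column a readable alias
val employees = hc.jsonRDD(jsonData)
  .select(explode(col("Employees")).alias("emp"))
  .select("emp.userId", "emp.jobTitleName", "emp.firstName", "emp.lastName",
    "emp.preferredFullName", "emp.employeeCode", "emp.region",
    "emp.phoneNumber", "emp.emailAddress")

// Write the flattened rows straight to the final table in one pass
employees.write.saveAsTable("dummytab")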


vs3odd8k #2

Spark SQL only supports reading JSON files when each line contains a separate JSON object.
From SQLContext.scala:

/**
   * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
   * It goes through the entire dataset once to determine the schema.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`. This will be removed in Spark 2.0.
   */
  @deprecated("Use read.json(). This will be removed in Spark 2.0.", "1.4.0")
  def jsonFile(path: String): DataFrame = {
    read.json(path)
  }

Your file should look like this instead (strictly speaking, it is not a valid JSON file):

{"userId":"rirani","jobTitleName":"Developer","firstName":"Romin","lastName":"Irani","preferredFullName":"Romin Irani","employeeCode":"E1","region":"CA","phoneNumber":"408-1234567","emailAddress":"romin.k.irani@gmail.com"}
{"userId":"nirani","jobTitleName":"Developer","firstName":"Neil","lastName":"Irani","preferredFullName":"Neil Irani","employeeCode":"E2","region":"CA","phoneNumber":"408-1111111","emailAddress":"neilrirani@gmail.com"} 
{"userId":"thanks","jobTitleName":"Program Directory","firstName":"Tom","lastName":"Hanks","preferredFullName":"Tom Hanks","employeeCode":"E3","region":"CA","phoneNumber":"408-2222222","emailAddress":"tomhanks@gmail.com"}

Also have a look at the related JIRA issue, which is still unresolved. It does not seem to be a priority, but it is on record.
You have two options:
- Convert the JSON data to the supported format, one object per line (see the sketch after this list), or
- keep one file per JSON object, which will lead to too many small files.
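
For the first option, a minimal conversion sketch using json4s (which ships with Spark, so it runs in spark-shell as-is); the output path emp.jsonl is made up:

import org.json4s._
import org.json4s.jackson.JsonMethods._

// Read the whole multi-line document and pull out the array elements
val raw = scala.io.Source.fromFile("/home/vm/Downloads/emp.json").mkString
val employees = (parse(raw) \ "Employees").children

// Write each employee object compactly on its own line (JSON Lines)
val out = new java.io.PrintWriter("/home/vm/Downloads/emp.jsonl")
employees.foreach(e => out.println(compact(render(e))))
out.close()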
Note that SQLContext.jsonFile is deprecated; use SQLContext.read.json instead.
An example along the lines of the one in the Spark documentation:
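
A sketch of the non-deprecated reader; emp.jsonl refers to the converted one-object-per-line file from the sketch above, and the multiLine option only exists in Spark 2.2 and later:

// Spark 1.4+: the non-deprecated reader; it still expects one JSON object per line
val df = sqlc.read.json("file:///home/vm/Downloads/emp.jsonl")
df.show()

// Spark 2.2+ can read the original multi-line document directly:
// val df2 = spark.read.option("multiLine", true).json("file:///home/vm/Downloads/emp.json")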
