scala - Converting structs in a Spark DataFrame to arrays

k4ymrczo · posted 2022-11-09 in Scala

I have a DataFrame in Spark that looks like the following.

{"emp_id":1,"emp_name":"John","cust_id":"c1","cust_detail":[{"name":"abc","acc_no":123,"mobile":000},{"name":"abc","acc_no":123,"mobile":111},{"name":"abc","acc_no":123,"mobile":222}]}

I am looking for output like the following.

{"emp_id":1,"emp_name":"John","cust_id":"c1","cust_detail":[{"name":["abc"],"acc_no":[123],"mobile":[000,123,222]}

nxagd54h (answer #1)

This does what you want: explode the column first, then aggregate it back.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark: SparkSession = SparkSession.builder().master("local[1]")
  .appName("learn")
  .getOrCreate()

// Each record in the source file is a single-line JSON object.
val inputdf = spark.read.option("multiline", "false").json("C:\\Users\\User\\OneDrive\\Desktop\\source_file.txt")

// Explode the array of structs so each customer detail becomes its own row.
val newdf1 = inputdf.withColumn("cust_detail_exploded", explode(col("cust_detail"))).drop("cust_detail")

// Flatten the struct fields into top-level columns.
val newdf2 = newdf1.select("cust_id", "emp_name", "emp_id",
  "cust_detail_exploded.mobile", "cust_detail_exploded.acc_no", "cust_detail_exploded.name")

// Aggregate back: collect the distinct values of each field into an array,
// then wrap the three arrays in a single-element array of structs.
val newdf3 = newdf2.groupBy("cust_id").agg(
  array(struct(
    collect_set(col("mobile")).as("mobile"),
    collect_set(col("acc_no")).as("acc_no"),
    collect_set(col("name")).as("name")
  )).as("cust_detail"))

newdf3.printSchema()

newdf3.write.json("C:\\Users\\User\\OneDrive\\Desktop\\newww.txt")

Output:

{"cust_id":"c1","cust_detail":[{"mobile":["111","000","222"],"acc_no":["123"],"name":["abc"]}]}
