创建一个嵌套字段并使用scala将dataframe存储到mongodb?

sh7euo9m  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(483)

我在使用scala,在将hbase中的数据Map到mongodb中时遇到了一个问题。让我解释一下:
我在hbase中有一个人和地址表,一个人对应多个地址。

val sparkSession = SparkSession.builder().getOrCreate()

val dfPerson = sparkSession.read.format()...

dfPerson.show():

+---------+--------------------+--------------------+-------+---------+
|PERSON_ID|           LAST_NAME|          FIRST_NAME|COUNTRY|     CITY|
+---------+--------------------+--------------------+-------+---------+
|     1005|               Miley|                John|  Spain|   Madrid|
|     1005|               Miley|                John|  Spain|Barcele..|
|     1009|              Rodney|              Justin| France|    Paris|
|     1009|              Rodney|              Justin| France|  Creteil|
+---------+--------------------+--------------------+------+---------+

我需要以嵌套对象格式Map这些数据,然后将其存储在mongodb中,其中有一个collection具有arrays address块,如下所示:

[ {
    name: "John"
    lasteName: "Miley"
    address:[
      {city: "Bacelona", country: "Spain", ... },
      {city: "Madrid", country: "Spain", ...},
       ...
    ]
    },
   {  
    name: "Justin"
    lasteName: "Rodney",
    address: [..]
  }
]

是否有任何框架来Map这些escanrie?
谢谢你的建议

mec1mxoz

mec1mxoz1#

你可以这样做。

val df = Seq(
  ("1005", "Miley", "John", "Spain", "Barceleona"),
  ("1009", "Rodney", "Justin", "France", "Paris"),
  ("1009", "Rodney", "Justin", "France", "Creteil")
).toDF("PERSON_ID", "LAST_NAME", "FIRST_NAME", "COUNTRY", "CITY")

//New column names 
val newCols = List("id", "lastName", "name", "country", "city")

//rename all columns and groupby to create nested address  
val resultDF = df.select(df.columns.zip(newCols).map(c => col(c._1).as(c._2)):_*)
  .groupBy("id", "name", "lastName")
  .agg(collect_list(struct($"city", $"country")).as("address"))

最终架构:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)

这是您想要的最终模式。要存储到mongodb,可以使用 mongo-spark-connector" https://docs.mongodb.com/spark-connector/current/scala-api/

相关问题