How do I convert a DataFrame map column into a struct column?

1l5u6lss  posted 2021-05-29 in Spark

Suppose we have a DataFrame with a column of map type. How can we convert it to a struct (or, equivalently, define a new column with the same keys and values, but of struct type)? See the following spark-shell (2.4.5) session, which does it in an extremely inefficient way:

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

val jsonStr = df.withColumn("jsonized", to_json($"mapColumn")).select("jsonized").collect()(0)(0).asInstanceOf[String]

spark.read.json(Seq(jsonStr).toDS()).show()
+---+---+
|bar|foo|
+---+---+
|  2|  1|
+---+---+

Now, obviously collect() is very inefficient, and this is generally a terrible way to do things in Spark. But what is the preferred way to achieve this conversion? named_struct and struct both take a sequence of argument values to construct the result, but I cannot find any way to "unfold" the map keys/values to pass them to those functions.
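The closest built-in I have found is map_entries (available since Spark 2.4, if I read the docs right), but it yields an array of key/value structs rather than a single struct; a minimal sketch, untested here:

import org.apache.spark.sql.functions.map_entries

// map_entries should turn map<string,int> into array<struct<key:string,value:int>>
df.select(map_entries($"mapColumn").as("entries")).printSchema()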


mkh04yzy1#

I would use the explode function. Given the input DataFrame:

+--------------------+
|           mapColumn|
+--------------------+
|[foo -> 1, bar -> 2]|
+--------------------+
df.select(explode('mapColumn)).select(struct('*).as("struct"))

Output:

+--------+
| struct|
+--------+
|[foo, 1]|
|[bar, 2]|
+--------+

root
 |-- struct: struct (nullable = false)
 |    |-- key: string (nullable = false)
 |    |-- value: integer (nullable = false)
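Note that explode produces one row per map entry. If you need all entries back in a single row, you could re-aggregate the structs into an array, e.g. (a sketch, assuming a single input row or an appropriate grouping key):

// collect_list gathers the per-entry structs into one array column
df.select(explode('mapColumn))
  .agg(collect_list(struct('key, 'value)).as("entries"))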


xv8emn3q2#

I saw @chlebek's answer, but if it should fit on a single line, you can use a UDF:

scala> val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")
df: org.apache.spark.sql.DataFrame = [mapColumn: map<string,int>]

scala> df.show
+--------------------+
|           mapColumn|
+--------------------+
|[foo -> 1, bar -> 2]|
+--------------------+

scala> case class KeyValue(key: String, value: String)
defined class KeyValue

scala> val toArrayOfStructs = udf((value: Map[String, String]) => value.map {
     |   case (k, v) => KeyValue(k, v)
     | }.toArray )
toArrayOfStructs: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(key,StringType,true), StructField(value,StringType,true)),true),Some(List(MapType(StringType,StringType,true))))

scala> df.withColumn("alfa", toArrayOfStructs(col("mapColumn")))
res4: org.apache.spark.sql.DataFrame = [mapColumn: map<string,int>, alfa: array<struct<key:string,value:string>>]

scala> res4.show
+--------------------+--------------------+
|           mapColumn|                alfa|
+--------------------+--------------------+
|[foo -> 1, bar -> 2]|[[foo, 1], [bar, 2]]|
+--------------------+--------------------+

scala> res4.printSchema
root
 |-- mapColumn: map (nullable = false)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- alfa: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)
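If you want to keep the integer values instead of widening them to strings, the same idea should work with a typed case class (an untested sketch; KeyValueInt is a name I made up):

case class KeyValueInt(key: String, value: Int)

// Same UDF pattern, but preserving the map's integer value type
val toTypedStructs = udf((m: Map[String, Int]) => m.toSeq.map { case (k, v) => KeyValueInt(k, v) })

df.withColumn("alfa", toTypedStructs(col("mapColumn")))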

0wi1tuuw3#

Define a case class:

import org.apache.spark.sql.{Encoder, Encoders}

// Define a case class for the target shape
case class Bean56(foo: Int, bar: Int)
// A case class is a Product, so use a product encoder
// (Encoders.bean expects a Java bean with getters/setters)
val bean56Encoder: Encoder[Bean56] = Encoders.product[Bean56]

val df = spark.sql("""select map("foo", 1, "bar", 2) AS mapColumn""")

// Map each row's map column to the required case class
val bean56s = df.map { row =>
  val m = row.getMap[String, Int](0)
  Bean56(m.getOrElse("foo", -1), m.getOrElse("bar", -1))
}(bean56Encoder) // supply the encoder explicitly
bean56s.foreach(println(_)) // print each bean
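For completeness: when the keys are known up front, as they are here, a plain SQL expression can build a genuine struct column directly, with no UDF or encoder (a sketch using named_struct and map element access):

df.selectExpr("named_struct('foo', mapColumn['foo'], 'bar', mapColumn['bar']) AS structColumn")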
