Map-based column transformation in a Spark DataFrame

pu82cl6c · posted 2021-05-27 in Spark

I want to transform some columns of a DataFrame based on a configuration expressed as a Scala Map.
I have two cases:
Case 1: I receive a Map[String, Seq[String]] and columns col1, col2, and should transform col3 if the map has an entry with key = col1 and col2 appears in that entry's list of values.
Case 2: I receive a Map[String, (Long, Long)] and columns col1, col2, and should transform col3 if the map has an entry with key = col1 and col2 falls within the range described by the tuple of longs as (start, end).
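In other words, I am looking for something with signatures roughly like the following (hypothetical names, just to illustrate the intent):

def transformCase1(df: DataFrame, conf: Map[String, Seq[String]]): DataFrame
def transformCase2(df: DataFrame, conf: Map[String, (Long, Long)]): DataFrame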
Examples:
Case 1: given this table and Map("u1" -> Seq("w1","w11"), "u2" -> Seq("w2","w22"))

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1   | w1   | v1   |
| u2   | w2   | v2   |
| u3   | w3   | v3   |
+------+------+------+

I want col3 to be prefixed with "x-" whenever the condition above is met:

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1   | w1   | x-v1 |
| u2   | w2   | x-v2 |
| u3   | w3   | v3   |
+------+------+------+

Case 2: given this table and Map("u1" -> (1,5), "u2" -> (2,4))

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1   | 2    | v1   |
| u1   | 6    | v11  |
| u2   | 3    | v3   |
| u3   | 4    | v3   |
+------+------+------+

The expected output should be:

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1   | 2    | x-v1 |
| u1   | 6    | v11  |
| u2   | 3    | x-v3 |
| u3   | 4    | v3   |
+------+------+------+

This can easily be done with a UDF, but for performance reasons I would rather not use one.
Is there a way to achieve this in Spark 2.4.2 without UDFs?
Thanks
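For reference, the UDF approach I would like to avoid looks roughly like this sketch for case 1 (assumes spark.implicits._ is in scope):

import org.apache.spark.sql.functions.udf

val case1 = Map("u1" -> Seq("w1", "w11"), "u2" -> Seq("w2", "w22"))

// plain Scala lookup per row -- simple, but opaque to Catalyst and paid per row
val prefixUdf = udf { (c1: String, c2: String, c3: String) =>
  if (case1.get(c1).exists(_.contains(c2))) "x-" + c3 else c3
}

val result = df1.withColumn("col3", prefixUdf($"col1", $"col2", $"col3"))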


xfb7svmp1#

Another option - import org.apache.spark.sql.functions.typedLit (the snippets below also assume org.apache.spark.sql.functions._ and spark.implicits._ are in scope).

Case 1
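df1 below is assumed to hold the case-1 example data from the question, e.g.:

val df1 = Seq(("u1","w1","v1"), ("u2","w2","v2"), ("u3","w3","v3")).toDF("col1","col2","col3")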

df1.show(false)
df1.printSchema()
/**
  * +----+----+----+
  * |col1|col2|col3|
  * +----+----+----+
  * |u1  |w1  |v1  |
  * |u2  |w2  |v2  |
  * |u3  |w3  |v3  |
  * +----+----+----+
  *
  * root
  * |-- col1: string (nullable = true)
  * |-- col2: string (nullable = true)
  * |-- col3: string (nullable = true)
  */

// embed the map as a literal map-typed column, then prefix col3 when col2 appears in the list for col1
val case1 = Map("u1" -> Seq("w1","w11"), "u2" -> Seq("w2","w22"))

val p1 = df1.withColumn("case1", typedLit(case1))
  .withColumn("col3",
    when(array_contains(expr("case1[col1]"), $"col2"), concat(lit("x-"), $"col3"))
      .otherwise($"col3")
  )
p1.show(false)
p1.printSchema()
/**
  * +----+----+----+----------------------------------+
  * |col1|col2|col3|case1                             |
  * +----+----+----+----------------------------------+
  * |u1  |w1  |x-v1|[u1 -> [w1, w11], u2 -> [w2, w22]]|
  * |u2  |w2  |x-v2|[u1 -> [w1, w11], u2 -> [w2, w22]]|
  * |u3  |w3  |v3  |[u1 -> [w1, w11], u2 -> [w2, w22]]|
  * +----+----+----+----------------------------------+
  *
  * root
  * |-- col1: string (nullable = true)
  * |-- col2: string (nullable = true)
  * |-- col3: string (nullable = true)
  * |-- case1: map (nullable = false)
  * |    |-- key: string
  * |    |-- value: array (valueContainsNull = true)
  * |    |    |-- element: string (containsNull = true)
  */

Case 2
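Similarly, df2 is assumed to hold the case-2 example data:

val df2 = Seq(("u1",2,"v1"), ("u1",6,"v11"), ("u2",3,"v3"), ("u3",4,"v3")).toDF("col1","col2","col3")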

df2.show(false)
df2.printSchema()
/**
  * +----+----+----+
  * |col1|col2|col3|
  * +----+----+----+
  * |u1  |2   |v1  |
  * |u1  |6   |v11 |
  * |u2  |3   |v3  |
  * |u3  |4   |v3  |
  * +----+----+----+
  *
  * root
  * |-- col1: string (nullable = true)
  * |-- col2: integer (nullable = true)
  * |-- col3: string (nullable = true)
  */

// embed the (start, end) map as a literal column, then prefix col3 when col2 lies in the range for col1
val case2 = Map("u1" -> (1,5), "u2" -> (2, 4))
val p = df2.withColumn("case2", typedLit(case2))
  .withColumn("col3",
    when(expr("col2 between case2[col1]._1 and case2[col1]._2"), concat(lit("x-"), $"col3"))
      .otherwise($"col3")
  )
p.show(false)
p.printSchema()

/**
  * +----+----+----+----------------------------+
  * |col1|col2|col3|case2                       |
  * +----+----+----+----------------------------+
  * |u1  |2   |x-v1|[u1 -> [1, 5], u2 -> [2, 4]]|
  * |u1  |6   |v11 |[u1 -> [1, 5], u2 -> [2, 4]]|
  * |u2  |3   |x-v3|[u1 -> [1, 5], u2 -> [2, 4]]|
  * |u3  |4   |v3  |[u1 -> [1, 5], u2 -> [2, 4]]|
  * +----+----+----+----------------------------+
  *
  * root
  * |-- col1: string (nullable = true)
  * |-- col2: integer (nullable = true)
  * |-- col3: string (nullable = true)
  * |-- case2: map (nullable = false)
  * |    |-- key: string
  * |    |-- value: struct (valueContainsNull = true)
  * |    |    |-- _1: integer (nullable = false)
  * |    |    |-- _2: integer (nullable = false)
  */
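In both cases, when col1 has no entry in the map (u3 above), the lookup case1[col1] / case2[col1] returns null, the when condition is not satisfied, and otherwise keeps col3 unchanged. The helper map column can be dropped once col3 is rewritten, e.g.:

p1.drop("case1")
p.drop("case2")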

ovfsdjhp2#
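Note: this snippet appears to reuse caseTwoDF and the from_json-based caseTwoExpr column defined in the answer below. The range check works because sequence(start, end) expands the (start, end) pair into the full inclusive integer array, which array_contains then tests against col2; for illustration:

scala> spark.sql("SELECT sequence(1, 5) AS r").show(false)
+---------------+
|r              |
+---------------+
|[1, 2, 3, 4, 5]|
+---------------+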

scala> caseTwoDF
.withColumn("data",caseTwoExpr)
.withColumn("col3",when(expr("array_contains(sequence(data[col1][0],data[col1][1]),col2)"), concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)
+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1  |2   |x-v1|
|u1  |6   |v11 |
|u2  |3   |x-v3|
|u3  |4   |v3  |
+----+----+----+

cngwdvgl3#

Check the code below.
Note -
I changed the map values for the second case to Map("u1" -> Seq(1,5), "u2" -> Seq(2,4)), converted that map to a JSON map, added the JSON map as a column value on the DataFrame, and then applied the logic to the DataFrame.
If possible, you can supply the values directly as a JSON map and avoid the Map-to-JSON conversion (a sketch of this is shown at the end of this answer).
Import the required libraries.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

Case 1 logic

scala> val caseOneDF = Seq(("u1","w1","v1"),("u2","w2","v2"),("u3","w3","v3")).toDF("col1","col2","col3")
caseOneDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]
scala> val caseOneMap = Map("u1" -> Seq("w1","w11"),"u2" -> Seq("w2","w22"))
caseOneMap: scala.collection.immutable.Map[String,Seq[String]] = Map(u1 -> List(w1, w11), u2 -> List(w2, w22))
scala> val caseOneJsonMap = lit(compact(render(caseOneMap)))
caseOneJsonMap: org.apache.spark.sql.Column = {"u1":["w1","w11"],"u2":["w2","w22"]}
scala> val caseOneSchema = MapType(StringType,ArrayType(StringType))
caseOneSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(StringType,true),true)
scala> val caseOneExpr = from_json(caseOneJsonMap,caseOneSchema)
caseOneExpr: org.apache.spark.sql.Column = entries

Case 1 final output

scala> caseOneDF
.withColumn("data",caseOneExpr)
.withColumn("col3",when(expr("array_contains(data[col1],col2)"),concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1  |w1  |x-v1|
|u2  |w2  |x-v2|
|u3  |w3  |v3  |
+----+----+----+

Case 2 logic

scala> val caseTwoDF = Seq(("u1",2,"v1"),("u1",6,"v11"),("u2",3,"v3"),("u3",4,"v3")).toDF("col1","col2","col3")
caseTwoDF: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]
scala> val caseTwoMap = Map("u1" -> Seq(1,5),"u2" -> Seq(2,4))
caseTwoMap: scala.collection.immutable.Map[String,Seq[Int]] = Map(u1 -> List(1, 5), u2 -> List(2, 4))
scala> val caseTwoJsonMap = lit(compact(render(caseTwoMap)))
caseTwoJsonMap: org.apache.spark.sql.Column = {"u1":[1,5],"u2":[2,4]}
scala> val caseTwoSchema = MapType(StringType,ArrayType(IntegerType))
caseTwoSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(IntegerType,true),true)
scala> val caseTwoExpr = from_json(caseTwoJsonMap,caseTwoSchema)
caseTwoExpr: org.apache.spark.sql.Column = entries

Case 2 final output

scala> caseTwoDF
.withColumn("data",caseTwoExpr)
.withColumn("col3",when(expr("array_contains(sequence(data[col1][0],data[col1][1]),col2)"), concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1  |2   |x-v1|
|u1  |6   |v11 |
|u2  |3   |x-v3|
|u3  |4   |v3  |
+----+----+----+
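As noted at the top of this answer, if the configuration is already available as a JSON string, the json4s render/compact step can be skipped and the string passed to lit directly; a sketch equivalent to the columns built above:

val caseOneJsonMap = lit("""{"u1":["w1","w11"],"u2":["w2","w22"]}""")
val caseTwoJsonMap = lit("""{"u1":[1,5],"u2":[2,4]}""")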
