Creating new columns from the values of another column in Scala Spark

gijlo24d  posted on 2022-12-13  in Apache

I have an input DataFrame:

Input DataFrame =

+--------------------------+-----------------------------+
| info (String)            |   chars (Seq[String])       |
+--------------------------+-----------------------------+
|weight=100,height=70      | [weight,height]             |
+--------------------------+-----------------------------+
|weight=92,skinCol=white   | [weight,skinCol]            |
+--------------------------+-----------------------------+
|hairCol=gray,skinCol=white| [hairCol,skinCol]           |
+--------------------------+-----------------------------+

How can I get the following DataFrame as output? I do not know in advance which strings the chars column contains.

Output DataFrame =

+--------------------------+-----------------------------+-------+-------+-------+-------+
| info (String)            |   chars (Seq[String])       | weight|height |skinCol|hairCol|
+--------------------------+-----------------------------+-------+-------+-------+-------+
|weight=100,height=70      | [weight,height]             |  100  | 70    | null  |null   |
+--------------------------+-----------------------------+-------+-------+-------+-------+
|weight=92,skinCol=white   | [weight,skinCol]            |  92   |null   |white  |null   |
+--------------------------+-----------------------------+-------+-------+-------+-------+
|hairCol=gray,skinCol=white| [hairCol,skinCol]           |null   |null   |white  |gray   |
+--------------------------+-----------------------------+-------+-------+-------+-------+

I would also like to store the following Seq[String] in a variable, but without calling .collect() on the DataFrame.

val aVariable: Seq[String] = Seq("weight", "height", "skinCol", "hairCol")

thtygnil1#

Build a second DataFrame that is pivoted on the keys found in the info column, then join it back to the original using an id column:

import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq(
  ("weight=100,height=70", Seq("weight", "height")),
  ("weight=92,skinCol=white", Seq("weight", "skinCol")),
  ("hairCol=gray,skinCol=white", Seq("hairCol", "skinCol"))
)

// Add a unique id per row so the pivoted result can be joined back
val df = spark.sparkContext.parallelize(data).toDF("info", "chars")
  .withColumn("id", monotonically_increasing_id() + 1)

val pivotDf = df
  // One row per "key=value" pair
  .withColumn("tmp", split(col("info"), ","))
  .withColumn("tmp", explode(col("tmp")))
  // val1 holds the key, val2 holds the value
  .withColumn("val1", split(col("tmp"), "=")(0))
  .withColumn("val2", split(col("tmp"), "=")(1)).select("id", "val1", "val2")
  // One column per distinct key, one row per id
  .groupBy("id").pivot("val1").agg(first(col("val2")))

df.join(pivotDf, Seq("id"), "left").drop("id").show(false)

+--------------------------+------------------+-------+------+-------+------+
|info                      |chars             |hairCol|height|skinCol|weight|
+--------------------------+------------------+-------+------+-------+------+
|weight=100,height=70      |[weight, height]  |null   |70    |null   |100   |
|hairCol=gray,skinCol=white|[hairCol, skinCol]|gray   |null  |white  |null  |
|weight=92,skinCol=white   |[weight, skinCol] |null   |null  |white  |92    |
+--------------------------+------------------+-------+------+-------+------+

For the second question, you can get those values in a DataFrame like this:

df.withColumn("tmp", explode(split(col("info"), ",")))
  .withColumn("values", split(col("tmp"), "=")(0)).select("values").distinct().show()

+-------+
| values|
+-------+
| height|
|hairCol|
|skinCol|
| weight|
+-------+

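However, you cannot get them into a Seq variable without collect (or another action that brings the rows to the driver). Until such an action runs, the values exist only in the distributed DataFrame.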
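If calling collect turns out to be acceptable, the second part could be sketched roughly as below. This is only an illustration: the object name CollectKeysSketch, the helper distinctKeys and the local[*] session are assumptions and not part of the answer above. In spark-shell the session already exists as spark.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, first, monotonically_increasing_id, split}

object CollectKeysSketch {

  // Hypothetical helper: returns the distinct keys of the "info" column, sorted.
  // collect() is the action that moves the (small) set of keys to the driver.
  def distinctKeys(df: DataFrame): Seq[String] =
    df.select(explode(split(col("info"), ",")).as("pair"))
      .select(split(col("pair"), "=")(0).as("key"))
      .distinct()
      .collect()
      .map(_.getString(0))
      .toSeq
      .sorted

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, used here for illustration only
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("collect-keys-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("weight=100,height=70", Seq("weight", "height")),
      ("weight=92,skinCol=white", Seq("weight", "skinCol")),
      ("hairCol=gray,skinCol=white", Seq("hairCol", "skinCol"))
    ).toDF("info", "chars")
      .withColumn("id", monotonically_increasing_id())

    val aVariable: Seq[String] = distinctKeys(df)

    // Same explode/split steps as in the answer, then pivot on the known keys
    val pivotDf = df
      .withColumn("pair", explode(split(col("info"), ",")))
      .withColumn("key", split(col("pair"), "=")(0))
      .withColumn("value", split(col("pair"), "=")(1))
      .groupBy("id")
      .pivot("key", aVariable)
      .agg(first(col("value")))

    df.join(pivotDf, Seq("id"), "left").drop("id").show(false)

    spark.stop()
  }
}
```

Passing the collected keys to pivot fixes the order of the generated columns and means Spark does not have to compute the distinct pivot values itself. Since only the distinct keys are collected, not the full data, the amount sent to the driver should stay small as long as the number of different keys is small.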
