Spark: add new columns to a large DataFrame based on values in other columns

Asked by ne5o7dgx on 2021-07-09 in Spark

I'm new to Spark and am working with a fairly large dataset (about 20 GB, spread across many small files). I need help transforming it into the format below.
My data looks like this:

+----------+-------------------------+-------------------+---------+------+
|   id     |       values            |     creation date | leadTime| span |
+----------+-------------------------+-------------------+---------+------+
|id_1      |[[v1, 0.368], [v2, 0.5]] |     2020-07-15    |      16 |  15  |
|id_2      |[[v1, 0.368], [v2, 0.4]] |     2020-07-15    |      16 |  15  |
|id_1      |[[v1, 0.468], [v2, 0.3]] |     2020-07-15    |      17 |  18  |
|id_2      |[[v1, 0.368], [v2, 0.3]] |     2020-07-15    |      17 |  18  | 
+----------+-------------------------+-------------------+---------+------+

I need the data in the following format, using the entries in the values column:
each new column name is built from the key in values together with the leadTime and span column values.

+----------+--------------+--------------------+--------------------+--------------------+--------------------+
|   id     |creation date | final_v1_16_15_wk  | final_v2_16_15_wk  | final_v1_17_18_wk  | final_v2_17_18_wk  |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
|id_1      |2020-07-15    |       0.368        |         0.5        |       0.468        |         0.3        |
|id_2      |2020-07-15    |       0.368        |         0.4        |       0.368        |         0.3        |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+

Here is a sample DataFrame:

val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)).toDF("id", "values", "creation date", "leadTime", "span")

I tried the following logic to generate the column names/values, but it doesn't work:

val modDF = finalDF.withColumn("final_" + finalDF("values").getItem(0).getItem("_1") + "_" + finalDF("leadTime") + "_" + finalDF("span") + "_wk", $"values".getItem(0).getItem("_2"))

This only produces a single column whose name is a literal string built from the Column objects' string representations; it does not create one column per key/leadTime/span combination.

Answer 1 (eoigrqb6)

You can use pivot for this:

import org.apache.spark.sql.functions._

// explode the map column into key/value rows, then build the derived column name
val explodeDf = df.select(col("id"), col("creation date"), explode_outer(col("values")), col("leadTime"), col("span"))
val finalDf = explodeDf.select(
  col("id"), col("creation date"), col("value"),
  concat(lit("final_"), col("key"), lit("_"), col("leadTime"), lit("_"), col("span"), lit("_wk")).as("colDerived"))

// pivot the derived names into columns
finalDf.groupBy(col("id"), col("creation date")).pivot(col("colDerived")).agg(sum(col("value"))).show()

+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  id|creation date|final_v1_16_15_wk|final_v1_17_18_wk|final_v2_16_15_wk|final_v2_17_18_wk|final_v3_16_15_wk|final_v3_17_18_wk|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|id_1|   2020-07-15|            0.368|            0.564|              0.5|             0.78|              0.6|             0.65|
|id_2|   2020-07-15|            0.468|            0.657|              0.3|             0.65|             0.66|             0.67|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
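
For reference, explode_outer on a map column emits one row per map entry, with two generated columns named key and value; those are what the concat above builds the derived column name from. A quick way to check the intermediate shape (just a sketch; the comment lists the columns the sample data should yield):

explodeDf.printSchema()
// expected columns: id, creation date, key, value, leadTime, span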

See also: How to pivot a Spark DataFrame?


Answer 2 (tyg4sfes)

You can use pivot after massaging the values into the shape needed for the column headers. pivot performs better if you provide the list of column values up front; otherwise Spark has to run a distinct over the pivot column first.

import org.apache.spark.sql.functions._
import spark.implicits._   // needed for the $"..." syntax outside spark-shell (assuming spark is your SparkSession)

val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)
).toDF("id", "values", "creation date", "leadTime", "span")

// explode the map and pre-build the pieces of the final column name
val df2 = df.select($"id", explode_outer($"values"), $"creation date", $"leadTime", $"span")
  .withColumn("keys", concat(lit("final_"), col("key")))
  .withColumn("leadTimes", concat(lit("_"), col("leadTime"), lit("_")))
  .withColumn("spans", concat(col("span"), lit("_wk")))
  .drop("leadTime", "key", "span")
  .withColumnRenamed("keys", "key")
  .withColumnRenamed("leadTimes", "leadTime")
  .withColumnRenamed("spans", "span")

// pivot on the concatenated name, taking the first value per cell
val df3 = df2.groupBy($"id", $"creation date").pivot(concat($"key", $"leadTime", $"span")).agg(first("value"))
df3.show()

Output:

+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  id|creation date|final_v1_16_15_wk|final_v1_17_18_wk|final_v2_16_15_wk|final_v2_17_18_wk|final_v3_16_15_wk|final_v3_17_18_wk|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|id_1|   2020-07-15|            0.368|            0.564|              0.5|             0.78|              0.6|             0.65|
|id_2|   2020-07-15|            0.468|            0.657|              0.3|             0.65|             0.66|             0.67|
+----+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html
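
As mentioned above, pivot avoids the extra distinct job when the list of column values is supplied up front. A minimal sketch of that variant, reusing df2 from this answer; the expectedCols list is hypothetical, taken from the six names in the sample output, and for the real 20 GB job it would need to be built from the known key/leadTime/span combinations:

val expectedCols = Seq(
  "final_v1_16_15_wk", "final_v2_16_15_wk", "final_v3_16_15_wk",
  "final_v1_17_18_wk", "final_v2_17_18_wk", "final_v3_17_18_wk")

// materialize the derived name once, then hand the known values to pivot
val df3Fast = df2
  .withColumn("colDerived", concat($"key", $"leadTime", $"span"))
  .groupBy($"id", $"creation date")
  .pivot("colDerived", expectedCols)   // no distinct scan over colDerived needed
  .agg(first("value"))
df3Fast.show()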
