Spark Scala: transform a DataFrame to generate a new column gender, and vice versa

m0rkklqb posted on 2021-05-27 in Spark

Closed. This question needs more focus. It is not currently accepting answers.

Closed six months ago.

Table 1:

```
class  male  female
1      2     1
2      0     2
3      2     0
```

Table 2:

```
class  gender
1      m
1      f
1      m
2      f
2      f
3      m
3      m
```

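Using Spark Scala, take the data from Table 1 and write it to another table in the format of Table 2, as shown above. Please also show the reverse (from Table 2 back to Table 1).
Please help me, guys.
Thanks in advance.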
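Stated on plain Scala collections (no Spark), the two directions the question asks for look roughly like the sketch below. The row types `ClassCounts` and `ClassGender` and the object `GenderReshape` are hypothetical names; `cls` stands in for `class`, which is a reserved word in Scala. Row order within Table 2 carries no meaning, so the sketch simply emits males before females.

```scala
// Hypothetical row types mirroring Table 1 and Table 2 (plain case classes, not Spark types)
final case class ClassCounts(cls: Int, male: Int, female: Int)
final case class ClassGender(cls: Int, gender: String)

object GenderReshape {
  // Table 1 -> Table 2: one row per person; Seq.fill(0)(...) is empty, so a zero count adds no rows
  def toLong(rows: Seq[ClassCounts]): Seq[ClassGender] =
    rows.flatMap { r =>
      Seq.fill(r.male)(ClassGender(r.cls, "m")) ++ Seq.fill(r.female)(ClassGender(r.cls, "f"))
    }

  // Table 2 -> Table 1: count the "m" and "f" rows of each class, ordered by class
  def toWide(rows: Seq[ClassGender]): Seq[ClassCounts] =
    rows.groupBy(_.cls).toSeq.sortBy(_._1).map { case (cls, group) =>
      ClassCounts(cls, group.count(_.gender == "m"), group.count(_.gender == "f"))
    }
}
```

One consequence worth keeping in mind: a class whose counts are both 0 produces no rows in Table 2, so it cannot reappear when converting back.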

xienkqul (answer #1)

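```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // for toDF and $"..." (spark is the SparkSession, as in spark-shell)

val inDF = Seq((1, 2, 1),
               (2, 0, 2),
               (3, 2, 0)).toDF("class", "male", "female")

// One "m" per male followed by one "f" per female; 1.to(0) is empty, so a count of 0 adds nothing
val testUdf = udf((m: Int, f: Int) => {
  val ml = 1.to(m).map(_ => "m")
  val fml = 1.to(f).map(_ => "f")
  ml ++ fml
})

val df1 = inDF.withColumn("mf", testUdf($"male", $"female"))
  .drop("male", "female")
  .select($"class", explode($"mf").alias("gender"))
```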
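In this UDF, `1.to(0)` is an empty range, so a count of 0 contributes no entries and class 2 gets only `f` rows. If both counts were 0, the UDF would return an empty array, and `explode` emits no row at all for an empty array; Spark's `explode_outer` would instead emit one row with a null `gender`. The plain-Scala sketch below models the UDF body and those two behaviours with `flatMap`. It is an illustration, not Spark code, and `ExplodeModel` and its methods are hypothetical names.

```scala
object ExplodeModel {
  // Same logic as the testUdf body above, as a plain function
  def genders(m: Int, f: Int): Seq[String] =
    1.to(m).map(_ => "m") ++ 1.to(f).map(_ => "f")

  // Models explode: one output row per array element, none for an empty array
  def explode(rows: Seq[(Int, Int, Int)]): Seq[(Int, String)] =
    rows.flatMap { case (cls, m, f) => genders(m, f).map(g => (cls, g)) }

  // Models explode_outer: an empty array still yields one row, with None standing in for null
  def explodeOuter(rows: Seq[(Int, Int, Int)]): Seq[(Int, Option[String])] =
    rows.flatMap { case (cls, m, f) =>
      val gs = genders(m, f)
      if (gs.isEmpty) Seq((cls, Option.empty[String]))
      else gs.map(g => (cls, Option(g)))
    }
}
```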
mm5n2pyu (answer #2)

Maybe this helps. Without a UDF (Spark >= 2.4).

Load the provided test data:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // for toDS() and $"..." (spark is the SparkSession, as in spark-shell)

val data =
  """
    |class | male | female
    |1 | 2 | 1
    |2 | 0 | 2
    |3 | 2 | 0
  """.stripMargin

// Turn each "a | b | c" line into "a,b,c"
val stringDS1 = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()

val df1 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS1)

df1.show(false)
df1.printSchema()
/**
 * +-----+----+------+
 * |class|male|female|
 * +-----+----+------+
 * |1    |2   |1     |
 * |2    |0   |2     |
 * |3    |2   |0     |
 * +-----+----+------+
 *
 * root
 *  |-- class: integer (nullable = true)
 *  |-- male: integer (nullable = true)
 *  |-- female: integer (nullable = true)
 */
```
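The helper above converts the pipe-delimited text into CSV lines before `spark.read.csv` parses them. The sketch below runs the same per-line string processing on plain Scala, without Spark, so the contents of `stringDS1` are visible. One assumption differs from the answer: it splits on the regex `\R` (any line break) rather than `System.lineSeparator()`, because the string literal contains whatever line endings the source file uses, which need not match the platform separator. `CsvLinesModel` is a hypothetical name.

```scala
object CsvLinesModel {
  val data: String =
    """
      |class | male | female
      |1 | 2 | 1
      |2 | 0 | 2
      |3 | 2 | 0
    """.stripMargin

  // Same per-line processing as the answer: split on '|', trim spaces/tabs, join with ','
  def toCsvLines(text: String): Seq[String] =
    text.split("\\R").toSeq
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
}
```

The blank first and last lines of the literal come out as empty strings.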

Compute the gender arrays and explode them:

```scala
// df1 is the (class, male, female) DataFrame loaded above
val df2 = df1.select($"class",
    // Guard: sequence(1, n) is only taken when n >= 1, otherwise an empty array
    when($"male" >= 1, sequence(lit(1), col("male"))).otherwise(array()).as("male"),
    when($"female" >= 1, sequence(lit(1), col("female"))).otherwise(array()).as("female")
  ).withColumn("male", expr("TRANSFORM(male, x -> 'm')"))
  .withColumn("female", expr("TRANSFORM(female, x -> 'f')"))
  .withColumn("gender", explode(concat($"male", $"female")))
  .select("class", "gender")

df2.show(false)
/**
 * +-----+------+
 * |class|gender|
 * +-----+------+
 * |1    |m     |
 * |1    |m     |
 * |1    |f     |
 * |2    |f     |
 * |2    |f     |
 * |3    |m     |
 * |3    |m     |
 * +-----+------+
 */
```
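The `when(... >= 1, ...)` guard matters because, per the Spark SQL documentation, `sequence(start, stop)` steps by -1 when `start` is greater than `stop`, so `sequence(1, 0)` yields `[1, 0]` rather than an empty array, and `TRANSFORM` would then turn a count of 0 into two labels. The plain-Scala sketch below models that default step to show the difference; `SequenceModel` and its methods are hypothetical. Spark 2.4 also added `array_repeat`, which repeats a value a given number of times and may make the guard unnecessary; check its behaviour on your Spark version before relying on it.

```scala
object SequenceModel {
  // Models Spark's sequence(start, stop) with its default step:
  // +1 when start <= stop, otherwise -1
  def sparkSequence(start: Int, stop: Int): Seq[Int] =
    if (start <= stop) start to stop else start to stop by -1

  // Without the guard: sequence(1, n), then TRANSFORM every element into the label
  def unguarded(n: Int, label: String): Seq[String] =
    sparkSequence(1, n).map(_ => label)

  // With the guard used in the answer: an empty array unless n >= 1
  def guarded(n: Int, label: String): Seq[String] =
    if (n >= 1) unguarded(n, label) else Seq.empty
}
```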

Vice versa (back to per-class male/female counts):

```scala
// df2 is the (class, gender) DataFrame computed above
df2.groupBy("class").agg(collect_list("gender").as("gender"))
  .withColumn("male", expr("size(FILTER(gender, x -> x='m'))"))
  .withColumn("female", expr("size(FILTER(gender, x -> x='f'))"))
  .select("class", "male", "female")
  .orderBy("class")
  .show(false)
/**
 * +-----+----+------+
 * |class|male|female|
 * +-----+----+------+
 * |1    |2   |1     |
 * |2    |0   |2     |
 * |3    |2   |0     |
 * +-----+----+------+
 */
```
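Here `collect_list` gathers every gender of a class into one array before `FILTER` and `size` count it. A conditional aggregate, for example `sum(when($"gender" === "m", 1).otherwise(0))`, is a different technique that counts inside the aggregation without building those arrays. The plain-Scala sketch below models that one-pass counting with `foldLeft`; `OnePassCounts` is a hypothetical name, and the sketch is not Spark code.

```scala
object OnePassCounts {
  // Models sum(when(gender = 'm', 1).otherwise(0)) and the 'f' equivalent per class:
  // each row adds 1 to exactly one of its class's two counters
  def toWide(rows: Seq[(Int, String)]): Seq[(Int, Int, Int)] = {
    val counts = rows.foldLeft(Map.empty[Int, (Int, Int)]) { case (acc, (cls, g)) =>
      val (m, f) = acc.getOrElse(cls, (0, 0))
      acc.updated(cls, (m + (if (g == "m") 1 else 0), f + (if (g == "f") 1 else 0)))
    }
    // Emit (class, male, female), ordered by class
    counts.toSeq.sortBy(_._1).map { case (cls, (m, f)) => (cls, m, f) }
  }
}
```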
kxe2p93d (answer #3)

You can use a `udf` together with the `explode` function, as follows.

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1, 2, 1), (2, 0, 2), (3, 2, 0)).toDF("class", "male", "female")
// Input DF:
// +-----+----+------+
// |class|male|female|
// +-----+----+------+
// |    1|   2|     1|
// |    2|   0|     2|
// |    3|   2|     0|
// +-----+----+------+

// x copies of "m" followed by y copies of "f"; List.fill(0)(...) is empty
val getGenderUdf = udf((x: Int, y: Int) => List.fill(x)("m") ++ List.fill(y)("f"))
val df1 = df.withColumn("gender", getGenderUdf(df.col("male"), df.col("female")))
  .drop("male", "female")
  .withColumn("gender", explode($"gender"))
df1.show()
// +-----+------+
// |class|gender|
// +-----+------+
// |    1|     m|
// |    1|     m|
// |    1|     f|
// |    2|     f|
// |    2|     f|
// |    3|     m|
// |    3|     m|
// +-----+------+
```

The reverse, starting from df1:

```scala
val df2 = df1.groupBy("class")
  .pivot("gender")               // one column per distinct gender value ("f", "m")
  .agg(count("gender"))
  .na.fill(0)                    // missing (class, gender) combinations become 0
  .withColumnRenamed("m", "male")
  .withColumnRenamed("f", "female")
df2.show()
// Sample output:
// +-----+------+----+
// |class|female|male|
// +-----+------+----+
// |    1|     1|   2|
// |    3|     0|   2|
// |    2|     2|   0|
// +-----+------+----+
```
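When `pivot("gender")` is given no values, Spark first computes the distinct values of `gender` from the data and uses them, sorted, as the new columns, which is why `female` precedes `male` in the output. A dataset that happens to contain no `f` rows therefore gets no `f` column, and `withColumnRenamed("f", "female")` silently does nothing, because renaming a column that does not exist is a no-op. Passing the values explicitly, as in `pivot("gender", Seq("m", "f"))`, fixes the column set and, according to the Spark docs, also avoids the extra pass that computes the distinct values. The plain-Scala sketch below models both cases; `PivotModel` is a hypothetical name. Note also that `groupBy` does not guarantee row order, which is why the sample output lists class 3 before class 2; add `orderBy("class")` if order matters.

```scala
object PivotModel {
  // Models groupBy("class").pivot("gender"[, values]).agg(count("gender")).na.fill(0)
  // on local data: returns the pivot column names and, per class, one count per column
  def pivotCounts(rows: Seq[(Int, String)],
                  values: Option[Seq[String]]): (Seq[String], Seq[(Int, Seq[Int])]) = {
    // Without explicit values the columns come from the data: distinct values, sorted
    val cols = values.getOrElse(rows.map(_._2).distinct.sorted)
    val perClass = rows.groupBy(_._1).toSeq.sortBy(_._1).map { case (cls, group) =>
      // A (class, value) pair with no rows counts as 0, matching na.fill(0)
      (cls, cols.map(c => group.count(_._2 == c)))
    }
    (cols, perClass)
  }
}
```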