Spark Scala: transform a DataFrame to generate a new gender column, and vice versa

m0rkklqb, posted on 2021-05-27 in Spark

Closed. This question needs to be more focused. It is not currently accepting answers.

Table 1:

class   male    female
1       2       1
2       0       2
3       2       0

Table 2:

class   gender
1       m
1       f
1       m
2       f
2       f
3       m
3       m

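Using Spark Scala, I want to read the data in Table 1 and write it to another table in the format of Table 2, as shown. I would also like to do the reverse (Table 2 back to Table 1).
Please help me out.
Thanks in advance.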

xienkqul1#

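import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

val inDF = Seq(
  (1, 2, 1),
  (2, 0, 2),
  (3, 2, 0)).toDF("class", "male", "female")

// Build a sequence holding m copies of "m" followed by f copies of "f".
val testUdf = udf((m: Int, f: Int) => {
  val ml = 1.to(m).map(_ => "m")
  val fml = 1.to(f).map(_ => "f")
  ml ++ fml
})

// Attach the sequence as an array column, then emit one row per element.
val df1 = inDF.withColumn("mf", testUdf($"male", $"female"))
  .drop("male", "female")
  .select($"class", explode($"mf").alias("gender"))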
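
This answer only covers the Table 1 to Table 2 direction. For the reverse, one option is a conditional aggregation: group by class and sum a 1/0 flag per gender. The following is a minimal sketch, not part of the original answer; the object name `RowsToCounts` and the method `toCounts` are made up for illustration, and it assumes the gender column only contains "m" and "f".

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum, when}

// Hypothetical helper object, named here only for illustration.
object RowsToCounts {
  // Collapse one-row-per-person data (class, gender) into per-class counts.
  // Each row contributes 1 to the matching gender column and 0 to the other.
  def toCounts(rows: DataFrame): DataFrame =
    rows.groupBy("class")
      .agg(
        sum(when(col("gender") === "m", 1).otherwise(0)).as("male"),
        sum(when(col("gender") === "f", 1).otherwise(0)).as("female"))
      .orderBy("class")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rows-to-counts")
      .getOrCreate()
    import spark.implicits._

    // Table 2 from the question.
    val rows = Seq((1, "m"), (1, "f"), (1, "m"), (2, "f"), (2, "f"), (3, "m"), (3, "m"))
      .toDF("class", "gender")

    toCounts(rows).show()
    spark.stop()
  }
}
```

Because the sums run over integer flags, the resulting male and female columns are of type long rather than int; cast them if the target table needs integers.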

mm5n2pyu2#

Perhaps this helps. This approach needs no UDF and works on Spark >= 2.4.

Load the provided test data

val data =
      """
        |class |  male  |  female
        |1 |  2 |  1
        |2 |  0 |  2
        |3 |  2 |  0
      """.stripMargin

    val stringDS1 = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()

    /**
      * +-----+----+------+
      * |class|male|female|
      * +-----+----+------+
      * |1    |2   |1     |
      * |2    |0   |2     |
      * |3    |2   |0     |
      * +-----+----+------+
      *
      * root
      * |-- class: integer (nullable = true)
      * |-- male: integer (nullable = true)
      * |-- female: integer (nullable = true)
      */

Build the gender arrays and explode them

df1.select($"class",
      when($"male" >= 1, sequence(lit(1), col("male"))).otherwise(array()).as("male"),
      when($"female" >= 1, sequence(lit(1), col("female"))).otherwise(array()).as("female")
    ).withColumn("male", expr("TRANSFORM(male, x -> 'm')"))
      .withColumn("female", expr("TRANSFORM(female, x -> 'f')"))
      .withColumn("gender", explode(concat($"male", $"female")))
      .select("class", "gender")
      .show(false)

    /**
      * +-----+------+
      * |class|gender|
      * +-----+------+
      * |1    |m     |
      * |1    |m     |
      * |1    |f     |
      * |2    |f     |
      * |2    |f     |
      * |3    |m     |
      * |3    |m     |
      * +-----+------+
      */
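
The `sequence` plus `TRANSFORM` combination above builds an array of the right length and then overwrites every element. On Spark >= 2.4 the same arrays can probably be built more directly with `array_repeat`, which returns an empty array for a count of 0, so the `when`/`otherwise` guard is not needed. This is a sketch under that assumption, not the answer author's code; `CountsToRows` and `toRows` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_repeat, col, concat, explode, lit}

// Hypothetical helper object, named here only for illustration.
object CountsToRows {
  // Expand per-class counts (class, male, female) into one row per person.
  // array_repeat builds `male` copies of "m" and `female` copies of "f";
  // concat joins the two arrays and explode emits one row per element.
  def toRows(counts: DataFrame): DataFrame =
    counts.select(
      col("class"),
      explode(concat(
        array_repeat(lit("m"), col("male")),
        array_repeat(lit("f"), col("female")))).as("gender"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("counts-to-rows")
      .getOrCreate()
    import spark.implicits._

    // Table 1 from the question.
    val counts = Seq((1, 2, 1), (2, 0, 2), (3, 2, 0)).toDF("class", "male", "female")

    toRows(counts).show(false)
    spark.stop()
  }
}
```

As with the version above, a class whose male and female counts are both 0 produces no rows at all, because `explode` drops empty arrays.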

The reverse direction (df2 is assumed to be the exploded class/gender DataFrame from the previous step)

df2.groupBy("class").agg(collect_list("gender").as("gender"))
      .withColumn("male", expr("size(FILTER(gender, x -> x='m'))"))
      .withColumn("female", expr("size(FILTER(gender, x -> x='f'))"))
      .select("class", "male", "female")
      .orderBy("class")
      .show(false)

    /**
      * +-----+----+------+
      * |class|male|female|
      * +-----+----+------+
      * |1    |2   |1     |
      * |2    |0   |2     |
      * |3    |2   |0     |
      * +-----+----+------+
      */

kxe2p93d3#

You can use a udf together with the explode function, as follows.

import org.apache.spark.sql.functions._
  import spark.implicits._

  val df=Seq((1,2,1),(2,0,2),(3,2,0)).toDF("class","male","female")

//Input Df

+-----+----+------+
|class|male|female|
+-----+----+------+
|    1|   2|     1|
|    2|   0|     2|
|    3|   2|     0|
+-----+----+------+

  val getGenderUdf = udf((x: Int, y: Int) => List.fill(x)("m") ++ List.fill(y)("f"))
  val df1 = df.withColumn("gender", getGenderUdf(df.col("male"), df.col("female")))
    .drop("male", "female")
    .withColumn("gender", explode($"gender"))
  df1.show()

+-----+------+
|class|gender|
+-----+------+
|    1|     m|
|    1|     m|
|    1|     f|
|    2|     f|
|    2|     f|
|    3|     m|
|    3|     m|
+-----+------+

The reverse, starting from df1:

  val df2 = df1.groupBy("class")
    .pivot("gender")
    .agg(count("gender"))
    .na.fill(0)
    .withColumnRenamed("m", "male")
    .withColumnRenamed("f", "female")

  df2.show()

//Sample Output: 

+-----+------+----+
|class|female|male|
+-----+------+----+
|    1|     1|   2|
|    3|     0|   2|
|    2|     2|   0|
+-----+------+----+
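
One thing to watch with `pivot("gender")`: the output columns are derived from the values present in the data, so if no row has gender "f" the female column would be missing, and the column order follows the sorted values (female before male, as in the sample output). Passing the expected values explicitly is a possible way around both. The sketch below assumes the only valid values are "m" and "f"; `PivotWithValues` and `toCounts` are hypothetical names, not part of the original answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.count

// Hypothetical helper object, named here only for illustration.
object PivotWithValues {
  // Pivot with an explicit value list so that both count columns always
  // exist, in a fixed order, even when one gender never occurs in the data.
  def toCounts(rows: DataFrame): DataFrame =
    rows.groupBy("class")
      .pivot("gender", Seq("m", "f"))
      .agg(count("gender"))
      .na.fill(0) // classes with no rows for a gender get 0 instead of null
      .withColumnRenamed("m", "male")
      .withColumnRenamed("f", "female")
      .orderBy("class")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("pivot-with-values")
      .getOrCreate()
    import spark.implicits._

    // Only "m" rows: the female column is still produced and filled with 0.
    val rows = Seq((3, "m"), (3, "m")).toDF("class", "gender")

    toCounts(rows).show()
    spark.stop()
  }
}
```

Supplying the values also spares Spark the extra pass it otherwise needs to discover the distinct pivot values.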
