在spark中查找每个组最常见的非空前缀

smtd7mpg  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(586)

我需要编写一个结构化查询,在每个唯一的\u guest \u id中查找最常见的非空前缀(出现)。有输入数据:

val inputDf = Seq(
  (1, "Mr"),
  (1, "Mme"),
  (1, "Mr"),
  (1, null),
  (1, null),
  (1, null),
  (2, "Mr"),
  (3, null)).toDF("UNIQUE_GUEST_ID", "PREFIX")
println("Input:")
inputDf.show(false)

我的解决方案是:

inputDf
  .groupBy($"UNIQUE_GUEST_ID")
  .agg(collect_list($"PREFIX").alias("PREFIX"))

但这不是我需要的:期望:

+---------------+------+
|UNIQUE_GUEST_ID|PREFIX|
+---------------+------+
|1              |Mr    |
|2              |Mr    |
|3              |null  |
+---------------+------+

实际值:

+---------------+-------------+
|UNIQUE_GUEST_ID|PREFIX       |
+---------------+-------------+
|1              |[Mr, Mme, Mr]|
|3              |[]           |
|2              |[Mr]         |
+---------------+-------------+
goucqfw6

goucqfw61#

试试这个-

val inputDf = Seq(
      (1, "Mr"),
      (1, "Mme"),
      (1, "Mr"),
      (1, null),
      (1, null),
      (1, null),
      (2, "Mr"),
      (3, null)).toDF("UNIQUE_GUEST_ID", "PREFIX")
    println("Input:")
    inputDf.show(false)
    /**
      * Input:
      * +---------------+------+
      * |UNIQUE_GUEST_ID|PREFIX|
      * +---------------+------+
      * |1              |Mr    |
      * |1              |Mme   |
      * |1              |Mr    |
      * |1              |null  |
      * |1              |null  |
      * |1              |null  |
      * |2              |Mr    |
      * |3              |null  |
      * +---------------+------+
      */

    inputDf
      .groupBy($"UNIQUE_GUEST_ID", $"PREFIX").agg(count($"PREFIX").as("count"))
      .groupBy($"UNIQUE_GUEST_ID")
      .agg(max( struct( $"count", $"PREFIX")).as("max"))
      .selectExpr("UNIQUE_GUEST_ID", "max.PREFIX")
      .show(false)

    /**
      * +---------------+------+
      * |UNIQUE_GUEST_ID|PREFIX|
      * +---------------+------+
      * |2              |Mr    |
      * |1              |Mr    |
      * |3              |null  |
      * +---------------+------+
      */
lqfhib0f

lqfhib0f2#

val df2 = inputDf.groupBy('UNIQUE_GUEST_ID,'PREFIX).agg(count('PREFIX).as("ct"))
val df3 = df2.groupBy('UNIQUE_GUEST_ID).agg(max('ct).as("ct"))
df2.join(df3,Seq("ct","UNIQUE_GUEST_ID")).show()

输出:

+---+---------------+------+
| ct|UNIQUE_GUEST_ID|PREFIX|
+---+---------------+------+
|  1|              2|    Mr|
|  0|              3|  null|
|  2|              1|    Mr|
+---+---------------+------+

相关问题