Multi-column value lookup after joining with a lookup dataset

oaxa6hgo  posted on 2021-05-27 in Spark

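I am using spark-sql-2.4.1v. How can I perform different joins depending on the value of a column? For each of the given value columns (value1 and value2), I need to fetch the matching map_val from a lookup dataset, as shown below.
Sample data: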

val data = List(
  ("20", "score", "school", "2018-03-31", 14 , 12),
  ("21", "score", "school", "2018-03-31", 13 , 13),
  ("22", "rate", "school", "2018-03-31", 11 , 14),
  ("21", "rate", "school", "2018-03-31", 13 , 12)
 )
val df = data.toDF("id", "code", "entity", "date", "value1", "value2")

df.show

+---+-----+------+----------+------+------+
| id| code|entity|      date|value1|value2|
+---+-----+------+----------+------+------+
| 20|score|school|2018-03-31|    14|    12|
| 21|score|school|2018-03-31|    13|    13|
| 22| rate|school|2018-03-31|    11|    14|
| 21| rate|school|2018-03-31|    13|    12|
+---+-----+------+----------+------+------+

Lookup dataset rateDs:

val rateDs = List(
  ("21","2018-01-31","2018-06-31", 12 ,"C"),
  ("21","2018-01-31","2018-06-31", 13 ,"D")
).toDF("id","start_date","end_date", "map_code","map_val")

rateDs.show

+---+----------+----------+--------+-------+
| id|start_date|  end_date|map_code|map_val|
+---+----------+----------+--------+-------+
| 21|2018-01-31|2018-06-31|      12|      C|
| 21|2018-01-31|2018-06-31|      13|      D|
+---+----------+----------+--------+-------+

Join with the lookup table to get the map_val column, based on start_date and end_date:

val  resultDs = df.filter(col("code").equalTo(lit("rate"))).join(rateDs , 
            (
                   df.col("date").between(rateDs.col("start_date"), rateDs.col("end_date"))
                   .and(rateDs.col("id").equalTo(df.col("id"))) 
                   //.and(rateDs.col("mapping_value").equalTo(df.col("mean"))) 
            )
            , "left"
            )
            //.drop("start_date")
            //.drop("end_date")

resultDs.show

+---+----+------+----------+------+------+----+----------+----------+--------+-------+
| id|code|entity|      date|value1|value2|  id|start_date|  end_date|map_code|map_val|
+---+----+------+----------+------+------+----+----------+----------+--------+-------+
| 21|rate|school|2018-03-31|    13|    12|  21|2018-01-31|2018-06-31|      13|      D|
| 21|rate|school|2018-03-31|    13|    12|  21|2018-01-31|2018-06-31|      12|      C|
+---+----+------+----------+------+------+----+----------+----------+--------+-------+
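One detail of the join above may be worth checking: date, start_date and end_date are plain strings, so between compares them lexicographically. That works for yyyy-MM-dd text, but it also accepts an end_date such as 2018-06-31, which is not a real calendar date. The following is a minimal plain-Scala sketch of that behaviour; the object and method names (DateRangeCheck, betweenAsString, parseIso) are hypothetical and only for illustration.

```scala
import java.time.LocalDate
import scala.util.Try

object DateRangeCheck {
  // Lexicographic range check, which is what a comparison on string columns amounts to.
  // For zero-padded yyyy-MM-dd text this ordering matches chronological ordering.
  def betweenAsString(date: String, start: String, end: String): Boolean =
    date.compareTo(start) >= 0 && date.compareTo(end) <= 0

  // Strict ISO parsing: returns None when the text is not a valid calendar date.
  def parseIso(text: String): Option[LocalDate] =
    Try(LocalDate.parse(text)).toOption
}
```

With the sample values, betweenAsString("2018-03-31", "2018-01-31", "2018-06-31") is true, while parseIso("2018-06-31") is empty because June has 30 days. If these columns are ever converted to a real date type, such a bound would probably need to be corrected first.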

The expected output should be:

+---+----+------+----------+------+------+----+----------+----------+--------+-------+
| id|code|entity|      date|value1|value2|  id|start_date|  end_date|map_code|map_val|
+---+----+------+----------+------+------+----+----------+----------+--------+-------+
| 21|rate|school|2018-03-31|     D|     C|  21|2018-01-31|2018-06-31|      13|      D|
| 21|rate|school|2018-03-31|     D|     C|  21|2018-01-31|2018-06-31|      12|      C|
+---+----+------+----------+------+------+----+----------+----------+--------+-------+
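Read as plain collections, the expected output above seems to mean: for each row, look up value1 and value2 in the map_code to map_val pairs that belong to the same id, and keep the original value when there is no mapping. Below is a rough sketch of that reading without Spark. The Row case class and the names lookup, translate and resolve are assumptions made for illustration, and the date-range condition is left out to keep it short.

```scala
object LookupSemantics {
  // Hypothetical row type carrying only the columns needed for the lookup.
  final case class Row(id: String, value1: Int, value2: Int)

  // id -> (map_code -> map_val), mirroring the two rateDs sample rows.
  val lookup: Map[String, Map[Int, String]] =
    Map("21" -> Map(12 -> "C", 13 -> "D"))

  // Return the mapped value when the id has one for this code;
  // otherwise fall back to the original value rendered as text.
  def translate(id: String, value: Int): String =
    lookup.get(id).flatMap(_.get(value)).getOrElse(value.toString)

  // Apply the lookup to both value columns of a row.
  def resolve(row: Row): (String, String, String) =
    (row.id, translate(row.id, row.value1), translate(row.id, row.value2))
}
```

For example, resolve(Row("21", 13, 12)) gives ("21", "D", "C"), and a row with id "22" keeps its numeric values as text because no mapping exists for that id.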

Please let me know if more details are needed.

mwg9r5ms1#

Try this: create a lookup map per id before the join, then use that map to replace the values.
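import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Attach to every rateDs row a map of all (map_code -> map_val) pairs for its id
val newRateDS = rateDs.withColumn("lookUpMap",
  map_from_entries(collect_list(struct(col("map_code"), col("map_val"))).over(Window.partitionBy("id")))
)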

newRateDS.show(false)
/**
  * +---+----------+----------+--------+-------+------------------+
  * |id |start_date|end_date  |map_code|map_val|lookUpMap         |
  * +---+----------+----------+--------+-------+------------------+
  * |21 |2018-01-31|2018-06-31|12      |C      |[12 -> C, 13 -> D]|
  * |21 |2018-01-31|2018-06-31|13      |D      |[12 -> C, 13 -> D]|
  * +---+----------+----------+--------+-------+------------------+
  */

// Join against the enriched lookup; the condition references newRateDS, the DataFrame actually being joined
val resultDs = df.filter(col("code").equalTo(lit("rate"))).join(broadcast(newRateDS),
  newRateDS("id") === df("id") && df("date").between(newRateDS("start_date"), newRateDS("end_date"))
    //.and(rateDs.col("mapping_value").equalTo(df.col("mean")))
  , "left"
)

// Replace each value with its mapped value; keep the original when the map has no entry for it
resultDs.withColumn("value1", expr("coalesce(lookUpMap[value1], value1)"))
  .withColumn("value2", expr("coalesce(lookUpMap[value2], value2)"))
  .show(false)

/**
  * +---+----+------+----------+------+------+----+----------+----------+--------+-------+------------------+
  * |id |code|entity|date      |value1|value2|id  |start_date|end_date  |map_code|map_val|lookUpMap         |
  * +---+----+------+----------+------+------+----+----------+----------+--------+-------+------------------+
  * |22 |rate|school|2018-03-31|11    |14    |null|null      |null      |null    |null   |null              |
  * |21 |rate|school|2018-03-31|D     |C     |21  |2018-01-31|2018-06-31|13      |D      |[12 -> C, 13 -> D]|
  * |21 |rate|school|2018-03-31|D     |C     |21  |2018-01-31|2018-06-31|12      |C      |[12 -> C, 13 -> D]|
  * +---+----+------+----------+------+------+----+----------+----------+--------+-------+------------------+
  */
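The output above still contains one row per matching lookup row, because the window function keeps every rateDs row. If only one row per input row is wanted, a possible variation is to aggregate the lookup into a single row per (id, start_date, end_date) before joining. The sketch below is an assumption-laden alternative and not the approach from the answer: the object name AggregatedLookupJoin, the run method, the renamed lookup_id column and the local SparkSession settings are all hypothetical, and it assumes Spark 2.4 or later, where map_from_entries is available.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object AggregatedLookupJoin {
  // Returns (id, value1, value2) after the lookup, one tuple per input row.
  def run(): Seq[(String, String, String)] = {
    // Local session purely for illustration; in spark-shell, `spark` already exists.
    val spark = SparkSession.builder().master("local[1]").appName("lookup-sketch").getOrCreate()
    import spark.implicits._
    try {
      val df = List(
        ("22", "rate", "school", "2018-03-31", 11, 14),
        ("21", "rate", "school", "2018-03-31", 13, 12)
      ).toDF("id", "code", "entity", "date", "value1", "value2")

      val rateDs = List(
        ("21", "2018-01-31", "2018-06-31", 12, "C"),
        ("21", "2018-01-31", "2018-06-31", 13, "D")
      ).toDF("id", "start_date", "end_date", "map_code", "map_val")

      // Collapse the lookup to one row per id and date range, carrying the whole map.
      val lookup = rateDs
        .groupBy("id", "start_date", "end_date")
        .agg(map_from_entries(collect_list(struct($"map_code", $"map_val"))).as("lookUpMap"))
        .withColumnRenamed("id", "lookup_id") // avoids an ambiguous `id` after the join

      val joined = df.join(
        broadcast(lookup),
        $"id" === $"lookup_id" && $"date".between($"start_date", $"end_date"),
        "left"
      )

      // Mapped value when present, otherwise the original value as a string.
      val result = joined
        .withColumn("value1", expr("coalesce(lookUpMap[value1], cast(value1 as string))"))
        .withColumn("value2", expr("coalesce(lookUpMap[value2], cast(value2 as string))"))

      result.select("id", "value1", "value2").as[(String, String, String)].collect().toSeq
    } finally {
      spark.stop()
    }
  }
}
```

With the sample data, this variation should return one row for id 21 with D and C, and one row for id 22 with its original values as strings. The trade-off is that the per-row map_code and map_val columns from the window version are no longer present in the result.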
