基于数据优先存储的sparkDataframe

ngynwnxp  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(184)

我有2个数据集,根据优先级我想存储我的数据
数据集1--->是关于我的表的详细信息

+----------+----------+-------------+---------+
|   user_id|  rate    |date         | Class    |
+----------+----------+-------------+---------+
| XO_121   | 10       |2020-04-20   | A       |         
| XO_121   | 20       |2020-04-21   |  B      |         
| XO_121   | 30       |2020-04-22   |    C    |        
| XO_121   |0          |2020-04-23   |       D |       
| XO_123   |10        |2020-04-24   |       B |        
| XO_123   |40         |2020-04-25   |      D  |

数据集2--->列出类的优先级

+----------- +----------+-
|   Class |  Priority  |
+-----------+----------+-
| A         | 1        |               
| B         |  2       |
| C         | 3      |               
| D         |  4       |            
| E         |  5       |

输出数据集应该是这样的

+----------+----------+-------------+---------+
|   user_id|  rate    |date         | Class    |
+----------+----------+-------------+---------+
| XO_121   | 10       |2020-04-20   | A       |          
| XO_123   |10         |2020-04-25   |      B  |

所以我想存储具有相同 user_id 但不一样 rate 基于优先级表。我已经尝试过我的代码,但它不工作我是新的分区。

bfnvny8b

bfnvny8b1#

您可以执行联接并获取每个组中的第一行:

from pyspark.sql import functions as F, Window

result = df1.join(df2, 'Class').withColumn(
    'rn', 
    F.row_number().over(Window.partitionBy('user_id').orderBy('Priority'))
).filter('rn = 1').drop('rn')

result.show()
+-----+-------+----+----------+--------+
|Class|user_id|rate|      date|Priority|
+-----+-------+----+----------+--------+
|    B| XO_123|  10|2020-04-24|       2|
|    A| XO_121|  10|2020-04-20|       1|
+-----+-------+----+----------+--------+

您没有指定您的语言,但在scala中会指定

import org.apache.spark.sql.expressions.Window

val result = df1.join(df2, "Class").withColumn(
    "rn", 
    row_number().over(Window.partitionBy("user_id").orderBy("Priority"))
).filter("rn = 1").drop("rn")

相关问题