Update a specific value in PySpark when two other values match across two different tables

2cmtqfgy asked on 2022-11-01 in Spark

Does anyone know how to write this in PySpark?
I have two PySpark DataFrames that I want to merge. However, there is one value I want to update whenever two column values are duplicated between them.
PyDf1:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-26|        655|          N|           A|
| 2022-09-26|        656|          N|           B|
| 2022-09-26|        657|          N|           C|
| 2022-09-26|        658|          N|           D|
+-----------+-----------+-----------+------------+

PyDf2:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-27|        655|          N|           D|
| 2022-09-27|        656|          N|           C|
| 2022-09-27|        657|          N|           B|
| 2022-09-27|        658|          N|           A|
| 2022-09-26|        655|          N|           B|  <- Duplicate test_date & student_id, different grade
+-----------+-----------+-----------+------------+

Desired output:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-26|        655|          N|           B|  <- Updated to B for grade
| 2022-09-26|        656|          N|           B|
| 2022-09-26|        657|          N|           C|
| 2022-09-26|        658|          N|           D|
| 2022-09-27|        655|          N|           D|
| 2022-09-27|        656|          N|           C|
| 2022-09-27|        657|          N|           B|
| 2022-09-27|        658|          N|           A|
+-----------+-----------+-----------+------------+
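For anyone who wants to reproduce this, here is a minimal sketch that builds the two sample DataFrames above (assuming test_date is held as a string; cast with to_date if you need a date type):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

cols = ['test_date', 'student_id', 'take_home', 'grade']
PyDf1 = spark.createDataFrame([
    ('2022-09-26', 655, 'N', 'A'),
    ('2022-09-26', 656, 'N', 'B'),
    ('2022-09-26', 657, 'N', 'C'),
    ('2022-09-26', 658, 'N', 'D'),
], cols)
PyDf2 = spark.createDataFrame([
    ('2022-09-27', 655, 'N', 'D'),
    ('2022-09-27', 656, 'N', 'C'),
    ('2022-09-27', 657, 'N', 'B'),
    ('2022-09-27', 658, 'N', 'A'),
    ('2022-09-26', 655, 'N', 'B'),  # duplicate test_date & student_id, updated grade
], cols)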

kqqjbcuj1#

Use a window function. The logic and code are below:

from pyspark.sql import Window
from pyspark.sql.functions import lead, col, lit

df = (PyDf1.withColumn('src', lit(1))
          .unionByName(PyDf2.withColumn('src', lit(2)))  # union the dfs, tagging each row's source
          .withColumn('NextGrade', lead('grade').over(
              Window.partitionBy('student_id', 'test_date').orderBy('src')))  # peek at the next grade within each key
          .where(col('NextGrade').isNull())  # keep only the last row per key, so the PyDf2 grade wins
          .drop('NextGrade', 'src')  # drop the temp columns
     )
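As an aside, if the rule is simply that the PyDf2 row always wins on a duplicate (test_date, student_id) key, a left anti join gives the same result without a window. This is a sketch, not part of the original answer:

# Keep the PyDf1 rows whose (test_date, student_id) key has no match in PyDf2,
# then append every PyDf2 row
df = (PyDf1.join(PyDf2, ['test_date', 'student_id'], 'left_anti')
           .unionByName(PyDf2))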

i1icjdpr2#

Came up with a solution:
1. Union the two tables
2. Add an index column
3. Assign a row number using partitionBy (window function)
4. Filter rows and columns


# Union the two tables

df_new = PyDf1.union(PyDf2)

# Add an index column

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

w = Window.orderBy('test_date')  # global window; the answer relies on the union order so the PyDf2 duplicate receives the larger index
df_new = df_new.withColumn('index', row_number().over(w))

# Assign a row number using partitionBy (window function)

windowSpec = Window.partitionBy(col("test_date"),
                                col("student_id"),
                                col("take_home")) \
                   .orderBy(col("index").desc())
df_new = df_new.withColumn("row_number", row_number().over(windowSpec))

# Filter rows and columns

df_new = df_new.filter(df_new.row_number == 1)  # keep the latest row for each duplicate key
columns = ['test_date', 'student_id', 'take_home', 'grade']
df_new = df_new.select(columns)
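To sanity-check the result against the desired output above (a usage sketch):

df_new.orderBy('test_date', 'student_id').show()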
