python—在pyspark中实现递归算法以在Dataframe中查找对

ds97pgxw  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(634)

我有一个spark数据框( prof_student_df )列出学生/教授对作为时间戳。每个时间戳有4个教授和4个学生,每个教授-学生对有一个“分数”(因此每个时间框架有16行)。对于每个时间段,我都需要找到教授/学生之间的一对一配对,以最大限度地提高总分。每个教授只能在一个时间范围内与一个学生匹配。
例如,这里是一个时间段的配对/分数。

+------------+--------------+------------+-------+----------+
|    time    | professor_id | student_id | score | is_match |
+------------+--------------+------------+-------+----------+
| 1596048041 | p1           | s1         |   0.7 | FALSE    |
| 1596048041 | p1           | s2         |   0.5 | TRUE     |
| 1596048041 | p1           | s3         |   0.3 | FALSE    |
| 1596048041 | p1           | s4         |   0.2 | FALSE    |
| 1596048041 | p2           | s1         |   0.9 | TRUE     |
| 1596048041 | p2           | s2         |   0.1 | FALSE    |
| 1596048041 | p2           | s3         |  0.15 | FALSE    |
| 1596048041 | p2           | s4         |   0.2 | FALSE    |
| 1596048041 | p3           | s1         |   0.2 | FALSE    |
| 1596048041 | p3           | s2         |   0.3 | FALSE    |
| 1596048041 | p3           | s3         |   0.4 | FALSE    |
| 1596048041 | p3           | s4         |   0.8 | TRUE     |
| 1596048041 | p4           | s1         |   0.2 | FALSE    |
| 1596048041 | p4           | s2         |   0.3 | FALSE    |
| 1596048041 | p4           | s3         |  0.35 | TRUE     |
| 1596048041 | p4           | s4         |   0.4 | FALSE    |
+------------+--------------+------------+-------+----------+

我们的目标是得到这个是匹配列。它可以是一个布尔值或0/1位或其他任何有效的值。
在上述示例中,p1与s2匹配,p2与s1匹配,p3与s4匹配,p4与s3匹配,因为这是使总分最大化的组合(得到2.55分)。有一个奇怪的边缘的情况-有可能有少于4名教授或学生在给定的时间框架。如果有4个教授和3个学生,那么1个教授将没有配对,他的所有is\u匹配都是假的。同样,如果有3个教授和4个学生,1个学生将没有配对,他的所有is\u匹配都是假的。
有人知道我该怎么做吗?我在想,我应该按时间划分或分组,然后将数据输入到某个udf中,该udf会吐出对,然后可能我必须将其连接回原始行(尽管我不确定)。我尝试在pyspark中实现这个逻辑,可以使用sparksql/sql或pyspark。
理想情况下,我希望这是尽可能有效的,因为将有数百万行。在这个问题中,我提到了递归算法,因为这是一个传统的递归类型的问题,但如果有一个更快的解决方案,不使用递归,我是开放的。
非常感谢,我是一个新的Spark和一个如何做到这一点有点难懂。
编辑:澄清这个问题,因为我知道在我的例子中,我没有具体说明这一天,将有多达14名教授和14名学生可供选择。我只是一次看一天,这就是为什么我在数据框中没有日期。在任何一个时间范围内,最多有4名教授和4名学生。这个Dataframe只显示一个时间帧。但在下一个时间框架内,这4位教授有可能 p5 , p1 , p7 , p9 或者类似的。学生们可能还在 s1 , s2 , s3 , s4 .

eiee3dmh

eiee3dmh1#

正如我们的朋友@cronoik提到的,您需要使用匈牙利算法,我在python中看到的关于不平衡分配问题的最佳代码是:https://github.com/mayorx/hungarian-algorithm (存储库中还有一些示例:))
您只需将Dataframe转换为numpy数组并传递给km\u matcher,然后根据km\u matcher的答案在spark中添加一个带有withcolumn函数的列。

ntjbwcob

ntjbwcob2#

编辑:如评论中所述,要解决更新中提到的问题,我们可以使用密集列将每次的student\u id转换为通用序列id,执行步骤1到3(使用student列),然后使用join将每次的student\u id转换回其原始student\u id。请参阅下面的步骤0和步骤4。如果一个时间单位中有少于4个教授,那么在numpy end中,维度将调整为4(使用np\u vstack()和np\u zeros()),请参阅更新的函数 find_assigned .
您可以尝试pandas\u udf和scipy.optimize.linear\u sum\u赋值(注意:后端方法是@cronoik在主要注解中提到的匈牙利算法),如下所示:

from pyspark.sql.functions import pandas_udf, PandasUDFType, first, expr, dense_rank
from pyspark.sql.types import StructType
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
import numpy as np

df = spark.createDataFrame([
    ('1596048041', 'p1', 's1', 0.7), ('1596048041', 'p1', 's2', 0.5), ('1596048041', 'p1', 's3', 0.3),
    ('1596048041', 'p1', 's4', 0.2), ('1596048041', 'p2', 's1', 0.9), ('1596048041', 'p2', 's2', 0.1),
    ('1596048041', 'p2', 's3', 0.15), ('1596048041', 'p2', 's4', 0.2), ('1596048041', 'p3', 's1', 0.2),
    ('1596048041', 'p3', 's2', 0.3), ('1596048041', 'p3', 's3', 0.4), ('1596048041', 'p3', 's4', 0.8),
    ('1596048041', 'p4', 's1', 0.2), ('1596048041', 'p4', 's2', 0.3), ('1596048041', 'p4', 's3', 0.35),
    ('1596048041', 'p4', 's4', 0.4)
] , ['time', 'professor_id', 'student_id', 'score'])

N = 4
cols_student = [*range(1,N+1)]

第0步:添加额外列 student ,并使用 time + student_id + student .

w1 = Window.partitionBy('time').orderBy('student_id')

df = df.withColumn('student', dense_rank().over(w1))
+----------+------------+----------+-----+-------+                              
|      time|professor_id|student_id|score|student|
+----------+------------+----------+-----+-------+
|1596048041|          p1|        s1|  0.7|      1|
|1596048041|          p2|        s1|  0.9|      1|
|1596048041|          p3|        s1|  0.2|      1|
|1596048041|          p4|        s1|  0.2|      1|
|1596048041|          p1|        s2|  0.5|      2|
|1596048041|          p2|        s2|  0.1|      2|
|1596048041|          p3|        s2|  0.3|      2|
|1596048041|          p4|        s2|  0.3|      2|
|1596048041|          p1|        s3|  0.3|      3|
|1596048041|          p2|        s3| 0.15|      3|
|1596048041|          p3|        s3|  0.4|      3|
|1596048041|          p4|        s3| 0.35|      3|
|1596048041|          p1|        s4|  0.2|      4|
|1596048041|          p2|        s4|  0.2|      4|
|1596048041|          p3|        s4|  0.8|      4|
|1596048041|          p4|        s4|  0.4|      4|
+----------+------------+----------+-----+-------+

df3 = df.select('time','student_id','student').dropDuplicates()
+----------+----------+-------+                                                 
|      time|student_id|student|
+----------+----------+-------+
|1596048041|        s1|      1|
|1596048041|        s2|      2|
|1596048041|        s3|      3|
|1596048041|        s4|      4|
+----------+----------+-------+

第一步:使用pivot查找教授与学生的矩阵,注意我们将分数的负数设置为pivot的值,这样我们就可以使用scipy.optimize.linear\u sum\u赋值来查找赋值问题的最小代价:

df1 = df.groupby('time','professor_id').pivot('student', cols_student).agg(-first('score'))
+----------+------------+----+----+-----+----+
|      time|professor_id|   1|   2|    3|   4|
+----------+------------+----+----+-----+----+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|
+----------+------------+----+----+-----+----+

第二步:使用pandas\u udf和scipy.optimize.linear\u sum\u赋值获得列索引,然后将相应的列名赋给新列 assigned :


# returnSchema contains one more StringType column `assigned` than schema from the input pdf:

schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')

# since the # of students are always N, we can use np.vstack to set the N*N matrix

# below `n` is the number of professors/rows in pdf

# sz is the size of input Matrix, sz=4 in this example

def __find_assigned(pdf, sz):
  cols = pdf.columns[2:]
  n = pdf.shape[0]
  n1 = pdf.iloc[:,2:].fillna(0).values
  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((sz-n,sz)))))
  return pdf.assign(assigned=[cols[i] for i in idx][:n])

find_assigned = pandas_udf(lambda x: __find_assigned(x,N), schema, PandasUDFType.GROUPED_MAP)

df2 = df1.groupby('time').apply(find_assigned)
+----------+------------+----+----+-----+----+--------+
|      time|professor_id|   1|   2|    3|   4|assigned|
+----------+------------+----+----+-----+----+--------+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|       3|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|       1|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|       2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|       4|
+----------+------------+----+----+-----+----+--------+

注意:根据@oluwafemisule的建议,我们可以使用参数 maximize 而不是否定分数值。此参数可用于scipy 1.4.0+:

_, idx = linear_sum_assignment(np.vstack((n1,np.zeros((N-n,N)))), maximize=True)

第三步:使用sparksql堆栈函数对上述df2进行规范化,对score值求反,过滤score为null的行。理想的 is_match 列应具有 assigned==student :

df_new = df2.selectExpr(
  'time',
  'professor_id',
  'assigned',
  'stack({},{}) as (student, score)'.format(len(cols_student), ','.join("int('{0}'), -`{0}`".format(c) for c in cols_student))
) \
.filter("score is not NULL") \
.withColumn('is_match', expr("assigned=student"))

df_new.show()
+----------+------------+--------+-------+-----+--------+
|      time|professor_id|assigned|student|score|is_match|
+----------+------------+--------+-------+-----+--------+
|1596048041|          p4|       3|      1|  0.2|   false|
|1596048041|          p4|       3|      2|  0.3|   false|
|1596048041|          p4|       3|      3| 0.35|    true|
|1596048041|          p4|       3|      4|  0.4|   false|
|1596048041|          p2|       1|      1|  0.9|    true|
|1596048041|          p2|       1|      2|  0.1|   false|
|1596048041|          p2|       1|      3| 0.15|   false|
|1596048041|          p2|       1|      4|  0.2|   false|
|1596048041|          p1|       2|      1|  0.7|   false|
|1596048041|          p1|       2|      2|  0.5|    true|
|1596048041|          p1|       2|      3|  0.3|   false|
|1596048041|          p1|       2|      4|  0.2|   false|
|1596048041|          p3|       4|      1|  0.2|   false|
|1596048041|          p3|       4|      2|  0.3|   false|
|1596048041|          p3|       4|      3|  0.4|   false|
|1596048041|          p3|       4|      4|  0.8|    true|
+----------+------------+--------+-------+-----+--------+

步骤4:使用join将student转换回student\ id(如果可能,使用broadcast join):

df_new = df_new.join(df3, on=["time", "student"])
+----------+-------+------------+--------+-----+--------+----------+            
|      time|student|professor_id|assigned|score|is_match|student_id|
+----------+-------+------------+--------+-----+--------+----------+
|1596048041|      1|          p1|       2|  0.7|   false|        s1|
|1596048041|      2|          p1|       2|  0.5|    true|        s2|
|1596048041|      3|          p1|       2|  0.3|   false|        s3|
|1596048041|      4|          p1|       2|  0.2|   false|        s4|
|1596048041|      1|          p2|       1|  0.9|    true|        s1|
|1596048041|      2|          p2|       1|  0.1|   false|        s2|
|1596048041|      3|          p2|       1| 0.15|   false|        s3|
|1596048041|      4|          p2|       1|  0.2|   false|        s4|
|1596048041|      1|          p3|       4|  0.2|   false|        s1|
|1596048041|      2|          p3|       4|  0.3|   false|        s2|
|1596048041|      3|          p3|       4|  0.4|   false|        s3|
|1596048041|      4|          p3|       4|  0.8|    true|        s4|
|1596048041|      1|          p4|       3|  0.2|   false|        s1|
|1596048041|      2|          p4|       3|  0.3|   false|        s2|
|1596048041|      3|          p4|       3| 0.35|    true|        s3|
|1596048041|      4|          p4|       3|  0.4|   false|        s4|
+----------+-------+------------+--------+-----+--------+----------+

df_new = df_new.drop("student", "assigned")

相关问题