我有一个spark数据框( prof_student_df
)列出学生/教授对作为时间戳。每个时间戳有4个教授和4个学生,每个教授-学生对有一个“分数”(因此每个时间框架有16行)。对于每个时间段,我都需要找到教授/学生之间的一对一配对,以最大限度地提高总分。每个教授只能在一个时间范围内与一个学生匹配。
例如,这里是一个时间段的配对/分数。
+------------+--------------+------------+-------+----------+
| time | professor_id | student_id | score | is_match |
+------------+--------------+------------+-------+----------+
| 1596048041 | p1 | s1 | 0.7 | FALSE |
| 1596048041 | p1 | s2 | 0.5 | TRUE |
| 1596048041 | p1 | s3 | 0.3 | FALSE |
| 1596048041 | p1 | s4 | 0.2 | FALSE |
| 1596048041 | p2 | s1 | 0.9 | TRUE |
| 1596048041 | p2 | s2 | 0.1 | FALSE |
| 1596048041 | p2 | s3 | 0.15 | FALSE |
| 1596048041 | p2 | s4 | 0.2 | FALSE |
| 1596048041 | p3 | s1 | 0.2 | FALSE |
| 1596048041 | p3 | s2 | 0.3 | FALSE |
| 1596048041 | p3 | s3 | 0.4 | FALSE |
| 1596048041 | p3 | s4 | 0.8 | TRUE |
| 1596048041 | p4 | s1 | 0.2 | FALSE |
| 1596048041 | p4 | s2 | 0.3 | FALSE |
| 1596048041 | p4 | s3 | 0.35 | TRUE |
| 1596048041 | p4 | s4 | 0.4 | FALSE |
+------------+--------------+------------+-------+----------+
我们的目标是得到这个是匹配列。它可以是一个布尔值或0/1位或其他任何有效的值。
在上述示例中,p1与s2匹配,p2与s1匹配,p3与s4匹配,p4与s3匹配,因为这是使总分最大化的组合(得到2.55分)。有一个奇怪的边缘的情况-有可能有少于4名教授或学生在给定的时间框架。如果有4个教授和3个学生,那么1个教授将没有配对,他的所有is\u匹配都是假的。同样,如果有3个教授和4个学生,1个学生将没有配对,他的所有is\u匹配都是假的。
有人知道我该怎么做吗?我在想,我应该按时间划分或分组,然后将数据输入到某个udf中,该udf会吐出对,然后可能我必须将其连接回原始行(尽管我不确定)。我尝试在pyspark中实现这个逻辑,可以使用sparksql/sql或pyspark。
理想情况下,我希望这是尽可能有效的,因为将有数百万行。在这个问题中,我提到了递归算法,因为这是一个传统的递归类型的问题,但如果有一个更快的解决方案,不使用递归,我是开放的。
非常感谢,我是一个新的Spark和一个如何做到这一点有点难懂。
编辑:澄清这个问题,因为我知道在我的例子中,我没有具体说明这一天,将有多达14名教授和14名学生可供选择。我只是一次看一天,这就是为什么我在数据框中没有日期。在任何一个时间范围内,最多有4名教授和4名学生。这个Dataframe只显示一个时间帧。但在下一个时间框架内,这4位教授有可能 p5
, p1
, p7
, p9
或者类似的。学生们可能还在 s1
, s2
, s3
, s4
.
2条答案
按热度按时间eiee3dmh1#
正如我们的朋友@cronoik提到的,您需要使用匈牙利算法,我在python中看到的关于不平衡分配问题的最佳代码是:https://github.com/mayorx/hungarian-algorithm (存储库中还有一些示例:))
您只需将Dataframe转换为numpy数组并传递给km\u matcher,然后根据km\u matcher的答案在spark中添加一个带有withcolumn函数的列。
ntjbwcob2#
编辑:如评论中所述,要解决更新中提到的问题,我们可以使用密集列将每次的student\u id转换为通用序列id,执行步骤1到3(使用student列),然后使用join将每次的student\u id转换回其原始student\u id。请参阅下面的步骤0和步骤4。如果一个时间单位中有少于4个教授,那么在numpy end中,维度将调整为4(使用np\u vstack()和np\u zeros()),请参阅更新的函数
find_assigned
.您可以尝试pandas\u udf和scipy.optimize.linear\u sum\u赋值(注意:后端方法是@cronoik在主要注解中提到的匈牙利算法),如下所示:
第0步:添加额外列
student
,并使用time
+student_id
+student
.第一步:使用pivot查找教授与学生的矩阵,注意我们将分数的负数设置为pivot的值,这样我们就可以使用scipy.optimize.linear\u sum\u赋值来查找赋值问题的最小代价:
第二步:使用pandas\u udf和scipy.optimize.linear\u sum\u赋值获得列索引,然后将相应的列名赋给新列
assigned
:注意:根据@oluwafemisule的建议,我们可以使用参数
maximize
而不是否定分数值。此参数可用于scipy 1.4.0+:第三步:使用sparksql堆栈函数对上述df2进行规范化,对score值求反,过滤score为null的行。理想的
is_match
列应具有assigned==student
:步骤4:使用join将student转换回student\ id(如果可能,使用broadcast join):