嗨,我有一个Dataframepl\ U join\ U lfd\ U排名如下:
+-----------+-----------+----------+--------+--------+-------------+
|FACILITY_ID|LOCATION_ID|PATIENT_ID|DISTANCE|CAPACITY|rank_distance|
+-----------+-----------+----------+--------+--------+-------------+
|FAC003 |LOC0001 |P1 |54 |3 |2 |
|FAC002 |LOC0001 |P1 |45 |2 |1 |
|FAC003 |LOC0001 |P2 |54 |3 |2 |
|FAC002 |LOC0001 |P2 |45 |2 |1 |
|FAC006 |LOC0010 |P3 |12 |2 |1 |
|FAC003 |LOC0010 |P3 |54 |3 |4 |
fac\ U cap\ U map如下
Map(FAC004 -> 0, FAC003 -> 0, FAC007 -> 0, FAC002 -> 0, FAC006 -> 0, FAC005 -> 0)
我想创建一个新的列当前容量,为其计算我创建了一个自定义项。
def cur_cap_udf(m: Map[Any, Int]) = udf( (cap: Int,fac:String) =>
m foreach {case (key,value) => if ((key == fac) && (value < cap) ) value +1 else value}
)
调用udf
val finaldf1 = PL_join_LFD_ranked.withColumn("current_capacity", cur_cap_udf(fac_cap_map)(PL_join_LFD_ranked("CAPACITY"),PL_join_LFD_ranked("FACILITY_ID")))
我得到的错误如下
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported
reason foreach返回单位类型。尝试使用foldleft,但当需要上一个操作的结果时,将使用foldleft。但事实并非如此。
我只是检查map中的值是否小于传递给udf的容量,然后将map值增加1。这是当前容量的逻辑。
2条答案
按热度按时间mitkmikd1#
您需要从自定义项返回值。
foreach
返回单位,因此返回错误。ufj5ltwl2#
我认为,你的问题与spark无关,而与“如何返回整数”有关
value
从cur_cap
功能”。另外,深入研究一下函数,你想得到什么样的结果,什么样的Dataframe?据我所知,使用当前代码,每个设施都将分别进行评估,所以
(key == fac)
会是True
每行仅一次。也许你应该试试看PL_join_LFD_ranked.groupBy(col("FACILITY_ID").agg(sum("CAPACITY")))
,然后以某种方式处理能力(也许 吧,.withColumn("capped_capacity", f.min("capacity_sum", "capacity_cap")
)