我有一个函数。像下面,并希望计算估计groupBy键。数据需要按时间在组内排序。
看起来用spark.df不容易/不可能,所以我尝试了rdd,但是即使我使用自定义的分区(组的数量),part/recors?中的“shuffle”也会部分返回错误的结果(不是所有组)。
我如何才能避免这种情况,并计算它在并行与一个干净的分组给定的排序?提前感谢,基督教
func
def estimate(rows):
estimated = float(0.0)
result = []
for row in rows:
time, key, available, level, reduction, total = row
if level >= 0.3:
estimated += float(available - reduction)
estimated = min(estimated, total)
else:
estimated =float(0.0)
result.append((time, key, available, level, reduction, total, estimated))
return iter(result)
字符串
使用mapPartitions的方法
def partition_func(key):
return hash(key)
rdd = df_input.rdd.map(lambda row: (row["key"], row))
partitioned_rdd = rdd.partitionBy(numPartitions=n, partitionFunc=partition_func)
new_df = (partitioned_rdd.map(lambda x: x[1])
.mapPartitions(estimate)
.toDF()
)
型
使用groupByKeys.flatMapValues(estimate)的方法在并行化中也不干净。
1条答案
按热度按时间kq4fsx7k1#
我会将数组按
key
分组,然后使用applyInPandas
计算每组的估计值。这种方法将并行分配对应于每个唯一键的组,以提高计算效率。字符串