Pyspark -顺序估计函数,-并行

piah890a  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(150)

我有一个函数。像下面,并希望计算估计groupBy键。数据需要按时间在组内排序。
看起来用spark.df不容易/不可能,所以我尝试了rdd,但是即使我使用自定义的分区(组的数量),part/recors?中的“shuffle”也会部分返回错误的结果(不是所有组)。
我如何才能避免这种情况,并计算它在并行与一个干净的分组给定的排序?提前感谢,基督教

func

  1. def estimate(rows):
  2. estimated = float(0.0)
  3. result = []
  4. for row in rows:
  5. time, key, available, level, reduction, total = row
  6. if level >= 0.3:
  7. estimated += float(available - reduction)
  8. estimated = min(estimated, total)
  9. else:
  10. estimated =float(0.0)
  11. result.append((time, key, available, level, reduction, total, estimated))
  12. return iter(result)

字符串

使用mapPartitions的方法

  1. def partition_func(key):
  2. return hash(key)
  3. rdd = df_input.rdd.map(lambda row: (row["key"], row))
  4. partitioned_rdd = rdd.partitionBy(numPartitions=n, partitionFunc=partition_func)
  5. new_df = (partitioned_rdd.map(lambda x: x[1])
  6. .mapPartitions(estimate)
  7. .toDF()
  8. )


使用groupByKeys.flatMapValues(estimate)的方法在并行化中也不干净。

kq4fsx7k

kq4fsx7k1#

我会将数组按key分组,然后使用applyInPandas计算每组的估计值。这种方法将并行分配对应于每个唯一键的组,以提高计算效率。

  1. def estimate(pdf):
  2. acc, result = 0, []
  3. pdf = pdf.sort_values('time')
  4. for r in pdf.itertuples():
  5. acc += r.available - r.reduction
  6. acc = min(acc, r.total)
  7. result.append(acc)
  8. return pdf.assign(estimated=result)
  9. schema = T.StructType([*df_input.schema.fields, T.StructField('estimated', T.DoubleType())])
  10. df_result = df_input.groupBy('key').applyInPandas(estimate, schema=schema)

字符串

相关问题