python 使用map函数代替Pyspark Dataframe 的循环

eh57zj3b  于 2023-05-27  发布在  Python
关注(0)|答案(2)|浏览(177)

我必须循环通过一个 Dataframe ,并根据一系列值过滤结果,并根据此计算一个值:例如

# df:
# +------------+----------+------------+
# |item        |      date|sales       |
# +------------+----------+------------+
# |         325|2021-05-01|     8524.64|
# |         400|2021-05-01|     9939.59|
# |         314|2021-05-03|      5466.3|
# |         267|2021-05-04|     6471.63|
# |         387|2021-05-04|     5406.85|
# +------------+----------+------------+

list_items= [325,400,314,267,387] #all values of item column

best_stdev=9999999

for i in list:
    df_filtered = df.filter(col("item")==i)
    stddev_sales = df_filtered.select(stddev("sales")).collect()[0][0]
    if stddev_sales< best_stdev:
       best_stdev = stddev_sales
       besti=i # at the end of the loop, I will have the item with the min standard deviation

基本上,这个代码相当于对每个项目取最小标准差。我知道我可以做一个groupby并计算所有的,它更快,但我确实需要为这种情况做迭代。在这种情况下,我如何使用pySpark map函数?我已经阅读了大量的文档,但没有什么我可以实际应用在这种情况下。如何在pyspark map函数中重写此内容?

2nc8po8w

2nc8po8w1#

AFAIU,你需要一个自定义Map函数来显示每件商品的销售额,这样你就可以在上面应用一些自定义逻辑,如果这是正确的,我想你可以这样做:

import org.apache.spark.sql.functions._

def apply_analysis(x):
  """Process the list of sales and return the result"""
  return output

analysis_udf = F.udf(apply_analysis, IntegerType()) # adjust the return type of your UDF

df.groupBy(col('item')) \
  .agg(analysis_udf(F.collect_list('sales')).alias('analysis')) \
  # ... other aggregations can be done here. e.g. calculate the max out of all values
  .show()
iswrvxsc

iswrvxsc2#

如果你需要一个与包含商品及其日销售额的 Dataframe 标准偏差最小的商品,你可以做一个group by并计算stddev
下面是一个在样本数据上测试示例

blah = data_sdf. \
    groupBy('item'). \
    agg(func.stddev('sales').alias('stddev_sales')). \
    withColumn('min_sdev', func.min('stddev_sales').over(wd.partitionBy())). \
    filter(func.col('min_sdev') == func.col('stddev_sales')). \
    collect()

best_item = blah[0].item
least_sdev_sales = blah[0].stddev_sales

相关问题