Python: what is the best way to parallelize PySpark processing on DataFrames?

aij0ehis  posted on 2021-05-27  in Spark

I'm new to Spark and am trying to learn how to run the following code in parallel:


from pyspark.sql.functions import col, mean as _mean, stddev as _stddev
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# init of results dataframe, empty
schema = StructType([
    StructField('user', StringType()),
    StructField('mean', FloatType()),
    StructField('std_dev', FloatType())
])
df_result = spark.createDataFrame([], schema)

# for each distinct user:
# - build the table of their transactions
# - compute mean and std of 'importo'
# - append a new row about this user to df_result
for usr in users:
    # get usr transactions by union of the rows regarding this user
    bons = df_bon[df_bon['user'] == usr]
    rics = df_ric[df_ric['user'] == usr]
    tot = bons.union(rics)
    # LEFT ANTI JOIN to drop all the fraudulent transactions
    no_frauds = tot.join(df_frodi, ['transaction_id'], "leftanti")
    # compute mean and standard deviation; collect() returns a single Row
    no_frauds = no_frauds.select(
        _mean(col('importo')).alias('mean'),
        _stddev(col('importo')).alias('std')
    ).collect()
    # create a 1-row dataframe with the user field, then append it to the result via union
    df_result = df_result.union(spark.createDataFrame([(usr, no_frauds[0][0], no_frauds[0][1])], ['user', 'mean', 'std']))

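I'd like to understand how to improve this code. Suggestions are welcome, and apologies if the code is poor; this is my first time working with Spark :)
I know the problem is the for loop, which doesn't implement any parallel logic, but I'd like to know what the best practice for parallelizing it is (for example, is it OK to use the standard ThreadPool approach?)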

n7taea2i  #1

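My answer will only be partial, because you didn't provide any data that would let me fully understand your case.
But something along these lines: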

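from pyspark.sql import functions as F

df_result = (
    # all transactions from both sources (df_ric, as named in the question)
    df_bon.union(df_ric)
    # keep only the users of interest
    .where(F.col("user").isin(users))
    # drop every transaction that appears in the fraud table
    .join(df_frodi, ["transaction_id"], "leftanti")
    # one output row per user, computed by Spark in parallel
    .groupBy("user")
    .agg(F.avg("importo").alias("mean"), F.stddev("importo").alias("std"))
)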
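
To check what that single pipeline computes without a cluster, here is a rough plain-Python mirror of the same steps (union, user filter, anti join, group and aggregate). It is only a sketch: the toy rows, the `frodi_ids` set and the `per_user_stats` helper are made up for illustration and are not part of the question's data. `statistics.stdev` is the sample standard deviation, which is also what `F.stddev` computes.

```python
from collections import defaultdict
from statistics import mean, stdev

# Hypothetical toy rows: (transaction_id, user, importo)
bon = [(1, "a", 10.0), (2, "a", 20.0), (3, "b", 5.0)]
ric = [(4, "a", 30.0), (5, "b", 15.0), (6, "c", 100.0)]
frodi_ids = {2}      # transaction_ids flagged as fraudulent (assumed)
users = {"a", "b"}   # users of interest (assumed)


def per_user_stats(bon, ric, frodi_ids, users):
    """Plain-Python equivalent of union -> where -> leftanti -> groupBy/agg."""
    amounts = defaultdict(list)
    # union of both sources, user filter, and anti join on transaction_id
    for tx_id, user, importo in bon + ric:
        if user in users and tx_id not in frodi_ids:
            amounts[user].append(importo)
    # group by user; the sample std dev needs at least two values
    return {
        user: (mean(vals), stdev(vals) if len(vals) > 1 else None)
        for user, vals in amounts.items()
    }


stats = per_user_stats(bon, ric, frodi_ids, users)
```

The point of the comparison is that no per-user loop over DataFrames is needed: the grouping key does the job of the `for usr in users` loop, and Spark distributes the aggregation itself.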
