from pyspark.sql.functions import udf, col, mean
from pyspark.sql.types import IntegerType, LongType
from joblib import Parallel, delayed
import pandas as pd
import random
lst = list(range(10, 100))
# Define functions operate on a single value from a column
def multiply(a):
return a * random.randint(10, 100)
def foo(i):
# This is the key point here, many different spark collect/save/show can be run here
# This is the function that parallelizing can help to speed up multiple independent jobs
return spark.createDataFrame(range(0, i), LongType()).select(mean(multiply(col("value"))).alias("value"))
parallel_job_count = 10
# Use "threads" to allow the same spark object to be reused between the jobs.
results = Parallel(n_jobs=parallel_job_count, prefer="threads")(delayed(foo)(i) for i in lst)
# Collect and print the results
mean_of_means = pd.concat([result.toPandas() for result in results]).value.mean()
print(f"Mean of Means: {mean_of_means}")
1条答案
按热度按时间yrefmtwq1#
下面是一个并行运行多个独立spark作业的示例,而不必等待第一个作业完成。
其他方法
不要使用多重处理,因为它不能pickle spark上下文。
不要使用spark自定义项,因为spark作业无法访问spark上下文
一个警告是必须设置spark fair调度。
此解决方案使用线程而不是不同的进程,以便
spark上下文可以在不同线程之间共享
局部变量可以在不同的线程之间共享
这里的代码用来计算一些数字的平均值