如何使用python和joblib并行提交多个spark作业?

ffvjumwh  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(543)

如何使用python的 joblib 图书馆?
我还想在每个作业中执行“save”或“collect”,因此需要在作业之间重用相同的spark上下文。

yrefmtwq

yrefmtwq1#

下面是一个并行运行多个独立spark作业的示例,而不必等待第一个作业完成。
其他方法
不要使用多重处理,因为它不能pickle spark上下文。
不要使用spark自定义项,因为spark作业无法访问spark上下文
一个警告是必须设置spark fair调度。
此解决方案使用线程而不是不同的进程,以便
spark上下文可以在不同线程之间共享
局部变量可以在不同的线程之间共享

这里的代码用来计算一些数字的平均值

from pyspark.sql.functions import udf, col, mean
from pyspark.sql.types import IntegerType, LongType
from joblib import Parallel, delayed
import pandas as pd
import random

lst = list(range(10, 100))

# Define functions operate on a single value from a column

def multiply(a):
  return a * random.randint(10, 100)

def foo(i):
  # This is the key point here, many different spark collect/save/show can be run here
  # This is the function that parallelizing can help to speed up multiple independent jobs
  return spark.createDataFrame(range(0, i), LongType()).select(mean(multiply(col("value"))).alias("value"))

parallel_job_count = 10

# Use "threads" to allow the same spark object to be reused between the jobs.

results = Parallel(n_jobs=parallel_job_count, prefer="threads")(delayed(foo)(i) for i in lst)

# Collect and print the results

mean_of_means = pd.concat([result.toPandas() for result in results]).value.mean()
print(f"Mean of Means: {mean_of_means}")

相关问题