是否可以为不同的驱动程序线程分别更改运行时spark配置?

jobtbby3  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(161)

我的应用程序由几个并发执行spark查询的工作线程组成。每个查询都有一组最适合它的spark sql配置参数(例如: spark.sql.shuffle.partitions ),但由于所有查询都是并发执行的,因此我无法分别控制每个查询的配置值。
此示例演示了具有两个并发线程的应用程序,每个线程运行相同的spark查询,但每个线程的值不同 spark.sql.shuffle.partitions :

import time
from pyspark.sql import SparkSession
from concurrent.futures.thread import ThreadPoolExecutor

spark = SparkSession.builder.getOrCreate()
default_num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
print(f'default spark.sql.shuffle.partitions={default_num_partitions}')

def run_loop_in_thread(new_num_partitions: int):
    for i in range(10):
        spark.conf.set('spark.sql.shuffle.partitions', str(new_num_partitions))
        time.sleep(0.5)  # just to simulate something longer
        actual_num_partitions = spark.range(100).repartition("id").rdd.getNumPartitions()
        print(f'expected={new_num_partitions} ; actual={actual_num_partitions}')
        time.sleep(0.5)  # just to simulate something longer
        spark.conf.set('spark.sql.shuffle.partitions', str(default_num_partitions))

pool = ThreadPoolExecutor()
pool.submit(run_loop_in_thread, 3)
pool.submit(run_loop_in_thread, 5)
pool.shutdown(wait=True)

这个例子的输出是这样的(由于这个代码的不确定性,您可能会看到一些稍微不同的东西):

default spark.sql.shuffle.partitions=200
expected=5 ; actual=5
expected=3 ; actual=5
expected=3 ; actual=3
expected=5 ; actual=3
...

看来 spark.conf 示例在所有线程之间共享。有没有办法让它成为本地线程?。我的目标是确保在上面的例子中 expected 永远都是一样的 actual ,就像这样:

default spark.sql.shuffle.partitions=200
expected=5 ; actual=5
expected=3 ; actual=3
expected=3 ; actual=3
expected=5 ; actual=5
...

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题