我的应用程序由几个并发执行spark查询的工作线程组成。每个查询都有一组最适合它的spark sql配置参数(例如: spark.sql.shuffle.partitions
),但由于所有查询都是并发执行的,因此我无法分别控制每个查询的配置值。
此示例演示了具有两个并发线程的应用程序,每个线程运行相同的spark查询,但每个线程的值不同 spark.sql.shuffle.partitions
:
import time
from pyspark.sql import SparkSession
from concurrent.futures.thread import ThreadPoolExecutor
spark = SparkSession.builder.getOrCreate()
default_num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
print(f'default spark.sql.shuffle.partitions={default_num_partitions}')
def run_loop_in_thread(new_num_partitions: int):
for i in range(10):
spark.conf.set('spark.sql.shuffle.partitions', str(new_num_partitions))
time.sleep(0.5) # just to simulate something longer
actual_num_partitions = spark.range(100).repartition("id").rdd.getNumPartitions()
print(f'expected={new_num_partitions} ; actual={actual_num_partitions}')
time.sleep(0.5) # just to simulate something longer
spark.conf.set('spark.sql.shuffle.partitions', str(default_num_partitions))
pool = ThreadPoolExecutor()
pool.submit(run_loop_in_thread, 3)
pool.submit(run_loop_in_thread, 5)
pool.shutdown(wait=True)
这个例子的输出是这样的(由于这个代码的不确定性,您可能会看到一些稍微不同的东西):
default spark.sql.shuffle.partitions=200
expected=5 ; actual=5
expected=3 ; actual=5
expected=3 ; actual=3
expected=5 ; actual=3
...
看来 spark.conf
示例在所有线程之间共享。有没有办法让它成为本地线程?。我的目标是确保在上面的例子中 expected
永远都是一样的 actual
,就像这样:
default spark.sql.shuffle.partitions=200
expected=5 ; actual=5
expected=3 ; actual=3
expected=3 ; actual=3
expected=5 ; actual=5
...
暂无答案!
目前还没有任何答案,快来回答吧!