如何获得PySpark中的工作者(执行者)数量?

4sup72z8  于 2023-01-16  发布在  Spark
关注(0)|答案(3)|浏览(228)

我需要使用这个参数,那么我怎么才能得到工作者的数量呢?就像在Scala中一样,我可以调用sc.getExecutorMemoryStatus来得到可用的工作者数量。但是在PySpark中,似乎没有公开API来得到这个数字。

kupeojn6

kupeojn61#

在scala中,getExecutorStorageStatusgetExecutorMemoryStatus都返回执行器的数量,包括驱动程序。

/** Method that just returns the current active/registered executors
        * excluding the driver.
        * @param sc The spark context to retrieve registered executors.
        * @return a list of executors each in the form of host:port.
        */
       def currentActiveExecutors(sc: SparkContext): Seq[String] = {
         val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
         val driverHost: String = sc.getConf.get("spark.driver.host")
         allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
       }

但在python API中没有实现
@DanielDarabos answer也证实了这一点。
在python中与此等价的...

sc.getConf().get("spark.executor.instances")
  • 编辑(python):*
%python
sc = spark._jsc.sc() 
n_workers =  len([executor.host() for executor in sc.statusTracker().getExecutorInfos() ]) -1

print(n_workers)

正如丹尼在评论中提到的,如果你想交叉验证他们,你可以使用下面的声明。

%python

sc = spark._jsc.sc() 

result1 = sc.getExecutorMemoryStatus().keys() # will print all the executors + driver available

result2 = len([executor.host() for executor in sc.statusTracker().getExecutorInfos() ]) -1

print(result1, end ='\n')
print(result2)

示例结果:

Set(10.172.249.9:46467)
0
bq3bfh9z

bq3bfh9z2#

你也可以通过Spark REST API获取执行器的数量:https://spark.apache.org/docs/latest/monitoring.html#rest-api
您可以检查/applications/[app-id]/executors,它返回 * 给定应用程序的所有活动执行器的列表 *。
PS:当spark.dynamicAllocation.enabledtrue时,spark.executor.instances可能不等于当前可用的执行器,但此API始终返回正确的值。

zpqajqem

zpqajqem3#

我以这种方式示例化了SparkContext**,但是没有一个解决方案起作用**:

conf = SparkConf().setMaster(MASTER_CONNECTION_URL).setAppName('App name')
sc = SparkContext(conf=conf)

所以我修改了代码,用pyspark.sql.SparkSession示例化SparkContext,一切正常:

# Gets Spark context
conf = SparkConf().setMaster(MASTER_CONNECTION_URL).setAppName('App name')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Gets the number of workers
spark = SparkContext.getOrCreate(conf=conf)
sc2 = spark._jsc.sc()
number_of_workers = len([executor.host() for executor in
                sc2.statusTracker().getExecutorInfos()]) - 1  # Subtract 1 to discard the master

相关问题