In the code below, on PySpark 2.x, executorMetricsUpdate.execId() always returned a numeric executor ID such as '31' (although of string type). After upgrading from PySpark 2.x to 3.4.1, executor_id is always 'driver'. Why? I rely on this executor_id to build a report of memory-usage statistics. Is there a way to get at the executor that a given metric refers to?
def onExecutorMetricsUpdate(self, executorMetricsUpdate):
    executor_id = executorMetricsUpdate.execId()
Update:

In my configuration I have:
"spark.master": "yarn",
Also, if I modify my Spark job to return the machine it is running on, I can see that multiple machines in the cluster are being used.
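For reference, a minimal sketch of that check, assuming sc is the already-created SparkContext (socket.gethostname() reports the worker host that ran each task):

import socket

# One task per slice; each task reports the hostname of the worker it ran on.
hosts = (
    sc.parallelize(range(32), numSlices=32)
    .map(lambda _: socket.gethostname())
    .distinct()
    .collect()
)
print(hosts)  # several different hostnames => the job really ran on multiple machines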
If I hit the Spark API endpoint

...stages/<stage_id>/<attempt_id>/taskList

I can see multiple tasks on different hosts, but all the memory metrics (e.g. peakExecutionMemory) are 0:
"taskId" : 0,
"index" : 0,
"attempt" : 0,
"partitionId" : 0,
"launchTime" : "2023-11-30T13:48:28.018GMT",
"duration" : 11850,
"executorId" : "127",
"host" : "xxxxxxx",
"status" : "SUCCESS",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 750,
"executorDeserializeCpuTime" : 666813421,
"executorRunTime" : 10990,
"executorCpuTime" : 129812934,
"resultSize" : 1366,
"jvmGcTime" : 0,
"resultSerializationTime" : 4,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputMetrics" : {
"bytesRead" : 0,
"recordsRead" : 0
},
If I hit the /executors endpoint, I only see the 'driver'.
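For reference, this is roughly how I query that endpoint from the driver (a sketch; the proxy-URL config key is the same one used as API_URL_VAR in the listener code below, and the field names follow the Spark REST API's ExecutorSummary):

import requests

app_id = sc.applicationId
# Base URL of the application UI behind the YARN proxy (same key as API_URL_VAR below).
proxy_host = sc.getConf().get(
    "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES"
)
executors = requests.get(f"{proxy_host}/api/v1/applications/{app_id}/executors").json()
print([(e["id"], e["hostPort"]) for e in executors])  # here: only the 'driver' entry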
Update 2:

I run my Spark application with PySpark from a user machine (different from the Spark worker machines):
# utility function that builds a SparkConf from conf_dict and creates the SparkContext/SparkSession
def custom_create_context(conf_dict, show_report=False):  # other parameters elided in the original
    conf = pyspark.SparkConf().setAll(conf_dict.items())
    ss = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
    sc = ss.sparkContext
    return sc, ss

# called by:
sc, ss = custom_create_context(conf_dict={
    "spark.app.name": "ftest1",
    "spark.driver.memory": "2g",
    "spark.executor.memory": "2g",
    "spark.executor.cores": "1",
    "spark.task.cpus": "1",
    "spark.dynamicAllocation.maxExecutors": "1000",
    "spark.master": "yarn",
}, show_report=True)

# register the listener on the underlying Scala SparkContext
listener = MemoryReporter(sc, report_dir)
sc._jsc.sc().addSparkListener(listener)

# submit work to Spark
x = [some data]
result = sc.parallelize(x, numSlices=len(x)).map(lambda x: myfunc(x)).collect()
print(result)
The conf_dict variables:
spark.app.name: xxx
spark.driver.memory: 2g
spark.executor.memory: 2g
spark.executor.cores: 1
spark.task.cpus: 1
spark.dynamicAllocation.maxExecutors: 1000
spark.yarn.appMasterEnv.SPARK_WITH_NFS: true
spark.executorEnv.SPARK_WITH_NFS: true
spark.driverEnv.SPARK_WITH_NFS: true
spark.master: yarn
spark.submit.deployMode: client
spark.dynamicAllocation.enabled: true
spark.shuffle.service.enabled: true
spark.dynamicAllocation.executorIdleTimeout: 120s
spark.dynamicAllocation.cachedExecutorIdleTimeout: 300s
spark.driver.cores: 4
spark.driver.maxResultSize: 32g
spark.yarn.am.waitTime: 30s
spark.ui.showConsoleProgress: true
spark.worker.ui.retainedExecutors: 5000
spark.ui.retainedDeadExecutors: 5000
spark.eventLog.enabled: true
spark.eventLog.dir: hdfs:///spark-history
spark.starvation.timeout: 90s
spark.local.dir: xxx
spark.python.worker.memory: 1024m
spark.pyspark.python: xxx
spark.yarn.appMasterEnv.PYSPARK_PYTHON: xxx
spark.executorEnv.PYSPARK_PYTHON: xx
spark.driverEnv.PYSPARK_PYTHON: xxx
spark.yarn.appMasterEnv.VAULT_BUE_CACHE: xxx
spark.executorEnv.VAULT_BUE_CACHE: xxx
spark.driverEnv.VAULT_BUE_CACHE: xxx
spark.redaction.regex: xxx
spark.yarn.appMasterEnv.SHELL: /bin/bash
spark.executorEnv.SHELL: /bin/bash
spark.driverEnv.SHELL: /bin/bash
spark.yarn.appMasterEnv.RCFILE: xxx
spark.executorEnv.RCFILE: xxx
spark.driverEnv.RCFILE: xxx
spark.yarn.appMasterEnv.TERM_PROGRAM_VERSION: 3.2a
spark.executorEnv.TERM_PROGRAM_VERSION: 3.2a
spark.driverEnv.TERM_PROGRAM_VERSION: 3.2a
spark.yarn.appMasterEnv.BSTINPUTS: .:/data/app/latex/bst/:/usr/share/texmf/bibtex/bst//:
spark.executorEnv.BSTINPUTS: .:/data/app/latex/bst/:/usr/share/texmf/bibtex/bst//:
spark.driverEnv.BSTINPUTS: .:/data/app/latex/bst/:/usr/share/texmf/bibtex/bst//:
spark.yarn.appMasterEnv.LOGNAME: xxx
spark.executorEnv.LOGNAME: xxx
spark.driverEnv.LOGNAME: xxx
spark.yarn.appMasterEnv.XDG_SESSION_TYPE: tty
spark.executorEnv.XDG_SESSION_TYPE: tty
spark.driverEnv.XDG_SESSION_TYPE: tty
spark.yarn.appMasterEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
spark.executorEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
spark.driverEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
spark.yarn.appMasterEnv.VIRTUAL_ENV: xxx
spark.executorEnv.VIRTUAL_ENV: xxx
spark.driverEnv.VIRTUAL_ENV: xxx
spark.yarn.appMasterEnv.VAULT_TMPFS: xxx
spark.executorEnv.VAULT_TMPFS: xxx
spark.driverEnv.VAULT_TMPFS: xxx
spark.yarn.appMasterEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
spark.executorEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
spark.driverEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
spark.yarn.appMasterEnv.ORACLE_HOME: /app/oracle
spark.executorEnv.ORACLE_HOME: /app/oracle
spark.driverEnv.ORACLE_HOME: /app/oracle
spark.yarn.appMasterEnv.SSH_CONNECTION: xxx
spark.executorEnv.SSH_CONNECTION: xxx
spark.driverEnv.SSH_CONNECTION: xxx
spark.yarn.appMasterEnv.HOMEBREW
spark.driverEnv.GIT_BASE: xxx
spark.yarn.appMasterEnv.TERMINFO: /usr/share/terminfo
spark.executorEnv.TERMINFO: /usr/share/terminfo
spark.driverEnv.TERMINFO: /usr/share/terminfo
spark.yarn.appMasterEnv.TERM: screen
spark.executorEnv.TERM: screen
spark.driverEnv.TERM: screen
spark.yarn.appMasterEnv.USER: xxx
spark.executorEnv.USER: xxx
spark.driverEnv.USER: xxx
spark.yarn.appMasterEnv.LC_TERMINAL_VERSION: 3.4.20
spark.executorEnv.LC_TERMINAL_VERSION: 3.4.20
spark.driverEnv.LC_TERMINAL_VERSION: 3.4.20
spark.yarn.appMasterEnv.CONSUL_HTTP_SSL: true
spark.executorEnv.CONSUL_HTTP_SSL: true
spark.driverEnv.CONSUL_HTTP_SSL: true
spark.yarn.appMasterEnv.SHLVL: 2
spark.executorEnv.SHLVL: 2
spark.driverEnv.SHLVL: 2
spark.yarn.appMasterEnv.XDG_SESSION_ID: 3
spark.executorEnv.XDG_SESSION_ID: 3
spark.driverEnv.XDG_SESSION_ID: 3
spark.yarn.appMasterEnv.PROFILE_DEBUG: no
spark.executorEnv.PROFILE_DEBUG: no
spark.driverEnv.PROFILE_DEBUG: no
spark.yarn.appMasterEnv.XDG_RUNTIME_DIR: xxx
spark.executorEnv.XDG_RUNTIME_DIR: xx
spark.driverEnv.XDG_RUNTIME_DIR: xxx
spark.yarn.appMasterEnv.PROCPS_USERLEN: 16
spark.executorEnv.PROCPS_USERLEN: 16
spark.driverEnv.PROCPS_USERLEN: 16
spark.yarn.appMasterEnv.SSH_CLIENT: xxx
spark.executorEnv.SSH_CLIENT: xxx
spark.driverEnv.SSH_CLIENT: xxx
spark.yarn.appMasterEnv.REQUESTS_CA_BUNDLE: xx
spark.executorEnv.REQUESTS_CA_BUNDLE: xxx
spark.driverEnv.REQUESTS_CA_BUNDLE: xxx
spark.yarn.appMasterEnv.DOCKER_HOST: unix:///xx
spark.executorEnv.DOCKER_HOST: unix:///xxx
spark.driverEnv.DOCKER_HOST: unix:///xxx
spark.yarn.appMasterEnv.TNS_ADMIN: xxx
spark.executorEnv.TNS_ADMIN: xxx
spark.driverEnv.TNS_ADMIN: xxx
spark.yarn.appMasterEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
spark.executorEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
spark.driverEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
spark.yarn.appMasterEnv.PATH: xxx
spark.executorEnv.PATH: xxx
spark.driverEnv.PATH: xxx
spark.yarn.appMasterEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=/xxx
spark.executorEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=/xxx
spark.driverEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=/xxx
spark.yarn.appMasterEnv.SSH_TTY: /dev/pts/0
spark.executorEnv.SSH_TTY: /dev/pts/0
spark.driverEnv.SSH_TTY: /dev/pts/0
spark.yarn.appMasterEnv.OLDPWD: /xx
spark.executorEnv.OLDPWD: x
spark.driverEnv.OLDPWD: xx
spark.yarn.appMasterEnv.CONSUL_HTTP_ADDR: xxx
spark.executorEnv.CONSUL_HTTP_ADDR: xxx
spark.driverEnv.CONSUL_HTTP_ADDR: xxx
spark.yarn.appMasterEnv.SSL_CERT_FILE: xxx
spark.executorEnv.SSL_CERT_FILE: xxx
spark.driverEnv.SSL_CERT_FILE: xx
spark.yarn.appMasterEnv.CONSUL_TOKEN_JSON: xxx
spark.executorEnv.CONSUL_TOKEN_JSON: xxx
spark.driverEnv.CONSUL_TOKEN_JSON: xx
spark.yarn.appMasterEnv.LOGNAME: x
spark.executorEnv.LOGNAME: x
spark.driverEnv.LOGNAME: x
spark.yarn.appMasterEnv.XDG_SESSION_TYPE: tty
spark.executorEnv.XDG_SESSION_TYPE: tty
spark.driverEnv.XDG_SESSION_TYPE: tty
spark.yarn.appMasterEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
spark.executorEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
spark.driverEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
spark.yarn.appMasterEnv.CONSUL_LEASE_FILE: xxx
spark.executorEnv.CONSUL_LEASE_FILE: xxx
spark.driverEnv.CONSUL_LEASE_FILE: xxx
spark.yarn.appMasterEnv.CONSUL_HTTP_TOKEN_ACCESSOR: xxx
spark.executorEnv.CONSUL_HTTP_TOKEN_ACCESSOR: xxx
spark.driverEnv.CONSUL_HTTP_TOKEN_ACCESSOR: xxx
spark.yarn.appMasterEnv.CONSUL_HTTP_TOKEN: xxx
spark.executorEnv.CONSUL_HTTP_TOKEN: xxx
spark.driverEnv.CONSUL_HTTP_TOKEN: xxx
spark.yarn.appMasterEnv.VIRTUAL_ENV: xxx
spark.executorEnv.VIRTUAL_ENV: xxx
spark.driverEnv.VIRTUAL_ENV: xxx
spark.yarn.appMasterEnv.VAULT_TMPFS: xxx
spark.executorEnv.VAULT_TMPFS: xxx
spark.driverEnv.VAULT_TMPFS: xxx
spark.yarn.appMasterEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
spark.executorEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
spark.driverEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
spark.yarn.appMasterEnv.ORACLE_HOME: /app/oracle
spark.executorEnv.ORACLE_HOME: /app/oracle
spark.driverEnv.ORACLE_HOME: /app/oracle
spark.yarn.appMasterEnv.SSH_CONNECTION: xxx
spark.executorEnv.SSH_CONNECTION: xxx
spark.driverEnv.SSH_CONNECTION: xxx
spark.yarn.appMasterEnv.HOMEBREW_NO_AUTO_UPDATE: 1
spark.executorEnv.HOMEBREW_NO_AUTO_UPDATE: 1
spark.driverEnv.HOMEBREW_NO_AUTO_UPDATE: 1
spark.yarn.appMasterEnv.CONSUL_HTTP_TOKEN_FILE: xxx
spark.executorEnv.CONSUL_HTTP_TOKEN_FILE: xxx
spark.driverEnv.CONSUL_HTTP_TOKEN_FILE: xxx
spark.executorEnv.CONSUL_HTTP_TOKEN_FILE: xxx
spark.driverEnv.CONSUL_HTTP_TOKEN_FILE: xxx
spark.yarn.appMasterEnv.XDG_SESSION_CLASS: user
spark.executorEnv.XDG_SESSION_CLASS: user
spark.driverEnv.XDG_SESSION_CLASS: user
spark.yarn.appMasterEnv.GIT_BASE: xxx
spark.executorEnv.GIT_BASE: xxx
spark.driverEnv.GIT_BASE: xxx
spark.yarn.appMasterEnv.TERMINFO: /usr/share/terminfo
spark.executorEnv.TERMINFO: /usr/share/terminfo
spark.driverEnv.TERMINFO: /usr/share/terminfo
spark.yarn.appMasterEnv.TERM: screen
spark.executorEnv.TERM: screen
spark.driverEnv.TERM: screen
spark.yarn.appMasterEnv.USER: xxx
spark.executorEnv.USER: xxx
spark.driverEnv.USER: xxx
spark.yarn.appMasterEnv.CONSUL_HTTP_SSL: true
spark.executorEnv.CONSUL_HTTP_SSL: true
spark.driverEnv.CONSUL_HTTP_SSL: true
spark.yarn.appMasterEnv.SHLVL: 2
spark.executorEnv.SHLVL: 2
spark.driverEnv.SHLVL: 2
spark.yarn.appMasterEnv.GIT_EXAMPLE: xxx
spark.executorEnv.GIT_EXAMPLE: xxx
spark.driverEnv.GIT_EXAMPLE: xxx
spark.yarn.appMasterEnv.XDG_SESSION_ID: 3
spark.executorEnv.XDG_SESSION_ID: 3
spark.driverEnv.XDG_SESSION_ID: 3
spark.yarn.appMasterEnv.PROFILE_DEBUG: no
spark.executorEnv.PROFILE_DEBUG: no
spark.driverEnv.PROFILE_DEBUG: no
spark.yarn.appMasterEnv.XDG_RUNTIME_DIR: xxx
spark.executorEnv.XDG_RUNTIME_DIR: xxx
spark.driverEnv.XDG_RUNTIME_DIR: xxx
spark.yarn.appMasterEnv.PROCPS_USERLEN: 16
spark.executorEnv.PROCPS_USERLEN: 16
spark.driverEnv.PROCPS_USERLEN: 16
spark.yarn.appMasterEnv.PYTHONBREAKPOINT: pudb.set_trace
spark.executorEnv.PYTHONBREAKPOINT: pudb.set_trace
spark.driverEnv.PYTHONBREAKPOINT: pudb.set_trace
spark.yarn.appMasterEnv.SSH_CLIENT: xxx
spark.executorEnv.SSH_CLIENT: xx
spark.driverEnv.SSH_CLIENT: xxx
spark.yarn.appMasterEnv.REQUESTS_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
spark.executorEnv.REQUESTS_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
spark.driverEnv.REQUESTS_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
spark.yarn.appMasterEnv.DOCKER_HOST: unix:///xxx
spark.executorEnv.DOCKER_HOST: unix:///xxx
spark.driverEnv.DOCKER_HOST: unix:///xx
spark.yarn.appMasterEnv.TNS_ADMIN: xxx
spark.executorEnv.TNS_ADMIN: xxx
spark.driverEnv.TNS_ADMIN: xxx
spark.yarn.appMasterEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
spark.executorEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
spark.driverEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
spark.yarn.appMasterEnv.PATH: xxx
spark.executorEnv.PATH: xxx
spark.driverEnv.PATH: xxx
spark.yarn.appMasterEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=xxx
spark.executorEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=xx
spark.driverEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=xxx
spark.yarn.appMasterEnv.SSH_TTY: /dev/pts/0
spark.executorEnv.SSH_TTY: /dev/pts/0
spark.driverEnv.SSH_TTY: /dev/pts/0
spark.yarn.appMasterEnv.OLDPWD: xxx
spark.executorEnv.OLDPWD: xxx
spark.driverEnv.OLDPWD: xxx
spark.yarn.appMasterEnv.CONSUL_HTTP_ADDR: xxx
spark.executorEnv.CONSUL_HTTP_ADDR: xx
spark.driverEnv.CONSUL_HTTP_ADDR: xxx
spark.yarn.appMasterEnv.SSL_CERT_FILE: /etc/ssl/certs/ca-certificates.crt
spark.executorEnv.SSL_CERT_FILE: /etc/ssl/certs/ca-certificates.crt
spark.driverEnv.SSL_CERT_FILE: /etc/ssl/certs/ca-certificates.crt
spark.yarn.appMasterEnv.CURL_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
spark.executorEnv.CURL_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
spark.driverEnv.CURL_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
spark.yarn.appMasterEnv.PYSPARK_RDD_CODE_SERIALIZER: pyspark.serializers.CloudPickleSerializer
spark.executorEnv.PYSPARK_RDD_CODE_SERIALIZER: pyspark.serializers.CloudPickleSerializer
spark.driverEnv.PYSPARK_RDD_CODE_SERIALIZER: pyspark.serializers.CloudPickleSerializer
spark.yarn.appMasterEnv.PYSPARK_RDD_DATA_SERIALIZER: pyspark.serializers.CloudPickleSerializer
spark.executorEnv.PYSPARK_RDD_DATA_SERIALIZER: pyspark.serializers.CloudPickleSerializer
spark.driverEnv.PYSPARK_RDD_DATA_SERIALIZER: pyspark.serializers.CloudPickleSerializer
spark.yarn.appMasterEnv.HADOOP_HOME: /.../hadoop/current
spark.executorEnv.HADOOP_HOME: xxx
spark.driverEnv.HADOOP_HOME: xxx
spark.yarn.appMasterEnv.HADOOP_COMMON_HOME: xxx
spark.executorEnv.HADOOP_COMMON_HOME: xxx
spark.driverEnv.HADOOP_COMMON_HOME: xxx
spark.yarn.appMasterEnv.HADOOP_HDFS_HOME: xxx
spark.executorEnv.HADOOP_HDFS_HOME: xxx
spark.driverEnv.HADOOP_HDFS_HOME: xxx
spark.yarn.appMasterEnv.HADOOP_YARN_HOME: xxx
spark.executorEnv.HADOOP_YARN_HOME: xxx
spark.driverEnv.HADOOP_YARN_HOME: xxx
spark.yarn.appMasterEnv.HADOOP_MAPRED_HOME: xxx
spark.executorEnv.HADOOP_MAPRED_HOME: xxx
spark.driverEnv.HADOOP_MAPRED_HOME: xxx
spark.yarn.appMasterEnv.HADOOP_CONF_DIR: xxx
spark.executorEnv.HADOOP_CONF_DIR: xxx
spark.driverEnv.HADOOP_CONF_DIR: xxx
spark.yarn.appMasterEnv.OMP_NUM_THREADS: 1
spark.executorEnv.OMP_NUM_THREADS: 1
spark.driverEnv.OMP_NUM_THREADS: 1
spark.yarn.appMasterEnv.SPARK_HOME: xxx
spark.executorEnv.SPARK_HOME: xxx
spark.driverEnv.SPARK_HOME: xxx
Update 3:

MemoryReporter is a custom class that receives the events described in https://spark.apache.org/docs/2.3.3/api/scala/index.html#org.apache.spark.scheduler.SparkListener:
# Imports needed by this snippet (project-specific helpers such as
# SPARK_MEMORY_REPORT_DIR_ENVIRON, get_am_ui_url and get_history_server_url
# are defined elsewhere and omitted here).
import logging
import os
from collections.abc import Iterable

import pyspark
import requests

API_URL_VAR = "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES"
pyspark_v3 = pyspark.__version__.startswith("3.")
class SparkListener(object):
"""
https://spark.apache.org/docs/2.3.3/api/scala/index.html#org.apache.spark.scheduler.SparkListener
"""
def onApplicationEnd(self, applicationEnd):
pass
def onApplicationStart(self, applicationStart):
pass
def onBlockManagerAdded(self, blockManagerAdded):
pass
def onBlockManagerRemoved(self, blockManagerRemoved):
pass
def onBlockUpdated(self, blockUpdated):
pass
def onEnvironmentUpdate(self, environmentUpdate):
pass
def onExecutorAdded(self, executorAdded):
pass
def onExecutorBlacklisted(self, executorBlacklisted):
pass
def onExecutorMetricsUpdate(self, executorMetricsUpdate):
pass
def onExecutorRemoved(self, executorRemoved):
pass
def onExecutorUnblacklisted(self, executorUnblacklisted):
pass
def onJobEnd(self, jobEnd):
pass
def onJobStart(self, jobStart):
pass
def onNodeBlacklisted(self, nodeBlacklisted):
pass
def onNodeUnblacklisted(self, nodeUnblacklisted):
pass
def onOtherEvent(self, event):
pass
def onSpeculativeTaskSubmitted(self, speculativeTask):
pass
def onStageCompleted(self, stageCompleted):
pass
def onStageSubmitted(self, stageSubmitted):
pass
def onTaskEnd(self, taskEnd):
pass
def onTaskGettingResult(self, taskGettingResult):
pass
def onTaskStart(self, taskStart):
pass
def onUnpersistRDD(self, unpersistRDD):
pass
class Java:
implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
def _scala_iterable_to_py_list(scala_iterable):
if isinstance(scala_iterable, Iterable):
return list(scala_iterable)
py_list = []
it = scala_iterable.toIterator()
while it.hasNext():
py_list.append(it.next())
return py_list
def _scala_set_to_py_set(scala_set):
return set(_scala_iterable_to_py_list(scala_set))
class MemoryReporter(SparkListener):
def __init__(self, sc, report_dir):
self.logger = logging.getLogger(self.__class__.__name__)
self._sc = sc
self._report_dir = os.environ.get(SPARK_MEMORY_REPORT_DIR_ENVIRON, report_dir)
self._spark_conf = sc.getConf()
self._username = self._sc._jvm.java.lang.System.getProperty("user.name")
self._application_id = self._spark_conf.get("spark.app.id", None)
spark_executor_memory = self._spark_conf.get("spark.executor.memory", None)
self._requested_limit_byte = self._parse_memory_from_conf(spark_executor_memory)
self._actual_limit_byte = 0 # will be updated once job is running
self._executor_task = {} # type: Dict[int, int]
self._task_memory = {} # type: Dict[int, int]
self._stage_tasks = {} # type: Dict[int, Set[int]]
self._stage_last_attempt_id = {}
self._active_jobs = 0
def _parse_memory_from_conf(self, memory):
pass # edited out for briefness
def _reset_memory_stats(self):
pass # edited out for briefness
def _format_memory_usage(self, x):
pass # edited out for briefness
def _print_report(self, task_memory):
pass # edited out for briefness
def update_stage_info_v2(self):
status_store = self._sc._jsc.sc().statusStore()
all_jobs = _scala_iterable_to_py_list(status_store.jobsList(None))
all_stage_ids = []
for job in all_jobs:
all_stage_ids += _scala_iterable_to_py_list(job.stageIds())
for stage_id in all_stage_ids:
if stage_id in self._stage_tasks:
continue
stage_data = _scala_iterable_to_py_list(status_store.stageData(stage_id, True))
if not stage_data:
continue
last_stage_data = stage_data[-1]
stage_status = last_stage_data.status().toString()
if stage_status != "ACTIVE" and stage_status != "PENDING" and last_stage_data.tasks().isDefined():
stage_tasks = stage_data[-1].tasks().get()
stage_task_ids = _scala_set_to_py_set(stage_tasks.keys())
self._stage_tasks[stage_id] = stage_task_ids
def update_stage_info_v3(self):
"""
In pyspark 3 we don't have:
- stage_info.status
- status_store.stageData(stage_id, True)
- a way to get task info from the stage (only via REST API)
"""
status_store = self._sc._jsc.sc().statusStore()
all_jobs = _scala_iterable_to_py_list(status_store.jobsList(None))
all_stage_ids = []
for job in all_jobs:
all_stage_ids += _scala_iterable_to_py_list(job.stageIds())
for stage_id in all_stage_ids:
if stage_id in self._stage_tasks:
continue
app_id = self._sc.applicationId
proxy_host = self._sc.getConf().get(API_URL_VAR)
if not proxy_host:
continue
stages_api_url = f"{proxy_host}/api/v1/applications/{app_id}/stages/{stage_id}"
try:
response = requests.get(stages_api_url)
stages_data = response.json()
except Exception as ex:
self.logger.warning(f"Failed getting stages_data from API Url: {stages_api_url} ex: {ex}")
continue
last_stage_attempt = stages_data[-1]
last_stage_status = last_stage_attempt.get("status")
if last_stage_status and last_stage_status != "ACTIVE" and last_stage_status != "PENDING":
attempt_id = last_stage_attempt.get("attemptId")
self._stage_last_attempt_id[stage_id] = attempt_id
if attempt_id is not None: # careful as it can be 0 (evaluates to False)
tasks_info_api_url = (
f"{proxy_host}/api/v1/applications/{app_id}/stages/{stage_id}/{attempt_id}/taskList"
)
try:
response2 = requests.get(tasks_info_api_url)
tasks_info_data = response2.json()
self.logger.debug(f"tasks_info_api_url: {tasks_info_api_url}")
task_ids = [t["taskId"] for t in tasks_info_data]
self._stage_tasks[stage_id] = task_ids
except Exception as ex:
self.logger.warning(f"Failed getting tasks_info_data API url: {tasks_info_api_url} ex: {ex}")
continue
def _update_stage_tasks(self):
if pyspark_v3:
self.update_stage_info_v3()
else:
self.update_stage_info_v2()
def _save_csv(self, job_id, df):
pass # edited out for briefness
def _save_report(self, job_id, task_memory):
pass # edited out for briefness
def onTaskStart(self, taskStart):
task_id = taskStart.taskInfo().taskId()
executor_id = taskStart.taskInfo().executorId()
self.logger.debug("setting executor '{}' to task '{}'".format(executor_id, task_id))
self._executor_task[executor_id] = task_id
def onExecutorMetricsUpdate(self, executorMetricsUpdate):
executor_id = executorMetricsUpdate.execId()  # returns 'driver' in pyspark 3.x, but a numeric executor id in pyspark 2.x
if pyspark_v3:
# https://spark.apache.org/docs/3.1.1/monitoring.html
# https://github.com/apache/spark/blob/v3.1.1/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala#L196
memory_byte = 0
metrics = executorMetricsUpdate.executorUpdates().values().toList() # Scala view to values
metrics = [metrics.apply(i) for i in range(metrics.size())]
for m in metrics:
memory_byte = max(memory_byte, m.getMetricValue("JVMHeapMemory"))
else:
# Backward compatible. e.g. v2.3.4
memory_byte = executorMetricsUpdate.cgroupMetrics().memoryUsageInBytes()
self._actual_limit_byte = max(
self._actual_limit_byte, executorMetricsUpdate.cgroupMetrics().memoryLimitInBytes()
)
task_id = self._executor_task.get(executor_id, None)
if task_id is None:
# would still receive metrics updates after job finishes
self.logger.debug("Cannot find corresponding task for executor {}".format(executor_id))
return
self._task_memory[task_id] = max(self._task_memory.get(task_id, 0), memory_byte)
def getExecutorMetricsFromAPI(self):
"""
Get Spark metrics from the Spark API
Potentially switch this to use this end point:
application_1697883854732_43137/api/v1/applications/application_1697883854732_43137/executors
Or another option is to record this info in update_stage_info_v3() directly
"""
self.logger.info("Getting metrics from Spark API")
app_id = self._sc.applicationId
proxy_host = self._sc.getConf().get(API_URL_VAR)
if not proxy_host:
return
for stage_id, attempt_id in self._stage_last_attempt_id.items():
tasks_info_api_url = f"{proxy_host}/api/v1/applications/{app_id}/stages/{stage_id}/{attempt_id}/taskList"
try:
resp = requests.get(tasks_info_api_url)
tasks_info_data = resp.json()
for task_info in tasks_info_data:
task_id = task_info.get("taskId")
if task_id is not None: # warning, this can be 0 (evaluates to False)
peak_mem = task_info["taskMetrics"]["peakExecutionMemory"]
task_max_mem = max(self._task_memory.get(task_id, 0), peak_mem)
self._task_memory[task_info["taskId"]] = task_max_mem
self.logger.debug(
f"metrics_from_api: stage: {stage_id} task: {task_id} task_max_mem: {task_max_mem}"
)
except Exception as ex:
# Even if one fails, we try the others to get some data for the report
self.logger.warning(f"metrics_from_api: failed getting tasks_info url: {tasks_info_api_url} ex: {ex}")
def onJobStart(self, jobStart):
self._active_jobs += 1
def onJobEnd(self, jobEnd):
self._active_jobs -= 1
job_id = jobEnd.jobId()
if self._active_jobs == 0:
self._update_stage_tasks()
self.logger.info(f"stage_task updated: {self._stage_tasks} job: {job_id}")
if self._task_memory:
self.logger.debug(f"Collected task memory: {self._task_memory} for job_id: {job_id}")
self._print_report(self._task_memory)
self._save_report(job_id, self._task_memory)
self.logger.info("More details at {}".format(get_am_ui_url(self._sc, self._spark_conf)))
else:
if pyspark_v3:
# onExecutorMetricsUpdate() isn't working as expected in pyspark 3+, so we use the REST API
self.logger.info(f"Attempting report via Spark API. version: {pyspark.__version__}")
self.getExecutorMetricsFromAPI()
self._print_report(self._task_memory)
self._save_report(job_id, self._task_memory)
self.logger.info("More details at {}".format(get_am_ui_url(self._sc, self._spark_conf)))
else:
self.logger.info(f"Not enough data collected for the report of job: {job_id}")
self._reset_memory_stats()
def onApplicationEnd(self, applicationEnd):
self.logger.info(
"Spark application has ended. Access from the history server {}".format(
get_history_server_url(self._sc, self._spark_conf)
)
)
2 Answers

Answer 1:
The problem turned out to be a custom metric we had added, which was no longer serializable the way it had been on the earlier Spark version. That made sending the heartbeat from the workers fail. Unfortunately, the only hint of the problem showed up on jobs running longer than 120 seconds (the heartbeat timeout).
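For anyone debugging something similar: a bare-bones listener that only logs where each metrics update comes from makes it easy to see whether executor heartbeats reach the driver at all (a sketch reusing the SparkListener base class from the question; if only 'driver' ever shows up, the executor-side heartbeats are being lost):

class HeartbeatLogger(SparkListener):
    """Sketch: log the executor id of every metrics update the driver receives."""
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        print("metrics update from executor:", executorMetricsUpdate.execId())

sc._jsc.sc().addSparkListener(HeartbeatLogger())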
Answer 2:
I suspect that it is not only your version that changed, but also your cluster setup.

executor_id can legitimately be equal to 'driver'. The SparkContext.DRIVER_IDENTIFIER constant equals 'driver' in 3.4.1, and every node compares the executor ID in its BlockManagerId against it to decide whether the running process is an executor or the driver.
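In listener terms that means 'driver' is an expected value of execId() and usually just gets filtered out; a minimal sketch of such a handler (hypothetical, not the asker's code):

def onExecutorMetricsUpdate(self, executorMetricsUpdate):
    executor_id = executorMetricsUpdate.execId()
    if executor_id == "driver":  # SparkContext.DRIVER_IDENTIFIER in the Scala sources
        return  # this update describes the driver process itself, not an executor
    # ... handle updates coming from real executors here ...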
Based on the information you provided in the question it is hard to be 100% sure (for example, we do not know how executorMetricsUpdate is defined in your code), but if your executor_id variable is always equal to 'driver', you may be running with --master local instead of on an actual cluster.

If you run your Spark application with --master local (or any variant of local), so that the code runs on a single machine inside a single JVM process, the driver acts as both the driver and the executor. So make sure you are running on an actual cluster, and then you will probably get the behaviour you want!