pyspark: onExecutorMetricsUpdate cannot get the executor_id

dffbzjpn, posted on 2024-01-06 in Spark

In the code below, under PySpark 2.x, executorMetricsUpdate.execId() always returned a numeric executor ID such as '31' (although of string type). After upgrading from PySpark 2.x to 3.4.1, executor_id is always 'driver'. Why? I rely on this executor_id to build a report on memory usage. Is there a way to find out which executor a metrics update refers to?

    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        executor_id = executorMetricsUpdate.execId()
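
For debugging, here is a minimal sketch of what such a handler could log (this assumes the same py4j-based SparkListener wiring shown further down in this post; execId(), accumUpdates() and, in Spark 3.x, executorUpdates() are the accessors of SparkListenerExecutorMetricsUpdate):

    # Hypothetical debugging handler: dump what the Java event exposes.
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        exec_id = executorMetricsUpdate.execId()               # 'driver' for driver-side updates
        accum_updates = executorMetricsUpdate.accumUpdates()   # Scala Seq of per-task accumulator updates
        # Spark 3.x only: Scala Map keyed by (stageId, stageAttemptId), one ExecutorMetrics per entry
        executor_updates = executorMetricsUpdate.executorUpdates()
        print("execId={} accumUpdates={} executorUpdates={}".format(
            exec_id, accum_updates.size(), executor_updates.size()))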

Update:
In my configuration I have:

  1. "spark.master": "yarn",


Also, if I modify my Spark job to return the machine it runs on, I can see that several machines in the cluster are being used.
If I hit the Spark API endpoint

    ...stages/<stage_id>/<attempt_id>/taskList


I can see multiple tasks on different hosts, but all of the memory metrics (e.g. peakExecutionMemory) are set to 0:

  1. "taskId" : 0,
  2. "index" : 0,
  3. "attempt" : 0,
  4. "partitionId" : 0,
  5. "launchTime" : "2023-11-30T13:48:28.018GMT",
  6. "duration" : 11850,
  7. "executorId" : "127",
  8. "host" : "xxxxxxx",
  9. "status" : "SUCCESS",
  10. "taskLocality" : "PROCESS_LOCAL",
  11. "speculative" : false,
  12. "accumulatorUpdates" : [ ],
  13. "taskMetrics" : {
  14. "executorDeserializeTime" : 750,
  15. "executorDeserializeCpuTime" : 666813421,
  16. "executorRunTime" : 10990,
  17. "executorCpuTime" : 129812934,
  18. "resultSize" : 1366,
  19. "jvmGcTime" : 0,
  20. "resultSerializationTime" : 4,
  21. "memoryBytesSpilled" : 0,
  22. "diskBytesSpilled" : 0,
  23. "peakExecutionMemory" : 0,
  24. "inputMetrics" : {
  25. "bytesRead" : 0,
  26. "recordsRead" : 0
  27. },


If I hit the /executors endpoint, I only see the "driver".
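
For what it's worth, the /executors endpoint only lists currently active executors, so with dynamic allocation and a 120s idle timeout they may already be gone by the time you look; the /allexecutors endpoint also returns dead executors. A sketch (base_url and app_id are placeholders for the proxy/UI URL and application id used elsewhere in this post):

    import requests

    base_url = "http://<spark-ui-or-yarn-proxy>"   # placeholder
    app_id = "<application_id>"                    # placeholder

    # /allexecutors includes executors that have already been released ("dead"),
    # unlike /executors which only shows the ones that are still active.
    for ex in requests.get(f"{base_url}/api/v1/applications/{app_id}/allexecutors").json():
        print(ex["id"], ex["hostPort"], ex["isActive"])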
Update 2:
I run my Spark application with pyspark from a user machine (a different machine from the Spark workers):

    # this utility function which sets a few conf_dict variables
    def custom_create_context(..):
        conf = pyspark.SparkConf().setAll(conf_dict.items())
        ss = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
        sc = ss.sparkContext
        return sc, ss

    # called by:
    sc, ss = custom_create_context(conf_dict={
        "spark.app.name": "ftest1",
        "spark.driver.memory": "2g",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1",
        "spark.task.cpus": "1",
        "spark.dynamicAllocation.maxExecutors": "1000",
        "spark.master": "yarn",
    }, show_report=True)

    # register listener
    listener = MemoryReporter(sc, report_dir)
    sc._jsc.sc().addSparkListener(listener)

    # submit work to spark
    x = [some data]
    result = sc.parallelize(x, numSlices=len(x)).map(lambda x: myfunc(x)).collect()
    print(result)
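
As a quick sanity check that real executors actually register with this driver, a sketch using the Scala SparkContext through `sc._jsc.sc()` (the same handle the listener code uses); note that with dynamic allocation executors only show up after work has been submitted, so run this after the collect():

    # Sketch: list the block managers known to the driver after the job has run.
    # The first entry is the driver itself; additional entries are live executors.
    status = sc._jsc.sc().getExecutorMemoryStatus()   # Scala Map[String, (Long, Long)]
    print("block managers known to the driver:", status.size())
    it = status.keys().iterator()
    while it.hasNext():
        print(it.next())                              # "host:port" of each block manager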


The conf_dict variables:

  1. spark.app.name: xxx
  2. spark.driver.memory: 2g
  3. spark.executor.memory: 2g
  4. spark.executor.cores: 1
  5. spark.task.cpus: 1
  6. spark.dynamicAllocation.maxExecutors: 1000
  7. spark.yarn.appMasterEnv.SPARK_WITH_NFS: true
  8. spark.executorEnv.SPARK_WITH_NFS: true
  9. spark.driverEnv.SPARK_WITH_NFS: true
  10. spark.master: yarn
  11. spark.submit.deployMode: client
  12. spark.dynamicAllocation.enabled: true
  13. spark.shuffle.service.enabled: true
  14. spark.dynamicAllocation.executorIdleTimeout: 120s
  15. spark.dynamicAllocation.cachedExecutorIdleTimeout: 300s
  16. spark.driver.cores: 4
  17. spark.driver.maxResultSize: 32g
  18. spark.yarn.am.waitTime: 30s
  19. spark.ui.showConsoleProgress: true
  20. spark.worker.ui.retainedExecutors: 5000
  21. spark.ui.retainedDeadExecutors: 5000
  22. spark.eventLog.enabled: true
  23. spark.eventLog.dir: hdfs:///spark-history
  24. spark.starvation.timeout: 90s
  25. spark.local.dir: xxx
  26. spark.python.worker.memory: 1024m
  27. spark.pyspark.python: xxx
  28. spark.yarn.appMasterEnv.PYSPARK_PYTHON: xxx
  29. spark.executorEnv.PYSPARK_PYTHON: xx
  30. spark.driverEnv.PYSPARK_PYTHON: xxx
  31. spark.yarn.appMasterEnv.VAULT_BUE_CACHE: xxx
  32. spark.executorEnv.VAULT_BUE_CACHE: xxx
  33. spark.driverEnv.VAULT_BUE_CACHE: xxx
  34. spark.redaction.regex: xxx
  35. spark.yarn.appMasterEnv.SHELL: /bin/bash
  36. spark.executorEnv.SHELL: /bin/bash
  37. spark.driverEnv.SHELL: /bin/bash
  38. spark.yarn.appMasterEnv.RCFILE: xxx
  39. spark.executorEnv.RCFILE: xxx
  40. spark.driverEnv.RCFILE: xxx
  41. spark.yarn.appMasterEnv.TERM_PROGRAM_VERSION: 3.2a
  42. spark.executorEnv.TERM_PROGRAM_VERSION: 3.2a
  43. spark.driverEnv.TERM_PROGRAM_VERSION: 3.2a
  44. spark.yarn.appMasterEnv.BSTINPUTS: .:/data/app/latex/bst/:/usr/share/texmf/bibtex/bst//:
  45. spark.executorEnv.BSTINPUTS: .:/data/app/latex/bst/:/usr/share/texmf/bibtex/bst//:
  46. spark.driverEnv.BSTINPUTS: .:/data/app/latex/bst/:/usr/share/texmf/bibtex/bst//:
  47. spark.yarn.appMasterEnv.LOGNAME: xxx
  48. spark.executorEnv.LOGNAME: xxx
  49. spark.driverEnv.LOGNAME: xxx
  50. spark.yarn.appMasterEnv.XDG_SESSION_TYPE: tty
  51. spark.executorEnv.XDG_SESSION_TYPE: tty
  52. spark.driverEnv.XDG_SESSION_TYPE: tty
  53. spark.yarn.appMasterEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
  54. spark.executorEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
  55. spark.driverEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
  56. spark.yarn.appMasterEnv.VIRTUAL_ENV: xxx
  57. spark.executorEnv.VIRTUAL_ENV: xxx
  58. spark.driverEnv.VIRTUAL_ENV: xxx
  59. spark.yarn.appMasterEnv.VAULT_TMPFS: xxx
  60. spark.executorEnv.VAULT_TMPFS: xxx
  61. spark.driverEnv.VAULT_TMPFS: xxx
  62. spark.yarn.appMasterEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
  63. spark.executorEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
  64. spark.driverEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
  65. spark.yarn.appMasterEnv.ORACLE_HOME: /app/oracle
  66. spark.executorEnv.ORACLE_HOME: /app/oracle
  67. spark.driverEnv.ORACLE_HOME: /app/oracle
  68. spark.yarn.appMasterEnv.SSH_CONNECTION: xxx
  69. spark.executorEnv.SSH_CONNECTION: xxx
  70. spark.driverEnv.SSH_CONNECTION: xxx
  71. spark.yarn.appMasterEnv.HOMEBREW
  72. spark.driverEnv.GIT_BASE: xxx
  73. spark.yarn.appMasterEnv.TERMINFO: /usr/share/terminfo
  74. spark.executorEnv.TERMINFO: /usr/share/terminfo
  75. spark.driverEnv.TERMINFO: /usr/share/terminfo
  76. spark.yarn.appMasterEnv.TERM: screen
  77. spark.executorEnv.TERM: screen
  78. spark.driverEnv.TERM: screen
  79. spark.yarn.appMasterEnv.USER: xxx
  80. spark.executorEnv.USER: xxx
  81. spark.driverEnv.USER: xxx
  82. spark.yarn.appMasterEnv.LC_TERMINAL_VERSION: 3.4.20
  83. spark.executorEnv.LC_TERMINAL_VERSION: 3.4.20
  84. spark.driverEnv.LC_TERMINAL_VERSION: 3.4.20
  85. spark.yarn.appMasterEnv.CONSUL_HTTP_SSL: true
  86. spark.executorEnv.CONSUL_HTTP_SSL: true
  87. spark.driverEnv.CONSUL_HTTP_SSL: true
  88. spark.yarn.appMasterEnv.SHLVL: 2
  89. spark.executorEnv.SHLVL: 2
  90. spark.driverEnv.SHLVL: 2
  91. spark.yarn.appMasterEnv.XDG_SESSION_ID: 3
  92. spark.executorEnv.XDG_SESSION_ID: 3
  93. spark.driverEnv.XDG_SESSION_ID: 3
  94. spark.yarn.appMasterEnv.PROFILE_DEBUG: no
  95. spark.executorEnv.PROFILE_DEBUG: no
  96. spark.driverEnv.PROFILE_DEBUG: no
  97. spark.yarn.appMasterEnv.XDG_RUNTIME_DIR: xxx
  98. spark.executorEnv.XDG_RUNTIME_DIR: xx
  99. spark.driverEnv.XDG_RUNTIME_DIR: xxx
  100. spark.yarn.appMasterEnv.PROCPS_USERLEN: 16
  101. spark.executorEnv.PROCPS_USERLEN: 16
  102. spark.driverEnv.PROCPS_USERLEN: 16
  103. spark.yarn.appMasterEnv.SSH_CLIENT: xxx
  104. spark.executorEnv.SSH_CLIENT: xxx
  105. spark.driverEnv.SSH_CLIENT: xxx
  106. spark.yarn.appMasterEnv.REQUESTS_CA_BUNDLE: xx
  107. spark.executorEnv.REQUESTS_CA_BUNDLE: xxx
  108. spark.driverEnv.REQUESTS_CA_BUNDLE: xxx
  109. spark.yarn.appMasterEnv.DOCKER_HOST: unix:///xx
  110. spark.executorEnv.DOCKER_HOST: unix:///xxx
  111. spark.driverEnv.DOCKER_HOST: unix:///xxx
  112. spark.yarn.appMasterEnv.TNS_ADMIN: xxx
  113. spark.executorEnv.TNS_ADMIN: xxx
  114. spark.driverEnv.TNS_ADMIN: xxx
  115. spark.yarn.appMasterEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
  116. spark.executorEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
  117. spark.driverEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
  118. spark.yarn.appMasterEnv.PATH: xxx
  119. spark.executorEnv.PATH: xxx
  120. spark.driverEnv.PATH: xxx
  121. spark.yarn.appMasterEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=/xxx
  122. spark.executorEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=/xxx
  123. spark.driverEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=/xxx
  124. spark.yarn.appMasterEnv.SSH_TTY: /dev/pts/0
  125. spark.executorEnv.SSH_TTY: /dev/pts/0
  126. spark.driverEnv.SSH_TTY: /dev/pts/0
  127. spark.yarn.appMasterEnv.OLDPWD: /xx
  128. spark.executorEnv.OLDPWD: x
  129. spark.driverEnv.OLDPWD: xx
  130. spark.yarn.appMasterEnv.CONSUL_HTTP_ADDR: xxx
  131. spark.executorEnv.CONSUL_HTTP_ADDR: xxx
  132. spark.driverEnv.CONSUL_HTTP_ADDR: xxx
  133. spark.yarn.appMasterEnv.SSL_CERT_FILE: xxx
  134. spark.executorEnv.SSL_CERT_FILE: xxx
  135. spark.driverEnv.SSL_CERT_FILE: xx
  136. spark.yarn.appMasterEnv.CONSUL_TOKEN_JSON: xxx
  137. spark.executorEnv.CONSUL_TOKEN_JSON: xxx
  138. spark.driverEnv.CONSUL_TOKEN_JSON: xx
  139. spark.yarn.appMasterEnv.LOGNAME: x
  140. spark.executorEnv.LOGNAME: x
  141. spark.driverEnv.LOGNAME: x
  142. spark.yarn.appMasterEnv.XDG_SESSION_TYPE: tty
  143. spark.executorEnv.XDG_SESSION_TYPE: tty
  144. spark.driverEnv.XDG_SESSION_TYPE: tty
  145. spark.yarn.appMasterEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
  146. spark.executorEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
  147. spark.driverEnv.TEXINPUTS: .:/data/app/latex/style/:/usr/share/texmf/tex//:
  148. spark.yarn.appMasterEnv.CONSUL_LEASE_FILE: xxx
  149. spark.executorEnv.CONSUL_LEASE_FILE: xxx
  150. spark.driverEnv.CONSUL_LEASE_FILE: xxx
  151. spark.yarn.appMasterEnv.CONSUL_HTTP_TOKEN_ACCESSOR: xxx
  152. spark.executorEnv.CONSUL_HTTP_TOKEN_ACCESSOR: xxx
  153. spark.driverEnv.CONSUL_HTTP_TOKEN_ACCESSOR: xxx
  154. spark.yarn.appMasterEnv.CONSUL_HTTP_TOKEN: xxx
  155. spark.executorEnv.CONSUL_HTTP_TOKEN: xxx
  156. spark.driverEnv.CONSUL_HTTP_TOKEN: xxx
  157. spark.yarn.appMasterEnv.VIRTUAL_ENV: xxx
  158. spark.executorEnv.VIRTUAL_ENV: xxx
  159. spark.driverEnv.VIRTUAL_ENV: xxx
  160. spark.yarn.appMasterEnv.VAULT_TMPFS: xxx
  161. spark.executorEnv.VAULT_TMPFS: xxx
  162. spark.driverEnv.VAULT_TMPFS: xxx
  163. spark.yarn.appMasterEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
  164. spark.executorEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
  165. spark.driverEnv.BIBINPUTS: .:/data/app/latex/:/usr/share/texmf/bibtex/bib//:
  166. spark.yarn.appMasterEnv.ORACLE_HOME: /app/oracle
  167. spark.executorEnv.ORACLE_HOME: /app/oracle
  168. spark.driverEnv.ORACLE_HOME: /app/oracle
  169. spark.yarn.appMasterEnv.SSH_CONNECTION: xxx
  170. spark.executorEnv.SSH_CONNECTION: xxx
  171. spark.driverEnv.SSH_CONNECTION: xxx
  172. spark.yarn.appMasterEnv.HOMEBREW_NO_AUTO_UPDATE: 1
  173. spark.executorEnv.HOMEBREW_NO_AUTO_UPDATE: 1
  174. spark.driverEnv.HOMEBREW_NO_AUTO_UPDATE: 1
  175. spark.yarn.appMasterEnv.CONSUL_HTTP_TOKEN_FILE: xxx
  176. spark.executorEnv.CONSUL_HTTP_TOKEN_FILE: xxx
  177. spark.driverEnv.CONSUL_HTTP_TOKEN_FILE: xxx
  178. spark.executorEnv.CONSUL_HTTP_TOKEN_FILE: xxx
  179. spark.driverEnv.CONSUL_HTTP_TOKEN_FILE: xxx
  180. spark.yarn.appMasterEnv.XDG_SESSION_CLASS: user
  181. spark.executorEnv.XDG_SESSION_CLASS: user
  182. spark.driverEnv.XDG_SESSION_CLASS: user
  183. spark.yarn.appMasterEnv.GIT_BASE: xxx
  184. spark.executorEnv.GIT_BASE: xxx
  185. spark.driverEnv.GIT_BASE: xxx
  186. spark.yarn.appMasterEnv.TERMINFO: /usr/share/terminfo
  187. spark.executorEnv.TERMINFO: /usr/share/terminfo
  188. spark.driverEnv.TERMINFO: /usr/share/terminfo
  189. spark.yarn.appMasterEnv.TERM: screen
  190. spark.executorEnv.TERM: screen
  191. spark.driverEnv.TERM: screen
  192. spark.yarn.appMasterEnv.USER: xxx
  193. spark.executorEnv.USER: xxx
  194. spark.driverEnv.USER: xxx
  195. spark.yarn.appMasterEnv.CONSUL_HTTP_SSL: true
  196. spark.executorEnv.CONSUL_HTTP_SSL: true
  197. spark.driverEnv.CONSUL_HTTP_SSL: true
  198. spark.yarn.appMasterEnv.SHLVL: 2
  199. spark.executorEnv.SHLVL: 2
  200. spark.driverEnv.SHLVL: 2
  201. spark.yarn.appMasterEnv.GIT_EXAMPLE: xxx
  202. spark.executorEnv.GIT_EXAMPLE: xxx
  203. spark.driverEnv.GIT_EXAMPLE: xxx
  204. spark.yarn.appMasterEnv.XDG_SESSION_ID: 3
  205. spark.executorEnv.XDG_SESSION_ID: 3
  206. spark.driverEnv.XDG_SESSION_ID: 3
  207. spark.yarn.appMasterEnv.PROFILE_DEBUG: no
  208. spark.executorEnv.PROFILE_DEBUG: no
  209. spark.driverEnv.PROFILE_DEBUG: no
  210. spark.yarn.appMasterEnv.XDG_RUNTIME_DIR: xxx
  211. spark.executorEnv.XDG_RUNTIME_DIR: xxx
  212. spark.driverEnv.XDG_RUNTIME_DIR: xxx
  213. spark.yarn.appMasterEnv.PROCPS_USERLEN: 16
  214. spark.executorEnv.PROCPS_USERLEN: 16
  215. spark.driverEnv.PROCPS_USERLEN: 16
  216. spark.yarn.appMasterEnv.PYTHONBREAKPOINT: pudb.set_trace
  217. spark.executorEnv.PYTHONBREAKPOINT: pudb.set_trace
  218. spark.driverEnv.PYTHONBREAKPOINT: pudb.set_trace
  219. spark.yarn.appMasterEnv.SSH_CLIENT: xxx
  220. spark.executorEnv.SSH_CLIENT: xx
  221. spark.driverEnv.SSH_CLIENT: xxx
  222. spark.yarn.appMasterEnv.REQUESTS_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
  223. spark.executorEnv.REQUESTS_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
  224. spark.driverEnv.REQUESTS_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
  225. spark.yarn.appMasterEnv.DOCKER_HOST: unix:///xxx
  226. spark.executorEnv.DOCKER_HOST: unix:///xxx
  227. spark.driverEnv.DOCKER_HOST: unix:///xx
  228. spark.yarn.appMasterEnv.TNS_ADMIN: xxx
  229. spark.executorEnv.TNS_ADMIN: xxx
  230. spark.driverEnv.TNS_ADMIN: xxx
  231. spark.yarn.appMasterEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
  232. spark.executorEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
  233. spark.driverEnv.XDG_DATA_DIRS: /usr/share/gnome:/usr/local/share/:/usr/share/
  234. spark.yarn.appMasterEnv.PATH: xxx
  235. spark.executorEnv.PATH: xxx
  236. spark.driverEnv.PATH: xxx
  237. spark.yarn.appMasterEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=xxx
  238. spark.executorEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=xx
  239. spark.driverEnv.DBUS_SESSION_BUS_ADDRESS: unix:path=xxx
  240. spark.yarn.appMasterEnv.SSH_TTY: /dev/pts/0
  241. spark.executorEnv.SSH_TTY: /dev/pts/0
  242. spark.driverEnv.SSH_TTY: /dev/pts/0
  243. spark.yarn.appMasterEnv.OLDPWD: xxx
  244. spark.executorEnv.OLDPWD: xxx
  245. spark.driverEnv.OLDPWD: xxx
  246. spark.yarn.appMasterEnv.CONSUL_HTTP_ADDR: xxx
  247. spark.executorEnv.CONSUL_HTTP_ADDR: xx
  248. spark.driverEnv.CONSUL_HTTP_ADDR: xxx
  249. spark.yarn.appMasterEnv.SSL_CERT_FILE: /etc/ssl/certs/ca-certificates.crt
  250. spark.executorEnv.SSL_CERT_FILE: /etc/ssl/certs/ca-certificates.crt
  251. spark.driverEnv.SSL_CERT_FILE: /etc/ssl/certs/ca-certificates.crt
  252. spark.yarn.appMasterEnv.CURL_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
  253. spark.executorEnv.CURL_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
  254. spark.driverEnv.CURL_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt
  255. spark.yarn.appMasterEnv.PYSPARK_RDD_CODE_SERIALIZER: pyspark.serializers.CloudPickleSerializer
  256. spark.executorEnv.PYSPARK_RDD_CODE_SERIALIZER: pyspark.serializers.CloudPickleSerializer
  257. spark.driverEnv.PYSPARK_RDD_CODE_SERIALIZER: pyspark.serializers.CloudPickleSerializer
  258. spark.yarn.appMasterEnv.PYSPARK_RDD_DATA_SERIALIZER: pyspark.serializers.CloudPickleSerializer
  259. spark.executorEnv.PYSPARK_RDD_DATA_SERIALIZER: pyspark.serializers.CloudPickleSerializer
  260. spark.driverEnv.PYSPARK_RDD_DATA_SERIALIZER: pyspark.serializers.CloudPickleSerializer
  261. spark.yarn.appMasterEnv.HADOOP_HOME: /.../hadoop/current
  262. spark.executorEnv.HADOOP_HOME: xxx
  263. spark.driverEnv.HADOOP_HOME: xxx
  264. spark.yarn.appMasterEnv.HADOOP_COMMON_HOME: xxx
  265. spark.executorEnv.HADOOP_COMMON_HOME: xxx
  266. spark.driverEnv.HADOOP_COMMON_HOME: xxx
  267. spark.yarn.appMasterEnv.HADOOP_HDFS_HOME: xxx
  268. spark.executorEnv.HADOOP_HDFS_HOME: xxx
  269. spark.driverEnv.HADOOP_HDFS_HOME: xxx
  270. spark.yarn.appMasterEnv.HADOOP_YARN_HOME: xxx
  271. spark.executorEnv.HADOOP_YARN_HOME: xxx
  272. spark.driverEnv.HADOOP_YARN_HOME: xxx
  273. spark.yarn.appMasterEnv.HADOOP_MAPRED_HOME: xxx
  274. spark.executorEnv.HADOOP_MAPRED_HOME: xxx
  275. spark.driverEnv.HADOOP_MAPRED_HOME: xxx
  276. spark.yarn.appMasterEnv.HADOOP_CONF_DIR: xxx
  277. spark.executorEnv.HADOOP_CONF_DIR: xxx
  278. spark.driverEnv.HADOOP_CONF_DIR: xxx
  279. spark.yarn.appMasterEnv.OMP_NUM_THREADS: 1
  280. spark.executorEnv.OMP_NUM_THREADS: 1
  281. spark.driverEnv.OMP_NUM_THREADS: 1
  282. spark.yarn.appMasterEnv.SPARK_HOME: xxx
  283. spark.executorEnv.SPARK_HOME: xxx
  284. spark.driverEnv.SPARK_HOME: xxx


Update 3:
MemoryReporter is a custom class that receives the events described in https://spark.apache.org/docs/2.3.3/api/scala/index.html#org.apache.spark.scheduler.SparkListener:

    import logging
    import os

    import pyspark
    import requests
    from collections.abc import Iterable

    # SPARK_MEMORY_REPORT_DIR_ENVIRON, get_am_ui_url and get_history_server_url are helpers defined elsewhere.

    API_URL_VAR = "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES"
    pyspark_v3 = pyspark.__version__.startswith("3.")


    class SparkListener(object):
        """
        https://spark.apache.org/docs/2.3.3/api/scala/index.html#org.apache.spark.scheduler.SparkListener
        """
        def onApplicationEnd(self, applicationEnd):
            pass

        def onApplicationStart(self, applicationStart):
            pass

        def onBlockManagerAdded(self, blockManagerAdded):
            pass

        def onBlockManagerRemoved(self, blockManagerRemoved):
            pass

        def onBlockUpdated(self, blockUpdated):
            pass

        def onEnvironmentUpdate(self, environmentUpdate):
            pass

        def onExecutorAdded(self, executorAdded):
            pass

        def onExecutorBlacklisted(self, executorBlacklisted):
            pass

        def onExecutorMetricsUpdate(self, executorMetricsUpdate):
            pass

        def onExecutorRemoved(self, executorRemoved):
            pass

        def onExecutorUnblacklisted(self, executorUnblacklisted):
            pass

        def onJobEnd(self, jobEnd):
            pass

        def onJobStart(self, jobStart):
            pass

        def onNodeBlacklisted(self, nodeBlacklisted):
            pass

        def onNodeUnblacklisted(self, nodeUnblacklisted):
            pass

        def onOtherEvent(self, event):
            pass

        def onSpeculativeTaskSubmitted(self, speculativeTask):
            pass

        def onStageCompleted(self, stageCompleted):
            pass

        def onStageSubmitted(self, stageSubmitted):
            pass

        def onTaskEnd(self, taskEnd):
            pass

        def onTaskGettingResult(self, taskGettingResult):
            pass

        def onTaskStart(self, taskStart):
            pass

        def onUnpersistRDD(self, unpersistRDD):
            pass

        class Java:
            implements = ["org.apache.spark.scheduler.SparkListenerInterface"]


    def _scala_iterable_to_py_list(scala_iterable):
        if isinstance(scala_iterable, Iterable):
            return list(scala_iterable)
        py_list = []
        it = scala_iterable.toIterator()
        while it.hasNext():
            py_list.append(it.next())
        return py_list


    def _scala_set_to_py_set(scala_set):
        return set(_scala_iterable_to_py_list(scala_set))


    class MemoryReporter(SparkListener):
        def __init__(self, sc, report_dir):
            self.logger = logging.getLogger(self.__class__.__name__)
            self._sc = sc
            self._report_dir = os.environ.get(SPARK_MEMORY_REPORT_DIR_ENVIRON, report_dir)
            self._spark_conf = sc.getConf()
            self._username = self._sc._jvm.java.lang.System.getProperty("user.name")
            self._application_id = self._spark_conf.get("spark.app.id", None)
            spark_executor_memory = self._spark_conf.get("spark.executor.memory", None)
            self._requested_limit_byte = self._parse_memory_from_conf(spark_executor_memory)
            self._actual_limit_byte = 0  # will be updated once job is running
            self._executor_task = {}  # type: Dict[int, int]
            self._task_memory = {}  # type: Dict[int, int]
            self._stage_tasks = {}  # type: Dict[int, Set[int]]
            self._stage_last_attempt_id = {}
            self._active_jobs = 0

        def _parse_memory_from_conf(self, memory):
            pass  # edited out for briefness

        def _reset_memory_stats(self):
            pass  # edited out for briefness

        def _format_memory_usage(self, x):
            pass  # edited out for briefness

        def _print_report(self, task_memory):
            pass  # edited out for briefness

        def update_stage_info_v2(self):
            status_store = self._sc._jsc.sc().statusStore()
            all_jobs = _scala_iterable_to_py_list(status_store.jobsList(None))
            all_stage_ids = []
            for job in all_jobs:
                all_stage_ids += _scala_iterable_to_py_list(job.stageIds())
            for stage_id in all_stage_ids:
                if stage_id in self._stage_tasks:
                    continue
                stage_data = _scala_iterable_to_py_list(status_store.stageData(stage_id, True))
                if not stage_data:
                    continue
                last_stage_data = stage_data[-1]
                stage_status = last_stage_data.status().toString()
                if stage_status != "ACTIVE" and stage_status != "PENDING" and last_stage_data.tasks().isDefined():
                    stage_tasks = stage_data[-1].tasks().get()
                    stage_task_ids = _scala_set_to_py_set(stage_tasks.keys())
                    self._stage_tasks[stage_id] = stage_task_ids

        def update_stage_info_v3(self):
            """
            In pyspark 3 we don't have:
            - stage_info.status
            - status_store.stageData(stage_id, True)
            - a way to get task info from the stage (only via REST API)
            """
            status_store = self._sc._jsc.sc().statusStore()
            all_jobs = _scala_iterable_to_py_list(status_store.jobsList(None))
            all_stage_ids = []
            for job in all_jobs:
                all_stage_ids += _scala_iterable_to_py_list(job.stageIds())
            for stage_id in all_stage_ids:
                if stage_id in self._stage_tasks:
                    continue
                app_id = self._sc.applicationId
                proxy_host = self._sc.getConf().get(API_URL_VAR)
                if not proxy_host:
                    continue
                stages_api_url = f"{proxy_host}/api/v1/applications/{app_id}/stages/{stage_id}"
                try:
                    response = requests.get(stages_api_url)
                    stages_data = response.json()
                except Exception as ex:
                    self.logger.warning(f"Failed getting stages_data from API Url: {stages_api_url} ex: {ex}")
                    continue
                last_stage_attempt = stages_data[-1]
                last_stage_status = last_stage_attempt.get("status")
                if last_stage_status and last_stage_status != "ACTIVE" and last_stage_status != "PENDING":
                    attempt_id = last_stage_attempt.get("attemptId")
                    self._stage_last_attempt_id[stage_id] = attempt_id
                    if attempt_id is not None:  # careful as it can be 0 (evaluates to False)
                        tasks_info_api_url = (
                            f"{proxy_host}/api/v1/applications/{app_id}/stages/{stage_id}/{attempt_id}/taskList"
                        )
                        try:
                            response2 = requests.get(tasks_info_api_url)
                            tasks_info_data = response2.json()
                            self.logger.debug(f"tasks_info_api_url: {tasks_info_api_url}")
                            task_ids = [t["taskId"] for t in tasks_info_data]
                            self._stage_tasks[stage_id] = task_ids
                        except Exception as ex:
                            self.logger.warning(f"Failed getting tasks_info_data API url: {tasks_info_api_url} ex: {ex}")
                            continue

        def _update_stage_tasks(self):
            if pyspark_v3:
                self.update_stage_info_v3()
            else:
                self.update_stage_info_v2()

        def _save_csv(self, job_id, df):
            pass  # edited out for briefness

        def _save_report(self, job_id, task_memory):
            pass  # edited out for briefness

        def onTaskStart(self, taskStart):
            task_id = taskStart.taskInfo().taskId()
            executor_id = taskStart.taskInfo().executorId()
            self.logger.debug("setting executor '{}' to task '{}'".format(executor_id, task_id))
            self._executor_task[executor_id] = task_id

        def onExecutorMetricsUpdate(self, executorMetricsUpdate):
            executor_id = executorMetricsUpdate.execId()  # returns 'driver' in pyspark 3.x but an executor number in pyspark 2.x
            if pyspark_v3:
                # https://spark.apache.org/docs/3.1.1/monitoring.html
                # https://github.com/apache/spark/blob/v3.1.1/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala#L196
                memory_byte = 0
                metrics = executorMetricsUpdate.executorUpdates().values().toList()  # Scala view to values
                metrics = [metrics.apply(i) for i in range(metrics.size())]
                for m in metrics:
                    memory_byte = max(memory_byte, m.getMetricValue("JVMHeapMemory"))
            else:
                # Backward compatible. e.g. v2.3.4
                memory_byte = executorMetricsUpdate.cgroupMetrics().memoryUsageInBytes()
                self._actual_limit_byte = max(
                    self._actual_limit_byte, executorMetricsUpdate.cgroupMetrics().memoryLimitInBytes()
                )
            task_id = self._executor_task.get(executor_id, None)
            if task_id is None:
                # would still receive metrics updates after job finishes
                self.logger.debug("Cannot find corresponding task for executor {}".format(executor_id))
                return
            self._task_memory[task_id] = max(self._task_memory.get(task_id, 0), memory_byte)

        def getExecutorMetricsFromAPI(self):
            """
            Get Spark metrics from the Spark API
            Potentially switch this to use this end point:
            application_1697883854732_43137/api/v1/applications/application_1697883854732_43137/executors
            Or another option is to record this info in update_stage_info_v3() directly
            """
            self.logger.info("Getting metrics from Spark API")
            app_id = self._sc.applicationId
            proxy_host = self._sc.getConf().get(API_URL_VAR)
            if not proxy_host:
                return
            for stage_id, attempt_id in self._stage_last_attempt_id.items():
                tasks_info_api_url = f"{proxy_host}/api/v1/applications/{app_id}/stages/{stage_id}/{attempt_id}/taskList"
                try:
                    resp = requests.get(tasks_info_api_url)
                    tasks_info_data = resp.json()
                    for task_info in tasks_info_data:
                        task_id = task_info.get("taskId")
                        if task_id is not None:  # warning, this can be 0 (evaluates to False)
                            peak_mem = task_info["taskMetrics"]["peakExecutionMemory"]
                            task_max_mem = max(self._task_memory.get(task_id, 0), peak_mem)
                            self._task_memory[task_info["taskId"]] = task_max_mem
                            self.logger.debug(
                                f"metrics_from_api: stage: {stage_id} task: {task_id} task_max_mem: {task_max_mem}"
                            )
                except Exception as ex:
                    # Even if one fails, we try the others to get some data for the report
                    self.logger.warning(f"metrics_from_api: failed getting tasks_info url: {tasks_info_api_url} ex: {ex}")

        def onJobStart(self, jobStart):
            self._active_jobs += 1

        def onJobEnd(self, jobEnd):
            self._active_jobs -= 1
            job_id = jobEnd.jobId()
            if self._active_jobs == 0:
                self._update_stage_tasks()
                self.logger.info(f"stage_task updated: {self._stage_tasks} job: {job_id}")
                if self._task_memory:
                    self.logger.debug(f"Collected task memory: {self._task_memory} for job_id: {job_id}")
                    self._print_report(self._task_memory)
                    self._save_report(job_id, self._task_memory)
                    self.logger.info("More details at {}".format(get_am_ui_url(self._sc, self._spark_conf)))
                else:
                    if pyspark_v3:
                        # onExecutorMetricsUpdate() isn't working as expected in pyspark 3+, so we use the REST API
                        self.logger.info(f"Attempting report via Spark API. version: {pyspark.__version__}")
                        self.getExecutorMetricsFromAPI()
                        self._print_report(self._task_memory)
                        self._save_report(job_id, self._task_memory)
                        self.logger.info("More details at {}".format(get_am_ui_url(self._sc, self._spark_conf)))
                    else:
                        self.logger.info(f"Not enough data collected for the report of job: {job_id}")
                self._reset_memory_stats()

        def onApplicationEnd(self, applicationEnd):
            self.logger.info(
                "Spark application has ended. Access from the history server {}".format(
                    get_history_server_url(self._sc, self._spark_conf)
                )
            )
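
As the docstring of getExecutorMetricsFromAPI() already hints, a per-executor alternative on Spark 3.x is the executors endpoint of the REST API, which exposes a peakMemoryMetrics block (JVMHeapMemory etc.) per executor. A rough sketch under the same assumptions as the class above (API_URL_VAR proxy URL, application id, the requests import), not a drop-in replacement:

    def peak_executor_heap_from_api(sc):
        """Sketch: per-executor peak JVM heap via the Spark REST API (Spark 3.x)."""
        app_id = sc.applicationId
        proxy_host = sc.getConf().get(API_URL_VAR, "")
        if not proxy_host:
            return {}
        # /allexecutors also covers executors already released by dynamic allocation
        executors = requests.get(f"{proxy_host}/api/v1/applications/{app_id}/allexecutors").json()
        return {ex["id"]: (ex.get("peakMemoryMetrics") or {}).get("JVMHeapMemory", 0) for ex in executors}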


kt06eoxx 1#

The problem was that a custom metric we had added was no longer serializable in the way it had been in the earlier Spark version. This caused the heartbeats sent from the workers to fail. Unfortunately, the only hint of the problem showed up for jobs running longer than 120 seconds (the heartbeat timeout).


7xzttuei 2#

I suspect that it may not only be your version that changed, but also your cluster setup.
An executor_id can be equal to "driver": the SparkContext.DRIVER_IDENTIFIER constant is equal to "driver" in 3.4.1, and each node uses it in BlockManagerId to determine whether the running process is an executor or the driver:

    def isDriver: Boolean = {
      executorId == SparkContext.DRIVER_IDENTIFIER
    }

Based on the information you provide in the question it is hard to say with 100% certainty (for instance, we don't know how executorMetricsUpdate is defined in your code), but if your executor_id variable is always equal to "driver", you may be running with --master local rather than against an actual cluster.
If you run a Spark application with --master local (or any of the local variants), the code runs in a single JVM process on a single machine, and the driver acts as both driver and executor.
So make sure you are running on an actual cluster, and then you may well get the behaviour you want!
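
One quick way to confirm that executors really register with your application is to log them as they arrive (a sketch that reuses the py4j SparkListener base class from the question; executorId() and executorInfo().executorHost() are fields of the SparkListenerExecutorAdded event). In local mode you will only ever see a single "driver" entry, while on a real cluster one line per executor should appear:

    # Sketch: log every executor as it registers / is removed.
    class ExecutorLogger(SparkListener):
        def onExecutorAdded(self, executorAdded):
            print("executor added: id={} host={}".format(
                executorAdded.executorId(), executorAdded.executorInfo().executorHost()))

        def onExecutorRemoved(self, executorRemoved):
            print("executor removed: id={}".format(executorRemoved.executorId()))

    sc._jsc.sc().addSparkListener(ExecutorLogger())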
