Packaging PySpark with a PEX environment on Dataproc

ovfsdjhp posted on 2023-01-01 in Spark

I'm trying to package a PySpark job with PEX so that it can run on Google Cloud Dataproc, but I'm getting a Permission Denied error.
I've packaged my third-party and local dependencies into env.pex, and the entry point that uses those dependencies into main.py. I then gsutil cp'd both files to gs://<PATH> and run the script below.

    from google.cloud import dataproc_v1 as dataproc
    from google.cloud import storage


    def submit_job(project_id: str, region: str, cluster_name: str):
        job_client = dataproc.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )
        operation = job_client.submit_job_as_operation(
            request={
                "project_id": project_id,
                "region": region,
                "job": {
                    "placement": {"cluster_name": cluster_name},
                    "pyspark_job": {
                        "main_python_file_uri": "gs://<PATH>/main.py",
                        "file_uris": ["gs://<PATH>/env.pex"],
                        "properties": {
                            "spark.pyspark.python": "./env.pex",
                            "spark.executorEnv.PEX_ROOT": "./.pex",
                        },
                    },
                },
            }
        )
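
For context, main.py itself isn't shown; a minimal sketch of such an entry point, assuming it only needs a SparkSession plus one third-party dependency shipped in env.pex (pandas here is purely illustrative, not from the original post), might look like:

    # Hypothetical main.py; the real entry point is not shown in the question.
    from pyspark.sql import SparkSession

    import pandas as pd  # stand-in for a third-party dependency packaged in env.pex


    def main():
        spark = SparkSession.builder.appName("pex-packaged-job").getOrCreate()
        # Build a tiny DataFrame via the packaged dependency to prove the env resolves.
        df = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))
        print(df.count())
        spark.stop()


    if __name__ == "__main__":
        main()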

The error I get is:

    Exception in thread "main" java.io.IOException: Cannot run program "./env.pex": error=13, Permission denied
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
        at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:97)
        at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.IOException: error=13, Permission denied
        at java.lang.UNIXProcess.forkAndExec(Native Method)
        at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
        at java.lang.ProcessImpl.start(ProcessImpl.java:134)
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
        ... 14 more

Should I expect to be able to package my environment this way? I don't see a way to change the permissions of the files included via file_uris in the PySpark job config, and I haven't found any Google Cloud documentation on packaging with PEX, although the official PySpark documentation does include this guide.
Any help is appreciated - thanks!


vxbzzdmp #1

You can always run a PEX file with a compatible interpreter. So instead of specifying ./env.pex as the program, you could try python env.pex. That doesn't require env.pex to be executable.


von4xj4u #2

In the end I couldn't run the pex directly, but I do now have a workaround, suggested by a user in the Pants Slack community (thank you!).
The workaround is to unpack the pex into a venv in a cluster initialization script.
Copy the init script to GCS with gsutil, e.g. to gs://<PATH TO INIT SCRIPT>:

    #!/bin/bash
    set -exo pipefail

    readonly PEX_ENV_FILE_URI=$(/usr/share/google/get_metadata_value attributes/PEX_ENV_FILE_URI || true)
    readonly PEX_FILES_DIR="/pexfiles"
    readonly PEX_ENV_DIR="/pexenvs"

    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $*" >&2
      exit 1
    }

    function install_pex_into_venv() {
      local -r pex_name=${PEX_ENV_FILE_URI##*/}
      local -r pex_file="${PEX_FILES_DIR}/${pex_name}"
      local -r pex_venv="${PEX_ENV_DIR}/${pex_name}"

      echo "Installing pex from ${pex_file} into venv ${pex_venv}..."
      gsutil cp "${PEX_ENV_FILE_URI}" "${pex_file}"
      PEX_TOOLS=1 python "${pex_file}" venv --compile "${pex_venv}"
    }

    function main() {
      if [[ -z "${PEX_ENV_FILE_URI}" ]]; then
        err "ERROR: Must specify PEX_ENV_FILE_URI metadata key"
      fi
      install_pex_into_venv
    }

    main
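
If you prefer to stay in Python rather than shelling out to gsutil, a rough equivalent of that copy step using the google-cloud-storage client might look like the sketch below (the bucket and object names are placeholders, not from the original post):

    # Hypothetical upload of the init script without gsutil; names are placeholders.
    from google.cloud import storage

    storage.Client().bucket("<BUCKET>").blob(
        "pkg/cluster-env-init.bash"
    ).upload_from_filename("cluster-env-init.bash")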

To start the cluster and run the init script that unpacks the pex into a venv on the cluster:

    from google.cloud import dataproc_v1 as dataproc


    def start_cluster(project_id: str, region: str, cluster_name: str):
        cluster_client = dataproc.ClusterControllerClient(...)
        operation = cluster_client.create_cluster(
            request={
                "project_id": project_id,
                "region": region,
                "cluster": {
                    "project_id": project_id,
                    "cluster_name": cluster_name,
                    "config": {
                        "master_config": <CONFIG>,
                        "worker_config": <CONFIG>,
                        "initialization_actions": [
                            {
                                "executable_file": "gs://<PATH TO INIT SCRIPT>",
                            },
                        ],
                        "gce_cluster_config": {
                            "metadata": {"PEX_ENV_FILE_URI": "gs://<PATH>/env.pex"},
                        },
                    },
                },
            }
        )
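
As written, start_cluster only issues the request; the cluster and its init action finish in the background. A small, hypothetical follow-up that blocks until creation completes, using the long-running operation the client returns:

    # Hypothetical follow-up at the end of start_cluster: wait for the cluster
    # (including the init action) to finish provisioning before submitting jobs.
    cluster = operation.result()
    print(f"Cluster created: {cluster.cluster_name}")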

To submit the job and run the PySpark job with the unpacked pex venv:

    def submit_job(project_id: str, region: str, cluster_name: str):
        # Note: jobs are submitted through the JobControllerClient, not the ClusterControllerClient.
        job_client = dataproc.JobControllerClient(...)
        operation = job_client.submit_job_as_operation(
            request={
                "project_id": project_id,
                "region": region,
                "job": {
                    "placement": {"cluster_name": cluster_name},
                    "pyspark_job": {
                        "main_python_file_uri": "gs://<PATH>/main.py",
                        "properties": {
                            "spark.pyspark.python": "/pexenvs/env.pex/bin/python",
                        },
                    },
                },
            }
        )
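
submit_job_as_operation likewise returns a long-running operation. A sketch of waiting for the job and pulling its driver output from GCS, following the pattern in Google's Dataproc quickstart sample (the .000000000 suffix is the first chunk of the driver output object; treat the details as an assumption to verify):

    # Hypothetical follow-up at the end of submit_job: wait for the job to finish
    # and download its driver output from the GCS location Dataproc reports.
    import re

    from google.cloud import storage

    response = operation.result()
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_bytes()
        .decode("utf-8")
    )
    print(f"Job finished successfully: {output}")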

kg7wmglp #3

Building on the answer from @megabits above, here is the bash-based workflow that worked for me.
1. Copy the init script (from that answer) to GCS as gs://BUCKET/pkg/cluster-env-init.bash
2. Build the PEX, passing the --include-tools flag the init script needs, e.g.

    pex --include-tools -r requirements.txt -o env.pex

3. Put the PEX file into GCS:

    gsutil mv env.pex "gs://BUCKET/pkg/env.pex"

4. Create the cluster, using the PEX file to set up the environment:

    gcloud dataproc clusters create your-cluster --region us-central1 \
        --initialization-actions="gs://BUCKET/pkg/cluster-env-init.bash" \
        --metadata "PEX_ENV_FILE_URI=gs://BUCKET/pkg/env.pex"

5. Run the job:

    gcloud dataproc jobs submit pyspark your-script.py \
        --cluster=your-cluster --region us-central1 \
        --properties spark.pyspark.python="/pexenvs/env.pex/bin/python"
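
To sanity-check that the job really runs on the unpacked PEX venv, your-script.py (a placeholder name from the command above) could start with something like the following sketch:

    # Hypothetical your-script.py: confirm the interpreter and run a trivial Spark job.
    import sys

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("pex-venv-check").getOrCreate()
    print("Driver Python:", sys.executable)  # expect /pexenvs/env.pex/bin/python
    print(spark.range(10).count())           # a trivial job that exercises the executors
    spark.stop()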
