How do I run a Spark job from Airflow when Spark and Airflow are installed in the same virtual environment?

czfnxgou asked on 2021-07-13 in Spark

I am trying to run a test ETL PySpark job on my laptop. I installed Spark and Airflow in the same virtual environment. The important point: I can run the job successfully by hand from inside the virtual environment, but it fails when triggered from Airflow. From the Airflow logs I can see that the ETL task fails because, at execution time, it looks for the script in a temporary directory. The error is: "python: can't open file '/tmp/airflowtmpd9uwr02_/spark_etl_1.py'" (full log below).

  AIRFLOW_CTX_DAG_OWNER=florian
  AIRFLOW_CTX_DAG_ID=Example_Airflow_and_Spark
  AIRFLOW_CTX_TASK_ID=spark_task_etl
  AIRFLOW_CTX_EXECUTION_DATE=2021-02-24T14:28:23.580164+00:00
  AIRFLOW_CTX_DAG_RUN_ID=manual__2021-02-24T14:28:23.580164+00:00
  [2021-02-24 16:28:27,498] {bash.py:135} INFO - Tmp dir root location:
  /tmp
  [2021-02-24 16:28:27,499] {bash.py:158} INFO - Running command: /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local spark_etl_1.py
  [2021-02-24 16:28:27,503] {bash.py:169} INFO - Output:
  [2021-02-24 16:28:28,273] {bash.py:173} INFO - 21/02/24 16:28:28 WARN Utils: Your hostname, florian-TUXEDO resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface wlp2s0)
  [2021-02-24 16:28:28,273] {bash.py:173} INFO - 21/02/24 16:28:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
  [2021-02-24 16:28:28,620] {bash.py:173} INFO - 21/02/24 16:28:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  [2021-02-24 16:28:28,833] {bash.py:173} INFO - python: can't open file '/tmp/airflowtmpd9uwr02_/spark_etl_1.py': [Errno 2] No such file or directory
  [2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
  [2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN Please initialize the log4j system properly.
  [2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  [2021-02-24 16:28:28,854] {bash.py:177} INFO - Command exited with return code 2
  [2021-02-24 16:28:28,857] {taskinstance.py:1455} ERROR - Bash command failed. The command returned a non-zero exit code.
  Traceback (most recent call last):
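
The key line is the "can't open file" error: the relative name spark_etl_1.py is resolved against the temporary working directory the command runs in (note the "Tmp dir root location: /tmp" line), not against the directory that holds the script. A minimal sketch of that failure mode outside Airflow, assuming the command is simply executed with a temp directory as its working directory:

  # Run a command from a fresh temporary working directory; a relative
  # file name like "spark_etl_1.py" is looked up there and not found,
  # so python exits with code 2, matching the task log above.
  import subprocess
  import tempfile

  with tempfile.TemporaryDirectory(prefix="airflowtmp") as tmp:
      result = subprocess.run(
          ["python", "spark_etl_1.py"],  # file does not exist inside tmp
          cwd=tmp,
          capture_output=True,
          text=True,
      )
      print(result.returncode)  # 2
      print(result.stderr)      # can't open file ... [Errno 2] No such file or directory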

Here is the DAG code:

  from datetime import timedelta

  from airflow import DAG
  from airflow.operators.bash import BashOperator
  from airflow.utils.dates import days_ago

  srcDir = "/home/florian/HDD2/WORK/PROJECTS/Personale/SPARK/"
  sparkSubmit = "/home/florian/HDD2/WORK/PROJECTS/Personale/SPARK/spark_env/bin/spark-submit/"

  args = {
      'owner': 'florian',
  }

  dag = DAG(
      dag_id='Example_Airflow_and_Spark',
      default_args=args,
      description='Test Spark ETL job from Airflow',
      start_date=days_ago(2),
      dagrun_timeout=timedelta(minutes=60),
  )

  change_dir = BashOperator(
      task_id='change_working_directory',
      bash_command="cd " + srcDir,
      dag=dag,
  )

  ### [activate virtual environment]
  activate_env = BashOperator(
      task_id='activate_virtual_env',
      bash_command="source /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/activate",
      dag=dag,
  )

  ### [spark_job_operator_bash]
  spark_submit_cmd = "/home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local spark_etl_1.py"
  spark_job = BashOperator(
      task_id='spark_task_etl',
      bash_command=spark_submit_cmd,
      dag=dag,
  )

  ### [deactivate virtual environment]
  deactivate_env = BashOperator(
      task_id='deactivate_virtual_env',
      bash_command="deactivate",
      dag=dag,
  )

  change_dir >> activate_env >> spark_job >> deactivate_env

wz3gfoph1#

Your DAG lives in AIRFLOW_HOME, which I'm guessing is your /tmp folder. When executing spark-submit, Airflow looks for the Spark script in that same folder, so

  /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local spark_etl_1.py

is equivalent to

  /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local ${AIRFLOW_HOME}/spark_etl_1.py

You may want to consider passing the absolute path of the Spark script instead of just the file name spark_etl_1.py, e.g.

  /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local /path/from/root/spark_etl_1.py
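
Applied to the DAG above, that means building the spark-submit command from the absolute script location instead of relying on the working directory. A minimal sketch reusing the paths from the question (assuming spark_etl_1.py actually lives under srcDir; only the asker can confirm which of the two project paths is correct):

  from airflow import DAG
  from airflow.operators.bash import BashOperator
  from airflow.utils.dates import days_ago

  srcDir = "/home/florian/HDD2/WORK/PROJECTS/SPARK/"  # directory holding spark_etl_1.py
  sparkSubmit = "/home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit"

  with DAG(dag_id="Example_Airflow_and_Spark", start_date=days_ago(2)) as dag:
      # With an absolute path, the temporary working directory that
      # BashOperator uses no longer matters.
      spark_job = BashOperator(
          task_id="spark_task_etl",
          bash_command=f"{sparkSubmit} --master local {srcDir}spark_etl_1.py",
      )

Note that the change_working_directory and activate_virtual_env tasks in the original DAG cannot help here anyway: each BashOperator runs its command in its own shell, so a cd or source in one task does not carry over to the next.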
