How do I run a Spark job from Airflow when Spark and Airflow are installed in the same virtual environment?

czfnxgou posted on 2021-07-13 in Spark

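I'm trying to run a test ETL PySpark job on my laptop. I installed Spark and Airflow in the same virtual environment. It is worth mentioning that I can run the job successfully from inside the virtual environment, but it fails when Airflow runs it. From the Airflow log I can see that the ETL task fails because, when it executes, it looks for the job script (spark_etl_1.py) in a temporary directory. The error is: "python: can't open file '/tmp/airflowtmpd9uwr02_/spark_etl_1.py'" (full log below).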

AIRFLOW_CTX_DAG_OWNER=florian
AIRFLOW_CTX_DAG_ID=Example_Airflow_and_Spark
AIRFLOW_CTX_TASK_ID=spark_task_etl
AIRFLOW_CTX_EXECUTION_DATE=2021-02-24T14:28:23.580164+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-02-24T14:28:23.580164+00:00
[2021-02-24 16:28:27,498] {bash.py:135} INFO - Tmp dir root location: 
 /tmp
[2021-02-24 16:28:27,499] {bash.py:158} INFO - Running command: /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local spark_etl_1.py
[2021-02-24 16:28:27,503] {bash.py:169} INFO - Output:
[2021-02-24 16:28:28,273] {bash.py:173} INFO - 21/02/24 16:28:28 WARN Utils: Your hostname, florian-TUXEDO resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface wlp2s0)
[2021-02-24 16:28:28,273] {bash.py:173} INFO - 21/02/24 16:28:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
[2021-02-24 16:28:28,620] {bash.py:173} INFO - 21/02/24 16:28:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[2021-02-24 16:28:28,833] {bash.py:173} INFO - python: can't open file '/tmp/airflowtmpd9uwr02_/spark_etl_1.py': [Errno 2] No such file or directory
[2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
[2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN Please initialize the log4j system properly.
[2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2021-02-24 16:28:28,854] {bash.py:177} INFO - Command exited with return code 2
[2021-02-24 16:28:28,857] {taskinstance.py:1455} ERROR - Bash command failed. The command returned a non-zero exit code.
Traceback (most recent call last):

This is the DAG code:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

srcDir = "/home/florian/HDD2/WORK/PROJECTS/Personale/SPARK/"
sparkSubmit = "/home/florian/HDD2/WORK/PROJECTS/Personale/SPARK/spark_env/bin/spark-submit"

args = {
    'owner': 'florian',
}

dag = DAG(
    dag_id='Example_Airflow_and_Spark',
    default_args=args,
    description='Test Spark ETL job from Airflow',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),

)

change_dir = BashOperator(
    task_id='change_working_directory',
    bash_command="cd " + srcDir,
    dag=dag,
)

### [activate virtual environment]

activate_env = BashOperator(
    task_id='activate_virtual_env',
    bash_command="source /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/activate",
    dag=dag,
)

### [spark_job_operator_bash]

spark_submit_cmd="/home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local spark_etl_1.py"

spark_job = BashOperator(
    task_id='spark_task_etl',
    bash_command=spark_submit_cmd,
    dag=dag,
)

### [deactivate virtual environment]

deactivate_env = BashOperator(
    task_id='deactivate_virtual_env',
    bash_command="deactivate",
    dag=dag,
)

change_dir >> activate_env >> spark_job >> deactivate_env
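A detail worth knowing about the DAG above: each BashOperator task starts its own bash process, so a cd or a source .../activate done in one task is gone by the time the next task runs. The stdlib-only sketch below imitates three consecutive tasks with separate subprocess calls, assuming bash is on PATH. run_bash is a hypothetical helper and DEMO_VENV_MARKER is a made-up variable standing in for what activate exports; this illustrates separate processes and is not Airflow's own code.

```python
import os
import shlex
import subprocess
import tempfile


def run_bash(command: str) -> str:
    """Run `command` in a brand-new bash process and return its trimmed stdout."""
    result = subprocess.run(
        ["bash", "-c", command], capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


if __name__ == "__main__":
    target = tempfile.gettempdir()

    # Like change_working_directory: the cd affects only this one bash process.
    run_bash(f"cd {shlex.quote(target)}")
    # Like activate_virtual_env: the export disappears when its process exits.
    # DEMO_VENV_MARKER is a made-up name standing in for what activate sets.
    run_bash("export DEMO_VENV_MARKER=active")

    # Like spark_task_etl: a fresh process that inherits neither change.
    print("cwd seen by next task:", run_bash("pwd -P"))
    print("same as Python's cwd:", run_bash("pwd -P") == os.path.realpath(os.getcwd()))
    print("marker seen by next task:", run_bash('echo "${DEMO_VENV_MARKER:-unset}"'))
```

Chaining the steps inside one bash_command (for example with &&) is the usual way to keep them in the same shell.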
Answer #1 by wz3gfoph:

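The relative path is the problem: spark_etl_1.py is resolved against the working directory of the bash process, not against your project folder. Judging from the log ("Tmp dir root location: /tmp", then "/tmp/airflowtmpd9uwr02_/spark_etl_1.py"), Airflow runs the BashOperator command inside a temporary directory under /tmp. And because each BashOperator task runs in its own shell, the cd in change_working_directory does not carry over to spark_task_etl. So, under Airflow,

/home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local spark_etl_1.py

is equivalent to

/home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local /tmp/airflowtmpd9uwr02_/spark_etl_1.py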
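To see why a bare file name fails, here is a stdlib-only sketch that loosely imitates a command running in a throwaway directory under the system temp root, as the "Tmp dir root location" line in the log suggests. run_in_temp_cwd is a hypothetical helper (not Airflow's API), a tiny stand-in script replaces the real PySpark job, and bash is assumed to be on PATH.

```python
import shlex
import subprocess
import sys
import tempfile
from pathlib import Path


def run_in_temp_cwd(command: str) -> subprocess.CompletedProcess:
    """Run `command` via bash with a throwaway temp directory as the cwd.

    Loosely imitates the behaviour visible in the Airflow log; this is an
    illustration, not Airflow's implementation.
    """
    with tempfile.TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
        return subprocess.run(
            ["bash", "-c", command], cwd=tmp_dir, capture_output=True, text=True
        )


# The current interpreter, quoted so it is safe inside a bash command line.
PYTHON = shlex.quote(sys.executable)

if __name__ == "__main__":
    # Stand-in for spark_etl_1.py, created in its own "project" directory.
    with tempfile.TemporaryDirectory() as project_dir:
        script = Path(project_dir) / "spark_etl_1.py"
        script.write_text('print("etl ran")\n')

        # Relative name: looked up in the temp cwd, so Python exits with code 2.
        rel = run_in_temp_cwd(f"{PYTHON} spark_etl_1.py")
        print(rel.returncode, rel.stderr.strip())

        # Absolute path: found regardless of the working directory.
        ab = run_in_temp_cwd(f"{PYTHON} {shlex.quote(str(script))}")
        print(ab.returncode, ab.stdout.strip())
```

The relative call fails with exit code 2, the same return code as in the question's log, while the absolute call succeeds.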

You may want to pass the absolute path of the Spark script instead of just the file name spark_etl_1.py, for example:

/home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local /path/from/root/spark_etl_1.py
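If you would rather not hard-code the whole command string, a small helper can assemble it from absolute paths. This is a sketch for a POSIX system: the paths are copied from the question, it assumes spark_etl_1.py lives in the question's srcDir, and build_spark_submit_cmd is a hypothetical name, not part of Airflow or Spark.

```python
import shlex
from pathlib import Path

# Paths copied from the question; adjust them to your machine.
SPARK_SUBMIT = Path("/home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit")
# Assumption: spark_etl_1.py lives in the question's srcDir.
SRC_DIR = Path("/home/florian/HDD2/WORK/PROJECTS/Personale/SPARK")


def build_spark_submit_cmd(script_name: str, master: str = "local") -> str:
    """Return a spark-submit command line built only from absolute paths.

    build_spark_submit_cmd is a hypothetical helper, not part of Airflow or Spark.
    """
    script = SRC_DIR / script_name
    parts = [str(SPARK_SUBMIT), "--master", master, str(script)]
    # Quote each piece so paths containing spaces survive the bash -c call.
    return " ".join(shlex.quote(p) for p in parts)


if __name__ == "__main__":
    print(build_spark_submit_cmd("spark_etl_1.py"))

# In the DAG, the Spark task then no longer depends on the cd task:
# spark_job = BashOperator(
#     task_id="spark_task_etl",
#     bash_command=build_spark_submit_cmd("spark_etl_1.py"),
#     dag=dag,
# )
```

Quoting each part keeps the command correct even if a directory name later contains spaces.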
