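I'm trying to run a test ETL PySpark job on my laptop. I installed Spark and Airflow in the same virtual environment. The important point is that I can run the job successfully from the virtual environment, but it fails when Airflow runs it. From the Airflow logs, I can see that the ETL task fails because, when it tries to execute, it looks for the Spark script in a temporary directory. The error is: "python: can't open file '/tmp/airflowtmpd9uwr02_/spark_etl_1.py'" (full log below).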
> AIRFLOW_CTX_DAG_OWNER=florian
AIRFLOW_CTX_DAG_ID=Example_Airflow_and_Spark
AIRFLOW_CTX_TASK_ID=spark_task_etl
AIRFLOW_CTX_EXECUTION_DATE=2021-02-24T14:28:23.580164+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-02-24T14:28:23.580164+00:00
[2021-02-24 16:28:27,498] {bash.py:135} INFO - Tmp dir root location: /tmp
[2021-02-24 16:28:27,499] {bash.py:158} INFO - Running command: /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local spark_etl_1.py
[2021-02-24 16:28:27,503] {bash.py:169} INFO - Output:
[2021-02-24 16:28:28,273] {bash.py:173} INFO - 21/02/24 16:28:28 WARN Utils: Your hostname, florian-TUXEDO resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface wlp2s0)
[2021-02-24 16:28:28,273] {bash.py:173} INFO - 21/02/24 16:28:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
[2021-02-24 16:28:28,620] {bash.py:173} INFO - 21/02/24 16:28:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[2021-02-24 16:28:28,833] {bash.py:173} INFO - **python: can't open file '/tmp/airflowtmpd9uwr02_/spark_etl_1.py': [Errno 2] No such file or directory**
[2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
[2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN Please initialize the log4j system properly.
[2021-02-24 16:28:28,837] {bash.py:173} INFO - log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2021-02-24 16:28:28,854] {bash.py:177} INFO - Command exited with return code 2
[2021-02-24 16:28:28,857] {taskinstance.py:1455} ERROR - Bash command failed. The command returned a non-zero exit code.
Traceback (most recent call last):
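Python exits with status 2 when it cannot open the script it was given, which appears to be the `return code 2` reported above. A minimal stdlib sketch, assuming only that the task runs from a fresh, empty temporary directory (as the `Tmp dir root location: /tmp` line and the `/tmp/airflowtmp…` path in the error suggest), reproduces the same failure by running `python spark_etl_1.py` with a relative path. `run_relative_script` is a hypothetical helper; it does not call Airflow or Spark.

```python
from __future__ import annotations

import subprocess
import sys
import tempfile
from pathlib import Path


def run_relative_script(script_name: str) -> tuple[Path, int]:
    """Run `python <script_name>` from a fresh temporary directory.

    Imitates the situation in the log: a relative script path is resolved
    against an empty temp dir, so the interpreter cannot find the file.
    """
    with tempfile.TemporaryDirectory(prefix="airflowtmp") as tmp:
        result = subprocess.run(
            [sys.executable, script_name],  # relative path, resolved against cwd
            cwd=tmp,
            capture_output=True,
            text=True,
        )
        # This is where the script would have to be for the command to work.
        expected_location = Path(tmp) / script_name
        return expected_location, result.returncode


if __name__ == "__main__":
    location, code = run_relative_script("spark_etl_1.py")
    print(f"looked for {location}, exit code {code}")  # exit code 2
```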
Here is the DAG code:
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
srcDir = "/home/florian/HDD2/WORK/PROJECTS/Personale/SPARK/"
# Note: sparkSubmit is not used below; spark_submit_cmd hardcodes its own path.
sparkSubmit = "/home/florian/HDD2/WORK/PROJECTS/Personale/SPARK/spark_env/bin/spark-submit"
args = {
'owner': 'florian',
}
dag = DAG(
dag_id='Example_Airflow_and_Spark',
default_args=args,
description='Test Spark ETL job from Airflow',
start_date=days_ago(2),
dagrun_timeout=timedelta(minutes=60),
)
change_dir = BashOperator(
task_id='change_working_directory',
bash_command="cd " + srcDir,
dag=dag,
)
### [activate virtual environment]
activate_env = BashOperator(
task_id='activate_virtual_env',
bash_command="source /home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/activate",
dag=dag,
)
### [spark_job_operator_bash]
spark_submit_cmd="/home/florian/HDD2/WORK/PROJECTS/SPARK/spark_env/bin/spark-submit --master local spark_etl_1.py"
spark_job = BashOperator(
task_id='spark_task_etl',
bash_command=spark_submit_cmd,
dag=dag,
)
### [deactivate virtual environment]
deactivate_env = BashOperator(
task_id='deactivate_virtual_env',
bash_command="deactivate",
dag=dag,
)
change_dir >> activate_env >> spark_job >> deactivate_env
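In the DAG above, `change_dir`, `activate_env`, `spark_job` and `deactivate_env` are separate tasks. As far as I can tell from the Airflow 2.0 `BashOperator`, each task runs its command with `bash -c` in a new subprocess whose working directory is a new temporary directory, so a `cd` or `source` in one task should not affect the next. The sketch below imitates that behaviour with the standard library only, under that assumption; `run_like_bash_operator` is a hypothetical helper, not an Airflow API.

```python
from __future__ import annotations

import os
import subprocess
import tempfile


def run_like_bash_operator(bash_command: str) -> tuple[str, str]:
    """Run a command roughly the way a BashOperator task does (assumption):
    a fresh `bash -c` process whose cwd is a new temporary directory.
    Returns (temporary directory, stripped stdout).
    """
    with tempfile.TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
        result = subprocess.run(
            ["bash", "-c", bash_command],
            cwd=tmp_dir,
            capture_output=True,
            text=True,
            check=True,
        )
        return tmp_dir, result.stdout.strip()


if __name__ == "__main__":
    # "Task" 1 changes directory; the change dies with its shell process.
    run_like_bash_operator("cd / && pwd")
    # "Task" 2 starts over in its own temporary directory, not in "/".
    tmp_dir, cwd = run_like_bash_operator("pwd")
    print(os.path.realpath(cwd) == os.path.realpath(tmp_dir))  # True
```

The same reasoning would apply to `activate_env`: the environment changes made by `source .../activate` disappear when that task's shell exits.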
Answer:
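Your DAG lives in AIRFLOW_HOME, which I guess is your /tmp folder. When spark-submit is executed, Airflow looks for the Spark script in that same folder, so the command is equivalent to looking for spark_etl_1.py there. You may want to consider passing the absolute path of the Spark script to spark-submit instead of only the filename spark_etl_1.py, for example the full path to the file inside your project folder.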
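Following this suggestion, one way to stop depending on the task's working directory is to build the `spark-submit` command from absolute paths. A minimal sketch, assuming the script sits in a known project directory; `build_spark_submit_cmd` is a hypothetical helper, and the paths in the usage line are placeholders rather than the asker's confirmed layout.

```python
from pathlib import Path
import shlex


def build_spark_submit_cmd(
    spark_submit: str, script_dir: str, script_name: str, master: str = "local"
) -> str:
    """Return a spark-submit command line that uses an absolute script path."""
    directory = Path(script_dir).expanduser()
    # A relative directory would be resolved against whatever cwd the task
    # gets (a temp dir), which is exactly the problem, so reject it.
    if not directory.is_absolute():
        raise ValueError(f"script_dir must be absolute, got {script_dir!r}")
    parts = [spark_submit, "--master", master, str(directory / script_name)]
    # Quote each part so paths containing spaces survive `bash -c`.
    return " ".join(shlex.quote(p) for p in parts)


if __name__ == "__main__":
    cmd = build_spark_submit_cmd(
        "/path/to/spark_env/bin/spark-submit",  # hypothetical location
        "/path/to/project",                     # hypothetical script folder
        "spark_etl_1.py",
    )
    print(cmd)
    # → /path/to/spark_env/bin/spark-submit --master local /path/to/project/spark_etl_1.py
    # In the DAG, the string would be passed as
    # BashOperator(task_id="spark_task_etl", bash_command=cmd, dag=dag)
```

Rejecting relative directories is a deliberate choice: the command string is built when the DAG file is parsed, and the task later runs from a different directory, so only an absolute path means the same thing in both places.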