我试图登录到服务器100.18.10.182,并在Apache Airflow中的.182服务器上触发服务器100.18.10.36中的spark提交作业。我使用了BashOperator(一个shell脚本,用于ssh到 100.18.10.182服务器),对于spark提交作业,我使用了SparkSubmitOperator作为BashOperator的下游。我能够成功执行BashOperator,但SparkOperator失败:Cannot execute: Spark submit
我想这是因为我无法将我的SSH会话(.182服务器)传递到下一个SparkSubmitOperator,或者可能是由于与--jars或--packages相关的其他问题,这里不确定。
我想使用xcom_push将一些数据从我的BashOperator和xcom_pull推送到SparkSubmitOperator中,但不确定如何以我的服务器登录的方式传递它,然后我的SparkSubmitOperator从该框本身触发?
气流dag代码:
t2 = BashOperator(
task_id='test_bash_operator',
bash_command="/Users/hardikgoel/Downloads/Work/airflow_dir/shell_files/airflow_prod_ssh_script.sh ",
dag=dag)
t2
t3_config = {
'conf': {
"spark.yarn.maxAppAttempts": "1",
"spark.yarn.executor.memoryOverhead": "8"
},
'conn_id': 'spark_default',
'packages': 'com.sparkjobs.SparkJobsApplication',
'jars': '/var/spark/spark-jobs-0.0.1-SNAPSHOT-1/spark-jobs-0.0.1-SNAPSHOT.jar firstJob',
'driver_memory': '1g',
'total_executor_cores': '21',
'executor_cores': 7,
'executor_memory': '48g'
}
t3 = SparkSubmitOperator(
task_id='t3',
**t3_config)
t2 >> t3
字符串
Shell脚本代码:
#!/bin/bash
USERNAME=hardikgoel
HOSTS="100.18.10.182"
SCRIPT="pwd; ls"
ssh -l ${USERNAME} ${HOSTS} "${SCRIPT}"
echo "SSHed successfully"
if [ ${PIPESTATUS[0]} -eq 0 ]; then
echo "successfull"
fi
型
1条答案
按热度按时间5ktev3wc1#
在同一BashOperator中组合合并SSH和Spark提交命令:
字符串
也可以使用xcom:
型