pyspark: Apache Airflow, passing data from a BashOperator to a SparkSubmitOperator

vnjpjtjt · posted 2024-01-06 in Spark

I am trying to log in to server 100.18.10.182 and, from that .182 server, trigger a spark-submit job on server 100.18.10.36, all orchestrated from Apache Airflow. I used a BashOperator (a shell script that SSHes into the 100.18.10.182 server), and for the spark-submit job I used a SparkSubmitOperator downstream of the BashOperator. The BashOperator executes successfully, but the SparkSubmitOperator fails with: Cannot execute: Spark submit
I think this is because I cannot carry my SSH session (to the .182 server) over into the next SparkSubmitOperator, or it may be some other issue related to --jars or --packages; I am not sure.
I was thinking of using xcom_push to push some data from my BashOperator and xcom_pull to read it in the SparkSubmitOperator, but I am not sure how to pass it in such a way that the server is logged in and my SparkSubmitOperator is then triggered from that box itself.
Airflow DAG code:

    t2 = BashOperator(
        task_id='test_bash_operator',
        bash_command="/Users/hardikgoel/Downloads/Work/airflow_dir/shell_files/airflow_prod_ssh_script.sh ",
        dag=dag)

    t3_config = {
        'conf': {
            "spark.yarn.maxAppAttempts": "1",
            "spark.yarn.executor.memoryOverhead": "8"
        },
        'conn_id': 'spark_default',
        'packages': 'com.sparkjobs.SparkJobsApplication',
        'jars': '/var/spark/spark-jobs-0.0.1-SNAPSHOT-1/spark-jobs-0.0.1-SNAPSHOT.jar firstJob',
        'driver_memory': '1g',
        'total_executor_cores': '21',
        'executor_cores': 7,
        'executor_memory': '48g'
    }

    t3 = SparkSubmitOperator(
        task_id='t3',
        **t3_config)

    t2 >> t3

Shell script code:

    #!/bin/bash
    USERNAME=hardikgoel
    HOSTS="100.18.10.182"
    SCRIPT="pwd; ls"
    ssh -l ${USERNAME} ${HOSTS} "${SCRIPT}"
    STATUS=${PIPESTATUS[0]}   # capture ssh's exit status before any later command overwrites it
    echo "SSHed successfully"
    if [ ${STATUS} -eq 0 ]; then
        echo "successful"
    fi


Answer 1 (5ktev3wc):

Combine the SSH and spark-submit commands in the same BashOperator:

    t2 = BashOperator(
        task_id='ssh_and_spark_submit',
        bash_command="ssh -tt ${USERNAME}@${HOSTS} '/path/to/spark-submit --jars ${JARS} --packages ${PACKAGES} ${SPARK_SUBMIT_ARGS}'",
        dag=dag
    )
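The ${USERNAME}, ${HOSTS}, ${JARS}, ${PACKAGES} and ${SPARK_SUBMIT_ARGS} placeholders still have to be supplied from somewhere. A minimal sketch of one way to do that is to let Airflow render them through Jinja params; the main class and the trailing firstJob argument below are my reading of the question's t3_config, so treat them as assumptions:

    t2 = BashOperator(
        task_id='ssh_and_spark_submit',
        # bash_command is a templated field, so {{ params.* }} is rendered before the task runs
        bash_command=(
            "ssh -tt {{ params.username }}@{{ params.host }} "
            "'/path/to/spark-submit --class {{ params.main_class }} {{ params.jar }} firstJob'"
        ),
        params={
            'username': 'hardikgoel',
            'host': '100.18.10.182',
            'main_class': 'com.sparkjobs.SparkJobsApplication',  # assumed to be the job's main class
            'jar': '/var/spark/spark-jobs-0.0.1-SNAPSHOT-1/spark-jobs-0.0.1-SNAPSHOT.jar',
        },
        dag=dag
    )

With this approach spark-submit runs on the remote box over SSH, so the SparkSubmitOperator and its spark_default connection are no longer involved.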

Alternatively, use XCom:

    t2 = BashOperator(
        task_id='ssh_and_push_success',
        bash_command="ssh -tt ${USERNAME}@${HOSTS} 'pwd; ls' && echo 'success'",
        xcom_push=True,  # named do_xcom_push in Airflow 2.x
        dag=dag
    )

    t3 = SparkSubmitOperator(
        task_id='spark_submit_if_ssh_success',
        trigger_rule='one_success',
        provide_context=True,  # access the XCom value
        **t3_config
    )

    def trigger_spark_if_ssh_success(context):
        return context['ti'].xcom_pull(task_ids='ssh_and_push_success') == 'success'

    t3.set_upstream(t2)
    t3.set_downstream(TriggerDagRunOperator(task_id='trigger_downstream_dag',
                                            trigger_dag_id="downstream_dag_id"))
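The trigger_spark_if_ssh_success callback above is defined but never attached to anything. One way to actually gate the spark submit on the pushed XCom value is a ShortCircuitOperator between the two tasks; this is a minimal sketch of my own, not part of the original answer, and it relies on the fact that a BashOperator pushes the last line written to stdout as its return_value XCom:

    from airflow.operators.python import ShortCircuitOperator  # airflow.operators.python_operator in 1.x

    def _ssh_succeeded(**context):
        # 'success' is only pulled if the echo was the last line the bash command printed
        return context['ti'].xcom_pull(task_ids='ssh_and_push_success') == 'success'

    check_ssh = ShortCircuitOperator(
        task_id='check_ssh_result',
        python_callable=_ssh_succeeded,
        dag=dag
    )

    # t3 (and anything downstream of it) is skipped when the callable returns False
    t2 >> check_ssh >> t3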

