我有一份通过kubernetes吊舱运行的spark工作。到目前为止,我一直在使用yaml文件手动运行我的作业。现在,我想通过气流来安排我的Spark作业。这是我第一次使用气流,我不知道如何在气流中添加我的yaml文件。从我所读到的是,我可以安排我的工作,通过一个dag在气流。dag示例如下:
from airflow.operators import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
args = {'owner':'test', 'start_date' : datetime(2019, 4, 3), 'retries': 2, 'retry_delay': timedelta(minutes=1) }
dag = DAG('test_dag', default_args = args, catchup=False)
def print_text1():
print("hell-world1")
def print_text():
print('Hello-World2')
t1 = PythonOperator(task_id='multitask1', python_callable=print_text1, dag=dag)
t2 = PythonOperator(task_id='multitask2', python_callable=print_text, dag=dag)
t1 >> t2
在这种情况下,一旦我玩了dag,上面的方法就会一个接一个地执行。现在,如果我想运行spark提交作业,我应该怎么做?我正在使用spark 2.4.4
1条答案
按热度按时间yshpjwxd1#
气流有一个操作符的概念,它表示气流任务。在您的示例中使用了pythonoperator,它只执行python代码,很可能不是您感兴趣的代码,除非您在python代码中提交spark作业。您可以使用几个运算符:
bash操作符,它为您执行给定的bash脚本。你可以跑了
kubectl
或者spark-submit
直接使用sparksubmitoperator,要调用的特定运算符
spark-submit
kubernetespodoperator,为您创建kubernetes pod,您可以直接使用它启动驱动程序pod混合解决方案,例如httpoperator+livy on kubernetes,您在kubernetes上启动livy服务器,它充当spark作业服务器,并提供httpoperator调用的restapi
注意:对于每个操作符,您需要确保您的airflow环境包含执行所需的所有依赖项以及配置为访问所需服务的凭据。
您还可以参考现有线程:
airflow sparksubmitoperator-如何在另一个服务器中触发提交