地球上的人们,你们好!我正在使用Airflow来调度和运行Spark任务。到目前为止,我只找到了Airflow可以管理的python DAG。
DAG示例:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
问题是我不擅长Python代码,有些任务是用Java写的。我的问题是如何在python DAG中运行Spark Java jar?或者你还有别的办法?我发现spark提交:http://spark.apache.org/docs/latest/submitting-applications.html
但我不知道怎么把所有的东西联系起来。也许有人以前用过它,并有工作的例子。感谢您的宝贵时间!
4条答案
按热度按时间kxkpmulp1#
您应该可以使用
BashOperator
。保持代码的其余部分不变,导入所需的类和系统包:设置所需路径:
并添加运算符:
您可以很容易地扩展它,使用Jinja模板提供额外的参数。
当然,您可以在非Spark场景中调整此设置,将
bash_command
替换为适合您的模板,例如:调整
params
。13z8s7eq2#
Airflow 1.8版(今天发布)
SparkSQLHook代码-https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
SparkSubmitHook代码-https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
请注意,这两个新的Spark操作符/钩子在1.8版本的“contrib”分支中,因此没有(很好地)记录。
因此,您可以使用SparkSubmitOperator提交Java代码以供Spark执行。
t1qtbnec3#
在kubernetes(minikube示例)上,有一个在Spark 2.3.1中使用
SparkSubmitOperator
的例子:使用存储在Airflow变量中的变量的代码:
另外,您需要创建一个新的spark连接或使用额外的字典
{"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}
编辑现有的'spark_default':cclgggtu4#
在Airflow UI中进入
Admin
->Connection
->Create
。通过提供host = IP地址、port = 22和extra作为{"key_file": "/path/to/pem/file", "no_host_key_check":true}
来创建新的SSH连接这个主机应该是Spark集群主服务器,你可以从这里提交spark-jobs。接下来,您需要使用SSHOperator创建DAG。下面是这个的模板。
就是这样。您还可以在Airflow中获得此Spark作业的完整日志。