我是新的气流和Spark,我与斯巴克submitor斗争。
我们的气流调度器和hadoop集群不是在同一台机器上设置的(第一个问题:这是一个好的实践吗?)。
我们有许多自动过程需要调用pyspark脚本。这些pyspark脚本存储在hadoop集群(10.70.1.35)中。气流DAG储存在气流机(10.70.1.22)中。
目前,当我们希望spark提交一个带有airflow的pyspark脚本时,我们使用一个简单的bashoperator,如下所示:
cmd = "ssh hadoop@10.70.1.35 spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 2g \
--executor-cores 2 \
/home/hadoop/pyspark_script/script.py"
t = BashOperator(task_id='Spark_datamodel',bash_command=cmd,dag=dag)
它工作得很好。但是我们想开始使用sparksubmitoperator来提交pyspark脚本。
我试过这个:
from airflow import DAG
from datetime import timedelta, datetime
from airflow.contrib.operators.spark_submit_operator import
SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
dag = DAG('SPARK_SUBMIT_TEST',start_date=datetime(2018,12,10),
schedule_interval='@daily')
sleep = BashOperator(task_id='sleep', bash_command='sleep 10',dag=dag)
_config ={'application':'hadoop@10.70.1.35:/home/hadoop/pyspark_script/test_spark_submit.py',
'master' : 'yarn',
'deploy-mode' : 'cluster',
'executor_cores': 1,
'EXECUTORS_MEM': '2G'
}
spark_submit_operator = SparkSubmitOperator(
task_id='spark_submit_job',
dag=dag,
**_config)
sleep.set_downstream(spark_submit_operator)
语法应该是正确的,因为dag不会显示为断开。但当它运行时,它会给我以下错误:
[2018-12-14 03:26:42,600] {logging_mixin.py:95} INFO - [2018-12-14
03:26:42,600] {base_hook.py:83} INFO - Using connection to: yarn
[2018-12-14 03:26:42,974] {logging_mixin.py:95} INFO - [2018-12-14
03:26:42,973] {spark_submit_hook.py:283} INFO - Spark-Submit cmd:
['spark-submit', '--master', 'yarn', '--executor-cores', '1', '--name',
'airflow-spark', '--queue', 'root.default',
'hadoop@10.70.1.35:/home/hadoop/pyspark_script/test_spark_submit.py']
[2018-12-14 03:26:42,977] {models.py:1760} ERROR - [Errno 2] No such
file or directory: 'spark-submit'
Traceback (most recent call last):
File "/home/dataetl/anaconda3/lib/python3.6/site-
packages/airflow/models.py", line 1659, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/dataetl/anaconda3/lib/python3.6/site-
packages/airflow/contrib/operators/spark_submit_operator.py", line
168,
in execute
self._hook.submit(self._application)
File "/home/dataetl/anaconda3/lib/python3.6/site-
packages/airflow/contrib/hooks/spark_submit_hook.py", line 330, in
submit
**kwargs)
File "/home/dataetl/anaconda3/lib/python3.6/subprocess.py", line
707,
in __init__
restore_signals, start_new_session)
File "/home/dataetl/anaconda3/lib/python3.6/subprocess.py", line
1326, in _execute_child
raise child_exception_type(errno_num, err_msg)
FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit'
以下是我的问题:
我应该在我的气流机上安装spark hadoop吗?我这么问是因为在这篇文章里我读到了我需要复制的东西 hdfs-site.xml
以及 hive-site.xml
. 但你可以想象,我两者都没有 /etc/hadoop/
也不是 /etc/hive/
在我的气流机上。
a) 如果没有,我应该在哪里复制 hdfs-site.xml
以及 hive-site.xml
在我的通风机上?
b) 如果是,是否意味着我需要将气流机配置为客户端?一种不参与作业但可用于提交操作的边缘节点?
那么,我能 spark-submit
从我的通风机里?如果是的话,那么我就不需要像对mysql数据库那样在airflow上创建连接了,对吧?
哦,还有蛋糕上的樱桃:我能把我的Pypark脚本存储在我的气流机里吗 spark-submit
它们来自同一台气流机。太棒了!
任何评论都是非常有用的,即使你不能回答我所有的问题。。。
无论如何,还是要提前感谢!:)
1条答案
按热度按时间z0qdvdin1#
回答你的第一个问题,是的,这是一个很好的做法。
如何使用
SparkSubmitOperator
,请参考我的答案https://stackoverflow.com/a/53344713/5691525是的,你需要空气流动机器上的Spark二进制文件。
对
否->您仍然需要一个连接来告诉airflow您在哪里安装了spark二进制文件。类似https://stackoverflow.com/a/50541640/5691525
应该有用