java - How do I run Spark code in Airflow?

8yoxcaq7 · asked on 2023-06-28 · Java

Hello, people of Earth! I am using Airflow to schedule and run Spark tasks. So far, all I have found are Python DAGs that Airflow can manage.
DAG example:

spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , python_callable = run_spark
)

The problem is that I am not good with Python, and some of the tasks are written in Java. My question is: how do I run a Spark Java jar from a Python DAG? Or is there some other way to do it? I found spark-submit: http://spark.apache.org/docs/latest/submitting-applications.html
but I don't know how to tie everything together. Maybe someone has used it before and has a working example. Thank you for your time!

kxkpmulp 1#

You should be able to use BashOperator. Keep the rest of your code as it is, and import the required class and system packages:

from airflow.operators.bash_operator import BashOperator

import os
import sys

Set the required paths:

os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

and add the operator:

spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
    dag=dag
)

You can easily extend this to pass additional arguments using Jinja templates, as in the sketch below.
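For example, here is one possible extension of the operator above (the --master value and the trailing {{ ds }} execution-date argument are just illustrations, not part of the original answer's command):

spark_task = BashOperator(
    task_id='spark_java',
    bash_command=(
        'spark-submit --class {{ params.class }} '
        '--master {{ params.master }} '
        '{{ params.jar }} {{ ds }}'  # {{ ds }} passes the execution date as an application argument
    ),
    params={
        'class': 'MainClassName',
        'jar': '/path/to/your.jar',
        'master': 'yarn'  # illustrative; use whatever master your cluster needs
    },
    dag=dag
)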
You can, of course, adapt this for a non-Spark scenario by replacing bash_command with a template that suits your needs, for example:

bash_command = 'java -jar {{ params.jar }}'

and adjusting params accordingly.

13z8s7eq 2#

Airflow version 1.8 (released today) has:

SparkSQLHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

SparkSubmitHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
Note that these two new Spark operators/hooks are in the "contrib" branch as of version 1.8, so they are not (well) documented.
So you can use SparkSubmitOperator to submit your Java code for execution on Spark.
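A minimal sketch of what that looks like for a Java jar (the class name and jar path are placeholders, and a 'spark_default' connection pointing at your cluster is assumed):

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

submit_java_jar = SparkSubmitOperator(
    task_id='submit_java_jar',
    conn_id='spark_default',          # connection configured under Admin -> Connections
    application='/path/to/your.jar',  # placeholder path to the Java artifact
    java_class='MainClassName',       # placeholder fully qualified main class
    dag=dag
)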

t1qtbnec 3#

Here is an example of using SparkSubmitOperator with Spark 2.3.1 on Kubernetes (minikube instance):

"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta

default_args = {
    'owner': 'user@mail.com',
    'depends_on_past': False,
    'start_date': datetime(2018, 7, 27),
    'email': ['user@mail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    'end_date': datetime(2018, 7, 29),
}

dag = DAG(
    'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_default',
    java_class='com.ibm.cdopoc.DataLoaderDB2COS',
    application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
    total_executor_cores='1',
    executor_cores='1',
    executor_memory='2g',
    num_executors='2',
    name='airflowspark-DataLoaderDB2COS',
    verbose=True,
    driver_memory='1g',
    conf={
        'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
        'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
        'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
        'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
        'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
        'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
        'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
        'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
        'spark.COS_BUCKET': 'data-ingestion-poc',
        'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
        'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
        'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
        },
    dag=dag,
)

t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)

The code uses values stored in Airflow Variables (the Variable.get calls above).
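Those variables have to exist in Airflow before the DAG runs; they can be created through the UI (Admin -> Variables) or, as a sketch with placeholder values, from Python:

from airflow.models import Variable

# Placeholder values; store the real credentials in your own environment.
Variable.set("CEDP_DB2_WoC_User", "db2-username")
Variable.set("CEDP_DB2_WoC_Password", "db2-password")
Variable.set("COS_API_KEY", "cos-api-key")
Variable.set("COS_SERVICE_ID", "cos-service-id")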

Also, you need to create a new Spark connection, or edit the existing 'spark_default' connection, adding the extra dictionary {"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}.
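If you prefer to set the connection up programmatically rather than through the UI, one possible sketch (the host value is a placeholder for your own Kubernetes master endpoint) is:

from airflow import settings
from airflow.models import Connection

spark_conn = Connection(
    conn_id='spark_default',
    conn_type='spark',
    host='k8s://https://<kubernetes-master>:443',  # placeholder cluster endpoint
    extra='{"queue": "root.default", "deploy-mode": "cluster", '
          '"spark-home": "", "spark-binary": "spark-submit", "namespace": "default"}'
)

# Persist the connection in the Airflow metadata database.
session = settings.Session()
session.add(spark_conn)
session.commit()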

cclgggtu 4#

In the Airflow UI, go to Admin -> Connections -> Create. Create a new SSH connection by providing host = the IP address of the machine, port = 22, and extra = {"key_file": "/path/to/pem/file", "no_host_key_check": true}.
This host should be the Spark cluster master, from which you can submit spark jobs. Next, you need to create a DAG with an SSHOperator. Below is a template for this.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

with DAG(dag_id='ssh-dag-id',
         start_date=datetime(2023, 6, 28),  # choose an appropriate start date
         schedule_interval="05 12 * * *",
         catchup=False) as dag:

    # spark-submit command that will be executed on the remote Spark master
    spark_job = ("spark-submit --class fully.qualified.class.name "
                 "--master yarn "
                 "--deploy-mode client "
                 "--driver-memory 6G "
                 "--executor-memory 6G "
                 "--num-executors 6 "
                 "/path/to/your-spark.jar")

    ssh_run_query = SSHOperator(
        task_id="random_task_id",
        ssh_conn_id="name_of_connection_you_just_created",
        command=spark_job,
        get_pty=True)

That's it. You also get the complete logs of this Spark job in Airflow.
