我有气流作业,在emr集群上运行良好。我需要的是,假设我有4个气流作业需要一个emr集群,比如说20分钟来完成任务。为什么我们不能在dag运行时创建emr集群,一旦作业完成,它将终止创建的emr集群。
blpfk2vs1#
当然,这将是最有效地利用资源。让我警告你:这里面有很多细节;我尽量列出能让你走的路。我鼓励您添加自己的综合答案,列出您遇到的任何问题和解决方法(一旦您完成此操作)关于集群创建/终止对于集群的创建和终止,您必须 EmrCreateJobFlowOperator 以及 EmrTerminateJobFlowOperator 分别地如果你不使用AWS SecretAccessKey (完全依靠 IAM 角色);示例化任何 AWS -相关 hook 或者 operator 在 Airflow 会自动回落到底层 EC2 已附加 IAM 角色如果您没有使用emr steps api提交作业,那么您还必须使用 Sensors . 已经有一个传感器用于轮询创建阶段,名为 EmrJobFlowSensor 你也可以稍微修改它来创建一个终端传感器将集群配置json传入 job_flow_extra . 也可以在 Connection 的(像 my_emr_conn ) extra 但不要这样做,因为它经常破裂 SQLAlchemy orm加载(因为它是一个大的 json )关于作业提交你要么将作业提交给 Emr 使用emr steps api,可以在集群创建阶段(在集群配置json中)或之后使用 add_job_flow_steps() . 甚至还有一个 emr_add_steps_operator() 在 Airflow 这也需要一个 EmrStepSensor . 你可以阅读更多关于它的文章 AWS 文档,您可能还必须使用 command-runner.jar 对于应用程序特定的情况(如 Hive , Livy ),你可以用他们具体的方式。例如,您可以使用 HiveServer2Hook 提交 Hive 工作。这里有一个棘手的部分:问题 run_job_flow() 调用(在集群创建阶段进行)只提供 job_flow_id (群集id)。你得用一个 describe_cluster() 呼叫使用 EmrHook 获取主节点的私有ip。使用它,您将能够以编程方式创建 Connection (例如 Hive Server 2 Thrift 连接),并使用它将计算提交到集群。在完成工作流程之前,不要忘记删除这些连接(为了美观)。最后还有一个与集群交互的好方法。为此,你还应该通过 EC2 集群创建阶段的密钥对。之后,您可以通过编程方式创建 SSH 连接并使用它(与 SSHHook 或者 SSHOperator )用于在群集上运行作业。阅读更多关于ssh的内容 Airflow 在这里特别是提交 Spark 作业到远程 Emr cluster,读这个讨论
EmrCreateJobFlowOperator
EmrTerminateJobFlowOperator
SecretAccessKey
IAM
AWS
hook
operator
Airflow
EC2
Sensors
EmrJobFlowSensor
job_flow_extra
Connection
my_emr_conn
extra
SQLAlchemy
json
Emr
add_job_flow_steps()
emr_add_steps_operator()
EmrStepSensor
command-runner.jar
Hive
Livy
HiveServer2Hook
run_job_flow()
job_flow_id
describe_cluster()
EmrHook
Hive Server 2 Thrift
SSH
SSHHook
SSHOperator
Spark
u0njafvf2#
检查我的实现,dag将创建emr集群,并对s3中的数据运行spark作业,完成后自动终止。https://beyondexperiment.com/vijayravichandran06/aws-emr-orchestrate-with-airflow/
eqfvzcg83#
最好的方法可能是在气流dag的根节点上有一个节点创建emr簇,然后在dag的最末端有另一个节点在所有其他节点完成后向下旋转簇。
3条答案
按热度按时间blpfk2vs1#
当然,这将是最有效地利用资源。让我警告你:这里面有很多细节;我尽量列出能让你走的路。我鼓励您添加自己的综合答案,列出您遇到的任何问题和解决方法(一旦您完成此操作)
关于集群创建/终止
对于集群的创建和终止,您必须
EmrCreateJobFlowOperator
以及EmrTerminateJobFlowOperator
分别地如果你不使用
AWS
SecretAccessKey
(完全依靠IAM
角色);示例化任何AWS
-相关hook
或者operator
在Airflow
会自动回落到底层EC2
已附加IAM
角色如果您没有使用emr steps api提交作业,那么您还必须使用
Sensors
. 已经有一个传感器用于轮询创建阶段,名为EmrJobFlowSensor
你也可以稍微修改它来创建一个终端传感器将集群配置json传入
job_flow_extra
. 也可以在Connection
的(像my_emr_conn
)extra
但不要这样做,因为它经常破裂SQLAlchemy
orm加载(因为它是一个大的json
)关于作业提交
你要么将作业提交给
Emr
使用emr steps api,可以在集群创建阶段(在集群配置json中)或之后使用add_job_flow_steps()
. 甚至还有一个emr_add_steps_operator()
在Airflow
这也需要一个EmrStepSensor
. 你可以阅读更多关于它的文章AWS
文档,您可能还必须使用command-runner.jar
对于应用程序特定的情况(如Hive
,Livy
),你可以用他们具体的方式。例如,您可以使用HiveServer2Hook
提交Hive
工作。这里有一个棘手的部分:问题run_job_flow()
调用(在集群创建阶段进行)只提供job_flow_id
(群集id)。你得用一个describe_cluster()
呼叫使用EmrHook
获取主节点的私有ip。使用它,您将能够以编程方式创建Connection
(例如Hive Server 2 Thrift
连接),并使用它将计算提交到集群。在完成工作流程之前,不要忘记删除这些连接(为了美观)。最后还有一个与集群交互的好方法。为此,你还应该通过
EC2
集群创建阶段的密钥对。之后,您可以通过编程方式创建SSH
连接并使用它(与SSHHook
或者SSHOperator
)用于在群集上运行作业。阅读更多关于ssh的内容Airflow
在这里特别是提交
Spark
作业到远程Emr
cluster,读这个讨论u0njafvf2#
检查我的实现,dag将创建emr集群,并对s3中的数据运行spark作业,完成后自动终止。
https://beyondexperiment.com/vijayravichandran06/aws-emr-orchestrate-with-airflow/
eqfvzcg83#
最好的方法可能是在气流dag的根节点上有一个节点创建emr簇,然后在dag的最末端有另一个节点在所有其他节点完成后向下旋转簇。