使用dag运行创建emr集群,一旦任务完成,emr将被终止

eni9jsuy  于 2021-05-27  发布在  Hadoop
关注(0)|答案(3)|浏览(485)

我有气流作业,在emr集群上运行良好。我需要的是,假设我有4个气流作业需要一个emr集群,比如说20分钟来完成任务。为什么我们不能在dag运行时创建emr集群,一旦作业完成,它将终止创建的emr集群。

blpfk2vs

blpfk2vs1#

当然,这将是最有效地利用资源。让我警告你:这里面有很多细节;我尽量列出能让你走的路。我鼓励您添加自己的综合答案,列出您遇到的任何问题和解决方法(一旦您完成此操作)
关于集群创建/终止
对于集群的创建和终止,您必须 EmrCreateJobFlowOperator 以及 EmrTerminateJobFlowOperator 分别地
如果你不使用
AWS SecretAccessKey (完全依靠 IAM 角色);示例化任何 AWS -相关 hook 或者 operatorAirflow 会自动回落到底层 EC2 已附加 IAM 角色
如果您没有使用emr steps api提交作业,那么您还必须使用 Sensors . 已经有一个传感器用于轮询创建阶段,名为 EmrJobFlowSensor 你也可以稍微修改它来创建一个终端传感器
将集群配置json传入 job_flow_extra . 也可以在 Connection 的(像 my_emr_conn ) extra 但不要这样做,因为它经常破裂 SQLAlchemy orm加载(因为它是一个大的 json )
关于作业提交
你要么将作业提交给 Emr 使用emr steps api,可以在集群创建阶段(在集群配置json中)或之后使用 add_job_flow_steps() . 甚至还有一个 emr_add_steps_operator()Airflow 这也需要一个 EmrStepSensor . 你可以阅读更多关于它的文章 AWS 文档,您可能还必须使用 command-runner.jar 对于应用程序特定的情况(如 Hive , Livy ),你可以用他们具体的方式。例如,您可以使用 HiveServer2Hook 提交 Hive 工作。这里有一个棘手的部分:问题 run_job_flow() 调用(在集群创建阶段进行)只提供 job_flow_id (群集id)。你得用一个 describe_cluster() 呼叫使用 EmrHook 获取主节点的私有ip。使用它,您将能够以编程方式创建 Connection (例如 Hive Server 2 Thrift 连接),并使用它将计算提交到集群。在完成工作流程之前,不要忘记删除这些连接(为了美观)。
最后还有一个与集群交互的好方法。为此,你还应该通过 EC2 集群创建阶段的密钥对。之后,您可以通过编程方式创建 SSH 连接并使用它(与 SSHHook 或者 SSHOperator )用于在群集上运行作业。阅读更多关于ssh的内容 Airflow 在这里
特别是提交 Spark 作业到远程 Emr cluster,读这个讨论

u0njafvf

u0njafvf2#

检查我的实现,dag将创建emr集群,并对s3中的数据运行spark作业,完成后自动终止。
https://beyondexperiment.com/vijayravichandran06/aws-emr-orchestrate-with-airflow/

eqfvzcg8

eqfvzcg83#

最好的方法可能是在气流dag的根节点上有一个节点创建emr簇,然后在dag的最末端有另一个节点在所有其他节点完成后向下旋转簇。

相关问题