我在airflow(Cloud Composer)中使用DataFlowJavaOperator(),有没有办法在下一个PythonOperator任务中获取已执行的数据流作业的ID?我想使用job_id调用gcloud
命令获取作业结果。
def check_dataflow(ds, **kwargs)
# here I want to execute gloud command with the job ID to get job result.
# gcloud dataflow jobs describe <JOB_ID>
t1 = DataFlowJavaOperator(
task_id='task1'
jar='gs://path/to/jar/abc.jar',
options={
'stagingLocation': "gs://stgLocation/",
'tempLocation': "gs://tmpLocation/",
},
provide_context=True
dag=dag,
)
t2 = PythonOperator(
task_id='task2',
python_callable=check_dataflow,
provide_context=True
dag=dag,
)
t1 >> t2
2条答案
按热度按时间5w9g7ksd1#
我也遇到了同样的问题,我终于找到了一个合适的解决方案。首先,DataFlowJavaOperator现在不推荐使用,应该替换为BeamRunJavaPipelineOperator。
使用BeamRunJavaPipelineOperator需要安装
apache-airflow-providers-apache-beam
python包。我的Airflow DAG并没有真正改变,您只需要将其迁移到BeamRunJavaPipelineOperator即可。
现在是最重要的变化。你写的最后一行stdout需要遵循这个正则表达式模式:
Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]
在这里你可以找到源代码:https://github.com/apache/airflow/blob/553fb1f2ed0dfd177b11e3cc5232b964360fac8b/airflow/providers/google/cloud/hooks/dataflow.py#L57
我的Java解决方案看起来像这样:
jgwigjjp2#
如图所示,
DataFlowJavaOperator
中的job_name
选项被task_id
覆盖。作业名称将以任务作为前缀,并附加一个随机ID后缀。如果您仍然希望Dataflow作业名称实际上与任务ID不同,则可以在Dataflow Java代码中添加硬编码:然后,使用
PythonOperator
,您可以从前缀(代码中提供的作业名称或Composer任务ID)中检索作业ID,如我所解释的here。简单地说,使用以下命令列出作业:然后按前缀过滤,其中
job_prefix
是启动作业时定义的job_name
:break语句的作用是确保我们只获取具有该名称的最新作业,也就是刚刚启动的作业。