python - How do I get the job ID or result of Airflow's DataFlowJavaOperator()?

webghufk · posted 2023-04-19 in Python

I'm using DataFlowJavaOperator() in Airflow (Cloud Composer). Is there a way to get the ID of the executed Dataflow job in the next PythonOperator task? I'd like to use the job_id to run a gcloud command that fetches the job result.

def check_dataflow(ds, **kwargs):
    # Here I want to execute a gcloud command with the job ID to get the job result:
    # gcloud dataflow jobs describe <JOB_ID>

t1 = DataFlowJavaOperator(
    task_id='task1',
    jar='gs://path/to/jar/abc.jar',
    options={
        'stagingLocation': "gs://stgLocation/",
        'tempLocation': "gs://tmpLocation/",
    },
    provide_context=True,
    dag=dag,
)

t2 = PythonOperator(
    task_id='task2',
    python_callable=check_dataflow,
    provide_context=True,
    dag=dag,
)

t1 >> t2

5w9g7ksd · #1

I ran into the same problem and finally found a proper solution. First of all, DataFlowJavaOperator is deprecated nowadays and should be replaced with BeamRunJavaPipelineOperator.
Using BeamRunJavaPipelineOperator requires the apache-airflow-providers-apache-beam Python package to be installed.
My Airflow DAG didn't really change; you only need to migrate it to BeamRunJavaPipelineOperator.
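
For reference, the provider package mentioned above can be installed with pip (on Cloud Composer you would add it through the environment's PyPI packages page instead):

pip install apache-airflow-providers-apache-beam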

from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

start_dataflow = BeamRunJavaPipelineOperator(
    task_id="start_dataflow",
    jar="gs://path/to/jar/abc.jar",
    pipeline_options={
        "gcpTempLocation": "gs://tmpLocation/"
    },
    job_class="",  # fully qualified main class of your pipeline
    runner=BeamRunnerType.DataflowRunner,
    dataflow_config={
        "job_name": "my-dataflow-job"
    },
    dag=dag
)

bash_dataflow_job_id = BashOperator(
    task_id="bash_dataflow_job_id",
    bash_command="echo {{task_instance.xcom_pull('start_dataflow')['dataflow_job_id']}}",
    dag=dag
)
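
Since the original question asked for a PythonOperator, the same XCom can be consumed there too. Here is a minimal sketch (task and function names are illustrative), assuming the pushed XCom carries the dataflow_job_id key as in the BashOperator above, and using subprocess to run the gcloud command from the question:

import subprocess

def check_dataflow(ds, **kwargs):
    # Pull the XCom pushed by the Beam operator; per the BashOperator above
    # it carries the Dataflow job ID under the 'dataflow_job_id' key.
    job_id = kwargs['task_instance'].xcom_pull('start_dataflow')['dataflow_job_id']
    # Programmatic call of: gcloud dataflow jobs describe <JOB_ID>
    output = subprocess.check_output(
        ['gcloud', 'dataflow', 'jobs', 'describe', job_id]
    )
    print(output.decode())

check_job = PythonOperator(
    task_id='check_dataflow',
    python_callable=check_dataflow,
    provide_context=True,
    dag=dag,
)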

Now for the most important change. The last line your pipeline writes to stdout needs to follow this regex pattern: Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]
You can find the source code here: https://github.com/apache/airflow/blob/553fb1f2ed0dfd177b11e3cc5232b964360fac8b/airflow/providers/google/cloud/hooks/dataflow.py#L57
My Java solution looks like this:

PipelineResult pipelineResult = pipeline.run(pipelineOptions);

String jobId;
Class<? extends PipelineRunner<?>> runner = pipelineOptions.getRunner();
// Only a Dataflow run yields a DataflowPipelineJob we can read the ID from.
if (DataflowRunner.class.isAssignableFrom(runner)) {
    jobId = ((DataflowPipelineJob) pipelineResult).getJobId();
} else {
    jobId = pipelineOptions.getJobName();
}

// Needs to be the last console output for Airflow to receive the Dataflow Job ID
System.out.println("Submitted job: " + jobId);
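
For completeness, the regex linked above also has a Python branch (job_id_python), so a Python pipeline would print the other alternative instead. A minimal sketch, assuming the pipeline runs on the Dataflow runner, whose result object exposes job_id():

result = pipeline.run()
# Must be the last stdout line; it matches "Created job with id: [...]"
print("Created job with id: [{}]".format(result.job_id()))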

jgwigjjp · #2

As it turns out, the job_name option of DataFlowJavaOperator is overridden by the task_id: the job name gets the task as a prefix and a random ID as a suffix. If you want the Dataflow job name to actually differ from the task ID, you can hard-code it in your Dataflow Java code:

options.setJobName("jobNameInCode");

Then, from a PythonOperator, you can retrieve the job ID from that prefix (the job name set in code, or the Composer task ID), as I explained here. In short, list the jobs with:

from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3')  # Dataflow REST API client
result = dataflow.projects().locations().jobs().list(
    projectId=project,
    location=location,
).execute()

and then filter by prefix, where job_prefix is the job_name that was defined when launching the job:

import re

for job in result['jobs']:
    if re.search(re.escape(job_prefix), job['name']):
        job_id = job['id']
        break

The point of the break statement is to make sure we only pick up the most recent job with that name, i.e. the one that was just launched.
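
From there, the programmatic equivalent of the gcloud dataflow jobs describe call from the question can use the same client. A minimal sketch, reusing the dataflow client, project, and location from above (currentState holds the job state):

# Equivalent of: gcloud dataflow jobs describe <JOB_ID>
job = dataflow.projects().locations().jobs().get(
    projectId=project,
    location=location,
    jobId=job_id,
).execute()
print(job['currentState'])  # e.g. JOB_STATE_DONE or JOB_STATE_FAILED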
