从数据流管道选项获取独立参数字符串

n9vozmp4  于 2021-08-25  发布在  Java
关注(0)|答案(1)|浏览(272)

我正在尝试从airflow触发一个google数据流作业,需要帮助从airflow发送一个字符串作为参数,该字符串可以在dataflow中读取并用作独立字符串。
这是我的密码 DataflowTemplateOperator 它将发送名为 secretCode :

DataflowTemplateOperator(
            task_id=TASK_ID,
            job_name=JOB_NAME,
            template=TEMPLATE_PATH,
            parameters={
                "secretCode": "123456"
            },
            dag=dag
        )

我想看报纸 secretCode 从…起 PipelineOptions 作为 String 发送至以下地址: ParDo 但我不知道怎么做。该代码与的输入和输出无关 ParDo 班级。我只想把代码写到bigquery。

val dataToTableRow: PCollection<TableRow> = myCustomDataStructure.apply(
        "transform my data to table row",
        ParDo.of(DataToTableRow())
    )

我想写下从银行返回的密码 PipelineOptions 如我在下面代码中所示,我不知道如何到达bigquery:

class DataToTableRow : DoFn<myCustomDataStructure, TableRow>() { 
      @ProcessElement
      fun processElement(@Element myData: myCustomDataStructure, outputReceiver: OutputReceiver<TableRow>) {
          outputReceiver.output(getTableRow(myData))
      }

      private fun getTableRow(myData: myCustomDataStructure): TableRow {
          return TableRow().set("ID", myData.id)
                           .set("SecretCode", secretCode)
      }
   }

对于如何解决这个问题,我将不胜感激。提前谢谢。

ndh0cuux

ndh0cuux1#

您需要创建自己的接口来扩展pipelineoptions,并在此处设置参数。

public interface SecretOptions extends PipelineOptions {
    String getSecretCode();
    void setSecretCode(String secretCode);
  }

然后,在管道上注册接口,如下所示:

PipelineOptionsFactory.register(SecretOptions.class);
SecretOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SecretOptions.class);

然后,您就可以使用 options.getSecretCode(); 有关文档的更多信息

相关问题