从数据流管道选项获取独立参数字符串

n9vozmp4  于 2021-08-25  发布在  Java
关注(0)|答案(1)|浏览(348)

我正在尝试从airflow触发一个google数据流作业,需要帮助从airflow发送一个字符串作为参数,该字符串可以在dataflow中读取并用作独立字符串。
这是我的密码 DataflowTemplateOperator 它将发送名为 secretCode :

  1. DataflowTemplateOperator(
  2. task_id=TASK_ID,
  3. job_name=JOB_NAME,
  4. template=TEMPLATE_PATH,
  5. parameters={
  6. "secretCode": "123456"
  7. },
  8. dag=dag
  9. )

我想看报纸 secretCode 从…起 PipelineOptions 作为 String 发送至以下地址: ParDo 但我不知道怎么做。该代码与的输入和输出无关 ParDo 班级。我只想把代码写到bigquery。

  1. val dataToTableRow: PCollection<TableRow> = myCustomDataStructure.apply(
  2. "transform my data to table row",
  3. ParDo.of(DataToTableRow())
  4. )

我想写下从银行返回的密码 PipelineOptions 如我在下面代码中所示,我不知道如何到达bigquery:

  1. class DataToTableRow : DoFn<myCustomDataStructure, TableRow>() {
  2. @ProcessElement
  3. fun processElement(@Element myData: myCustomDataStructure, outputReceiver: OutputReceiver<TableRow>) {
  4. outputReceiver.output(getTableRow(myData))
  5. }
  6. private fun getTableRow(myData: myCustomDataStructure): TableRow {
  7. return TableRow().set("ID", myData.id)
  8. .set("SecretCode", secretCode)
  9. }
  10. }

对于如何解决这个问题,我将不胜感激。提前谢谢。

ndh0cuux

ndh0cuux1#

您需要创建自己的接口来扩展pipelineoptions,并在此处设置参数。

  1. public interface SecretOptions extends PipelineOptions {
  2. String getSecretCode();
  3. void setSecretCode(String secretCode);
  4. }

然后,在管道上注册接口,如下所示:

  1. PipelineOptionsFactory.register(SecretOptions.class);
  2. SecretOptions options = PipelineOptionsFactory.fromArgs(args)
  3. .withValidation()
  4. .as(SecretOptions.class);

然后,您就可以使用 options.getSecretCode(); 有关文档的更多信息

相关问题