我正在尝试从airflow触发一个google数据流作业,需要帮助从airflow发送一个字符串作为参数,该字符串可以在dataflow中读取并用作独立字符串。
这是我的密码 DataflowTemplateOperator
它将发送名为 secretCode
:
DataflowTemplateOperator(
task_id=TASK_ID,
job_name=JOB_NAME,
template=TEMPLATE_PATH,
parameters={
"secretCode": "123456"
},
dag=dag
)
我想看报纸 secretCode
从…起 PipelineOptions
作为 String
发送至以下地址: ParDo
但我不知道怎么做。该代码与的输入和输出无关 ParDo
班级。我只想把代码写到bigquery。
val dataToTableRow: PCollection<TableRow> = myCustomDataStructure.apply(
"transform my data to table row",
ParDo.of(DataToTableRow())
)
我想写下从银行返回的密码 PipelineOptions
如我在下面代码中所示,我不知道如何到达bigquery:
class DataToTableRow : DoFn<myCustomDataStructure, TableRow>() {
@ProcessElement
fun processElement(@Element myData: myCustomDataStructure, outputReceiver: OutputReceiver<TableRow>) {
outputReceiver.output(getTableRow(myData))
}
private fun getTableRow(myData: myCustomDataStructure): TableRow {
return TableRow().set("ID", myData.id)
.set("SecretCode", secretCode)
}
}
对于如何解决这个问题,我将不胜感激。提前谢谢。
1条答案
按热度按时间ndh0cuux1#
您需要创建自己的接口来扩展pipelineoptions,并在此处设置参数。
然后,在管道上注册接口,如下所示:
然后,您就可以使用
options.getSecretCode();
有关文档的更多信息