apache beam-跳过管道步骤

9wbgstp7  于 2021-06-30  发布在  Java
关注(0)|答案(1)|浏览(347)

我使用apache beam建立了一个管道,包括两个主要步骤:
使用光束变换变换数据
将转换后的数据加载到bigquery
管道设置如下所示:

myPCollection = (org.apache.beam.sdk.values.PCollection<myCollectionObjectType>)myInputPCollection
                .apply("do a parallel transform"),
                     ParDo.of(new MyTransformClassName.MyTransformFn()));

 myPCollection
    .apply("Load BigQuery data for PCollection",
            BigQueryIO.<myCollectionObjectType>write()
            .to(new MyDataLoadClass.MyFactTableDestination(myDestination))
            .withFormatFunction(new MyDataLoadClass.MySerializationFn())

我看过这个问题:
apachebeam:跳过已构建管道中的步骤
这意味着,在步骤1中的并行转换之后,我可能能够以某种方式动态地更改可以将数据传递给哪个输出。
我该怎么做?我不知道如何选择是否通过 myPCollection 从步骤1到步骤2。我需要跳过第二步,如果对象在 myPCollection 从第一步开始 null .

ovfsdjhp

ovfsdjhp1#

你只是不从你的眼睛发射元素 MyTransformClassName.MyTransformFn 当您不想在下一步中使用它时,例如:

class MyTransformClassName.MyTransformFn extends...
  @ProcessElement
  public void processElement(ProcessContext c, ...) {
    ...
    result = ...
    if (result != null) {
       c.output(result);   //only output something that's not null
    }
  }

这样空值就不会到达下一步。
看到了吗 ParDo 有关更多详细信息,请参阅指南的第节:https://beam.apache.org/documentation/programming-guide/#pardo

相关问题