org.apache.beam.sdk.transforms.Flatten类的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(12.2k)|赞(0)|评价(0)|浏览(225)

本文整理了Java中org.apache.beam.sdk.transforms.Flatten类的一些代码示例,展示了Flatten类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flatten类的具体详情如下:
包路径:org.apache.beam.sdk.transforms.Flatten
类名称:Flatten

Flatten介绍

[英]Flatten takes multiple PCollections bundled into a PCollectionList and returns a single PCollection containing all the elements in all the input PCollections. The name "Flatten" suggests taking a list of lists and flattening them into a single list.

Example of use:

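PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = pcs.apply(Flatten.<String>pCollections());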

By default, the Coder of the output PCollection is the same as the Coder of the first PCollection in the input PCollectionList (if the PCollectionList is non-empty).
[中]Flatten接收捆绑在一个PCollectionList中的多个PCollection,并返回一个包含所有输入PCollection中全部元素的PCollection。“Flatten”(展平)这个名称的含义是:把一个由多个列表组成的列表展平为单个列表。
使用示例:

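PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = pcs.apply(Flatten.<String>pCollections());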

默认情况下,输出PCollection的编码器与输入PCollectionList中第一个PCollection的编码器相同(如果PCollectionList为非空)。

代码示例

代码示例来源:origin: gojektech/feast

/**
 * Applies each configured write transform to its tagged collection and returns every row,
 * written or not, as a single collection.
 *
 * @param tuple holds one collection per tag in {@code transforms} plus the {@code mainTag} one
 * @return the flattened union of all tagged collections and the {@code mainTag} collection
 */
@Override
 public PCollection<FeatureRowExtended> expand(PCollectionTuple tuple) {
  List<PCollection<FeatureRowExtended>> collected = Lists.newArrayList();
  for (TupleTag<FeatureRowExtended> tag : transforms.keySet()) {
   Write sink = transforms.get(tag);
   String tagId = tag.getId();
   Preconditions.checkNotNull(sink, String.format("Null transform for tag=%s", tagId));
   PCollection<FeatureRowExtended> tagged = tuple.get(tag);
   // The result of the write is not needed; the rows themselves are passed on unchanged.
   tagged.apply(String.format("Write to %s", tagId), sink);
   collected.add(tagged);
  }
  // Rows without a matching write transform arrive under `mainTag`. They are treated as
  // discardable: they are included in the returned output so they count as written, but no
  // store write is applied to them.
  collected.add(tuple.get(mainTag));
  PCollectionList<FeatureRowExtended> everything = PCollectionList.of(collected);
  return everything.apply("Flatten main", Flatten.pCollections());
 }
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/** Checks the default names reported by both Flatten variants. */
@Test
public void testFlattenGetName() {
 String iterablesName = Flatten.<String>iterables().getName();
 Assert.assertEquals("Flatten.Iterables", iterablesName);

 String pCollectionsName = Flatten.<String>pCollections().getName();
 Assert.assertEquals("Flatten.PCollections", pCollectionsName);
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * Combines the input globally with a {@code SampleAnyCombineFn} built from {@code limit}, then
 * unpacks the combined iterable back into individual elements.
 *
 * @param in the collection to sample from
 * @return the elements of the combined iterable as a flat collection
 */
@Override
public PCollection<T> expand(PCollection<T> in) {
 // withoutDefaults(): no default value is produced by the global combine.
 PCollection<Iterable<T>> sampled =
   in.apply(Combine.globally(new SampleAnyCombineFn<T>(limit)).withoutDefaults());
 return sampled.apply(Flatten.iterables());
}

代码示例来源:origin: org.apache.beam/beam-runners-core-construction-java

/** The empty-flatten matcher must reject an application whose transform is Flatten.Iterables. */
@Test
public void emptyFlattenWithNonFlatten() {
 PCollection<Integer> primitiveOutput =
   PCollection.createPrimitiveOutputInternal(
     p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
 AppliedPTransform application =
   AppliedPTransform
     .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
       "EmptyFlatten",
       Collections.emptyMap(),
       Collections.singletonMap(new TupleTag<Integer>(), primitiveOutput),
       /* This isn't actually possible to construct, but for the sake of example */
       Flatten.iterables(),
       p);

 boolean matched = PTransformMatchers.emptyFlatten().matches(application);
 assertThat(matched, is(false));
}

代码示例来源:origin: org.apache.beam/beam-runners-core-construction-java

/**
 * Flattens the input list so that each distinct PCollection appears only once in the flatten.
 * A PCollection listed several times is replaced by the output of a {@code DuplicateFn} ParDo
 * constructed with its occurrence count.
 *
 * @param input the list of collections, possibly containing the same collection repeatedly
 * @return the flattened result
 */
@Override
 public PCollection<T> expand(PCollectionList<T> input) {
  // Count how many times each PCollection occurs in the input list.
  Map<PCollection<T>, Integer> occurrences = new HashMap<>();
  for (PCollection<T> candidate : input.getAll()) {
   Integer seen = occurrences.get(candidate);
   occurrences.put(candidate, seen == null ? 1 : seen + 1);
  }

  PCollectionList<T> distinctInputs = PCollectionList.empty(input.getPipeline());
  for (Map.Entry<PCollection<T>, Integer> entry : occurrences.entrySet()) {
   PCollection<T> source = entry.getKey();
   int count = entry.getValue();
   if (count == 1) {
    // Listed once: pass through untouched.
    distinctInputs = distinctInputs.and(source);
    continue;
   }
   String duplicationName = String.format("Multiply %s", source.getName());
   PCollection<T> duplicated =
     source.apply(duplicationName, ParDo.of(new DuplicateFn<>(count)));
   distinctInputs = distinctInputs.and(duplicated);
  }
  return distinctInputs.apply(Flatten.pCollections());
 }
}

代码示例来源:origin: org.apache.beam/beam-runners-core-construction-java

/**
 * The duplicate-inputs flatten matcher must reject an application whose transform is
 * Flatten.Iterables.
 */
@Test
public void flattenWithDuplicateInputsNonFlatten() {
 PCollection<Integer> primitiveOutput =
   PCollection.createPrimitiveOutputInternal(
     p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
 AppliedPTransform application =
   AppliedPTransform
     .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
       "EmptyFlatten",
       Collections.emptyMap(),
       Collections.singletonMap(new TupleTag<Integer>(), primitiveOutput),
       /* This isn't actually possible to construct, but for the sake of example */
       Flatten.iterables(),
       p);

 boolean matched = PTransformMatchers.flattenWithDuplicateInputs().matches(application);
 assertThat(matched, is(false));
}

代码示例来源:origin: org.apache.beam/beam-runners-flink_2.10-examples

/**
 * Reads every document in {@code uris} and returns a single collection of (URI, text) pairs.
 *
 * @param input the pipeline entry point; only its {@link Pipeline} is used
 * @return the flattened union of the per-document collections, each element keyed by the URI
 *     it was read from
 */
@Override
 public PCollection<KV<URI, String>> expand(PBegin input) {
  Pipeline pipeline = input.getPipeline();
  // Create one TextIO.Read transform for each document
  // and add its output to a PCollectionList
  PCollectionList<KV<URI, String>> urisToLines =
    PCollectionList.empty(pipeline);
  // TextIO.Read supports:
  //  - file: URIs and paths locally
  //  - gs: URIs on the service
  for (final URI uri : uris) {
   String uriString;
   // Constant-first comparison: URI.getScheme() returns null for a scheme-less (relative)
   // URI, which would otherwise throw a NullPointerException here.
   if ("file".equals(uri.getScheme())) {
    uriString = new File(uri).getPath();
   } else {
    uriString = uri.toString();
   }
   PCollection<KV<URI, String>> oneUriToLines = pipeline
     .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
     .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
   urisToLines = urisToLines.and(oneUriToLines);
  }
  return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
 }
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * Keys every element with the same empty-string key, groups by that key, drops the key and
 * flattens the grouped values back into individual elements.
 *
 * @param input the collection to pass through the group-by-key
 * @return the regrouped elements as a flat collection
 */
@Override
 public PCollection<T> expand(PCollection<T> input) {
  PCollection<Iterable<T>> groupedValues =
    input.apply(WithKeys.of("")).apply(GroupByKey.create()).apply(Values.create());
  return groupedValues.apply(Flatten.iterables());
 }
}

代码示例来源:origin: org.apache.beam/beam-examples-java

/**
 * Reads every document in {@code uris} and returns a single collection of (URI, text) pairs.
 *
 * @param input the pipeline entry point; only its {@link Pipeline} is used
 * @return the flattened union of the per-document collections, each element keyed by the URI
 *     it was read from
 */
@Override
 public PCollection<KV<URI, String>> expand(PBegin input) {
  Pipeline pipeline = input.getPipeline();
  // One TextIO read is created per document; the outputs are gathered in a PCollectionList.
  PCollectionList<KV<URI, String>> perDocument = PCollectionList.empty(pipeline);
  // TextIO.Read supports:
  //  - file: URIs and paths locally
  //  - gs: URIs on the service
  for (final URI uri : uris) {
   // file: URIs are converted to a local path, everything else is passed as the URI text.
   String uriString = "file".equals(uri.getScheme()) ? new File(uri).getPath() : uri.toString();
   PCollection<String> lines =
     pipeline.apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString));
   PCollection<KV<URI, String>> keyedLines =
     lines.apply("WithKeys(" + uriString + ")", WithKeys.of(uri));
   keyedLines.setCoder(KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()));
   perDocument = perDocument.and(keyedLines);
  }
  return perDocument.apply(Flatten.pCollections());
 }
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * Parses each input string as an integer and exposes the largest one as a singleton view.
 *
 * @param input strings that are converted with {@link Integer#valueOf(String)}
 * @return a singleton view over the output of {@code Top.largest(1)}
 */
@Override
 public PCollectionView<Integer> expand(PCollection<String> input) {
  PCollection<Integer> parsed =
    input.apply(
      ParDo.of(
        new DoFn<String, Integer>() {
         @ProcessElement
         public void toInteger(ProcessContext context) {
          context.output(Integer.valueOf(context.element()));
         }
        }));
  // Top.largest(1) is followed by Flatten.iterables() to unwrap its output before the
  // singleton view is built.
  return parsed
    .apply(Top.largest(1))
    .apply(Flatten.iterables())
    .apply(View.asSingleton());
 }
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * Produces a PCollection by flattening an empty PCollectionList.
 *
 * @param input the pipeline entry point; only its pipeline is used
 * @return the result of applying Flatten to a list with no inputs
 */
@Override
 public PCollection<T> expand(PBegin input) {
  return PCollectionList.<T>empty(input.getPipeline()).apply(Flatten.pCollections());
 }
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * Builds a view over {@code actual}: the actuals are prepared, all panes are gathered, the
 * pane is extracted and flattened, the elements are rewindowed, and each element is cloned
 * with the collection's coder before {@code actualView} is applied.
 *
 * @param input the pipeline entry point (not read by this method)
 * @return the view produced by {@code actualView}
 */
@Override
 public PCollectionView<ActualT> expand(PBegin input) {
  final Coder<T> coder = actual.getCoder();
  // Emits a coder-made copy of every element instead of the element itself.
  DoFn<T, T> cloneWithCoder =
    new DoFn<T, T>() {
     @ProcessElement
     public void processElement(ProcessContext context) throws CoderException {
      context.output(CoderUtils.clone(coder, context.element()));
     }
    };

  PCollection<T> flattened =
    actual
      .apply("FilterActuals", rewindowActuals.prepareActuals())
      .apply("GatherPanes", GatherAllPanes.globally())
      .apply("ExtractPane", MapElements.via(extractPane))
      .setCoder(IterableCoder.of(actual.getCoder()))
      .apply(Flatten.iterables());

  return flattened
    .apply("RewindowActuals", rewindowActuals.windowActuals())
    .apply(ParDo.of(cloneWithCoder))
    .apply(actualView);
 }
}

代码示例来源:origin: org.apache.beam/beam-runners-core-construction-java

/** A Flatten applied with no inputs must yield a replacement whose input list is empty. */
@Test
public void getInputEmptySucceeds() {
 PTransformReplacement<PCollectionList<Long>, PCollection<Long>> replacement =
   factory.getReplacementTransform(
     AppliedPTransform.of(
       "nonEmptyInput",
       Collections.emptyMap(),
       Collections.emptyMap(),
       Flatten.pCollections(),
       pipeline));

 PCollectionList<Long> replacementInput = replacement.getInput();
 assertThat(replacementInput.getAll(), emptyIterable());
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/** Flattening a collection holding {@code LINES} as one iterable yields its elements. */
@Test
@Category(ValidatesRunner.class)
public void testFlattenIterables() {
 Create.Values<Iterable<String>> source =
   Create.<Iterable<String>>of(LINES).withCoder(IterableCoder.of(StringUtf8Coder.of()));
 PCollection<Iterable<String>> nested = p.apply(source);

 PCollection<String> flattened = nested.apply(Flatten.iterables());

 PAssert.that(flattened).containsInAnyOrder(LINES_ARRAY);
 p.run();
}

代码示例来源:origin: org.apache.beam/beam-runners-core-construction-java

/** The empty-flatten matcher must accept a Flatten.PCollections applied with no inputs. */
@Test
public void emptyFlattenWithEmptyFlatten() {
 PCollection<Integer> primitiveOutput =
   PCollection.createPrimitiveOutputInternal(
     p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of());
 AppliedPTransform application =
   AppliedPTransform.of(
     "EmptyFlatten",
     Collections.emptyMap(),
     Collections.singletonMap(new TupleTag<Integer>(), primitiveOutput),
     Flatten.pCollections(),
     p);

 boolean matched = PTransformMatchers.emptyFlatten().matches(application);
 assertThat(matched, is(true));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/** Flattening a collection holding {@code NO_LINES} as one iterable is checked against NO_LINES_ARRAY. */
@Test
@Category(ValidatesRunner.class)
public void testFlattenIterablesEmpty() {
 Create.Values<Iterable<String>> source =
   Create.<Iterable<String>>of(NO_LINES).withCoder(IterableCoder.of(StringUtf8Coder.of()));
 PCollection<Iterable<String>> nested = p.apply(source);

 PCollection<String> flattened = nested.apply(Flatten.iterables());

 PAssert.that(flattened).containsInAnyOrder(NO_LINES_ARRAY);
 p.run();
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/**
 * Applies this transform to {@code input}, giving the result the output windowing strategy.
 *
 * @param input the collection to (re)window
 * @return the output of the {@code Assign} transform when a window function is set; otherwise
 *     a new collection, made by flattening {@code input} alone, carrying the output strategy
 */
@Override
public PCollection<T> expand(PCollection<T> input) {
 applicableTo(input);
 WindowingStrategy<?, ?> outputStrategy =
   getOutputStrategyInternal(input.getWindowingStrategy());

 if (getWindowFn() != null) {
  // This is the AssignWindows primitive
  return input.apply(new Assign<>(this, outputStrategy));
 }

 // No window function: a fresh PCollection is still required, because the input may be
 // reused elsewhere and the two PCollections will, in general, have different windowing
 // strategies.
 PCollection<T> copy = PCollectionList.of(input).apply(Flatten.pCollections());
 return copy.setWindowingStrategyInternal(outputStrategy);
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/** Flattening a collection holding {@code LINES} as one List yields its elements. */
@Test
@Category(ValidatesRunner.class)
public void testFlattenIterablesLists() {
 Create.Values<List<String>> source =
   Create.<List<String>>of(LINES).withCoder(ListCoder.of(StringUtf8Coder.of()));
 PCollection<List<String>> nested = p.apply(source);

 PCollection<String> flattened = nested.apply(Flatten.iterables());

 PAssert.that(flattened).containsInAnyOrder(LINES_ARRAY);
 p.run();
}

代码示例来源:origin: org.apache.beam/beam-runners-apex

/**
 * Runs a Flatten over five single-element collections on the Apex runner and checks that the
 * embedded collector ends up with exactly the expected elements.
 *
 * @throws Exception if the pipeline fails or the polling sleep is interrupted
 */
@Test
public void test() throws Exception {
 ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
 options.setRunner(ApexRunner.class);
 Pipeline p = Pipeline.create(options);

 String[][] collections = {{"1"}, {"2"}, {"3"}, {"4"}, {"5"}};
 Set<String> expected = Sets.newHashSet();
 List<PCollection<String>> inputs = new ArrayList<>();
 for (String[] elements : collections) {
  PCollection<String> part =
    p.apply(Create.of(ImmutableList.copyOf(elements)).withCoder(StringUtf8Coder.of()));
  inputs.add(part);
  expected.addAll(Arrays.asList(elements));
 }

 PCollection<String> flattened = PCollectionList.of(inputs).apply(Flatten.pCollections());
 flattened.apply(ParDo.of(new EmbeddedCollector()));

 ApexRunnerResult result = (ApexRunnerResult) p.run();
 // TODO: verify translation
 result.getApexDAG();

 // Poll every 500 ms, for at most 30 s, until enough results have been collected.
 long deadline = System.currentTimeMillis() + 30000;
 while (System.currentTimeMillis() < deadline
   && EmbeddedCollector.RESULTS.size() < expected.size()) {
  LOG.info("Waiting for expected results.");
  Thread.sleep(500);
 }

 Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size());
 Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS));
}

代码示例来源:origin: org.apache.beam/beam-sdks-java-core

/** Flattening a collection holding {@code LINES} as one Set is checked against LINES_ARRAY. */
@Test
@Category(ValidatesRunner.class)
public void testFlattenIterablesSets() {
 Set<String> linesSet = ImmutableSet.copyOf(LINES);
 Create.Values<Set<String>> source =
   Create.<Set<String>>of(linesSet).withCoder(SetCoder.of(StringUtf8Coder.of()));
 PCollection<Set<String>> nested = p.apply(source);

 PCollection<String> flattened = nested.apply(Flatten.iterables());

 PAssert.that(flattened).containsInAnyOrder(LINES_ARRAY);
 p.run();
}

相关文章