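This article collects code examples for the Java class org.apache.beam.sdk.transforms.Flatten and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms. They were extracted from selected projects, so they should be a useful reference. Details of the Flatten class: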
Fully qualified name: org.apache.beam.sdk.transforms.Flatten
Class name: Flatten
Flatten takes multiple PCollections bundled into a PCollectionList and returns a single PCollection containing all the elements in all the input PCollections. The name "Flatten" suggests taking a list of lists and flattening them into a single list.
Example of use:
PCollection pc1 = ...;
By default, the Coder of the output PCollection is the same as the Coder of the first PCollection in the input PCollectionList (if the PCollectionList is non-empty).
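The example of use above appears to be cut off after its first line. The sketch below shows the same idea end to end, under these assumptions:

- `beam-sdks-java-core` and `beam-runners-direct-java` (Beam 2.x) are on the classpath, so `Pipeline.create` runs on the DirectRunner.
- The class name `FlattenMergeSketch` and the static `RESULTS` collector are hypothetical, added for illustration only. The collector works only because the DirectRunner executes inside the same JVM.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

class FlattenMergeSketch {
  // Hypothetical in-JVM collector; only meaningful with an in-process runner such as the DirectRunner.
  static final List<String> RESULTS = Collections.synchronizedList(new ArrayList<>());

  static class CollectFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      RESULTS.add(c.element());
    }
  }

  static List<String> run() {
    RESULTS.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> pc1 = p.apply("pc1", Create.of("a", "b"));
    PCollection<String> pc2 = p.apply("pc2", Create.of("c"));
    PCollection<String> pc3 = p.apply("pc3", Create.of("d", "e"));

    // Bundle the inputs into a PCollectionList and merge them into a single PCollection.
    PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
    PCollection<String> merged = pcs.apply(Flatten.pCollections());

    // No setCoder call here, so the output coder is copied from pc1, the first input.
    System.out.println("Output coder: " + merged.getCoder());

    merged.apply("Collect", ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();
    return new ArrayList<>(RESULTS);
  }
}
```

Element order in the merged output is not guaranteed, so sort `FlattenMergeSketch.run()` before comparing it with an expected list.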
Code example source: origin: gojektech/feast
@Override
public PCollection<FeatureRowExtended> expand(PCollectionTuple tuple) {
  List<PCollection<FeatureRowExtended>> outputList = Lists.newArrayList();
  for (TupleTag<FeatureRowExtended> tag : transforms.keySet()) {
    Write write = transforms.get(tag);
    Preconditions.checkNotNull(write, String.format("Null transform for tag=%s", tag.getId()));
    PCollection<FeatureRowExtended> input = tuple.get(tag);
    input.apply(String.format("Write to %s", tag.getId()), write);
    outputList.add(input);
  }
  // FeatureRows with no matching write transform end up in `tuple.get(mainTag)` and are considered
  // discardable; we return them in the main output so they are considered written, but don't
  // actually write them to any store.
  outputList.add(tuple.get(mainTag));
  return PCollectionList.of(outputList).apply("Flatten main", Flatten.pCollections());
}
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Test
public void testFlattenGetName() {
  Assert.assertEquals("Flatten.Iterables", Flatten.<String>iterables().getName());
  Assert.assertEquals("Flatten.PCollections", Flatten.<String>pCollections().getName());
}
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Override
public PCollection<T> expand(PCollection<T> in) {
  return in.apply(Combine.globally(new SampleAnyCombineFn<T>(limit)).withoutDefaults())
      .apply(Flatten.iterables());
}
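Judging by its shape, the snippet above looks like the `expand()` of `Sample.any(limit)`. It combines the input into one `Iterable` and then emits the elements individually with `Flatten.iterables()`. The sketch below calls `Sample.any` directly, assuming the Beam 2.x Java SDK with the DirectRunner on the classpath. `SampleAnySketch` and its static collector are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sample;

class SampleAnySketch {
  // Hypothetical in-JVM collector; only meaningful with an in-process runner such as the DirectRunner.
  static final List<Integer> RESULTS = Collections.synchronizedList(new ArrayList<>());

  static class CollectFn extends DoFn<Integer, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      RESULTS.add(c.element());
    }
  }

  /** Returns up to {@code limit} arbitrary elements of {@code values}. */
  static List<Integer> run(List<Integer> values, int limit) {
    RESULTS.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    p.apply(Create.of(values))
        // Assumed to expand to Combine.globally(...).withoutDefaults() + Flatten.iterables(),
        // as in the snippet above.
        .apply(Sample.<Integer>any(limit))
        .apply(ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();
    return new ArrayList<>(RESULTS);
  }
}
```

Which elements are picked is unspecified, so check the size and the membership of the result rather than its exact contents.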
Code example source: origin: org.apache.beam/beam-runners-core-construction-java
@Test
public void emptyFlattenWithNonFlatten() {
  AppliedPTransform application =
      AppliedPTransform
          .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
              "EmptyFlatten",
              Collections.emptyMap(),
              Collections.singletonMap(
                  new TupleTag<Integer>(),
                  PCollection.createPrimitiveOutputInternal(
                      p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
              /* This isn't actually possible to construct, but for the sake of example */
              Flatten.iterables(),
              p);
  assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
}
Code example source: origin: org.apache.beam/beam-runners-core-construction-java
@Override
public PCollection<T> expand(PCollectionList<T> input) {
  // Count how many times each PCollection occurs in the input list.
  Map<PCollection<T>, Integer> instances = new HashMap<>();
  for (PCollection<T> pCollection : input.getAll()) {
    int existing = instances.get(pCollection) == null ? 0 : instances.get(pCollection);
    instances.put(pCollection, existing + 1);
  }
  PCollectionList<T> output = PCollectionList.empty(input.getPipeline());
  for (Map.Entry<PCollection<T>, Integer> instanceEntry : instances.entrySet()) {
    if (instanceEntry.getValue().equals(1)) {
      output = output.and(instanceEntry.getKey());
    } else {
      // An input that appears N times is replaced by a single ParDo (DuplicateFn) over it,
      // so each PCollection is consumed only once by the final Flatten.
      String duplicationName = String.format("Multiply %s", instanceEntry.getKey().getName());
      PCollection<T> duplicated =
          instanceEntry
              .getKey()
              .apply(duplicationName, ParDo.of(new DuplicateFn<>(instanceEntry.getValue())));
      output = output.and(duplicated);
    }
  }
  return output.apply(Flatten.pCollections());
}
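The `expand()` above appears to rewrite a Flatten whose input list contains the same PCollection more than once. It counts the occurrences and emits each repeated input's elements that many times. The plain-Java sketch below models only that counting-and-multiplying step, with ordinary lists standing in for PCollections.

Two assumptions:

- It uses an `IdentityHashMap`, on the assumption that PCollections are compared by identity rather than by contents.
- The class name `DuplicateInputFlattenSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

final class DuplicateInputFlattenSketch {
  /** Concatenates the inputs; an input instance listed k times contributes each element k times. */
  static <T> List<T> flatten(List<List<T>> inputs) {
    // Count occurrences per input *instance*, not per equal contents.
    Map<List<T>, Integer> instances = new IdentityHashMap<>();
    List<List<T>> distinct = new ArrayList<>(); // first-seen order of distinct instances
    for (List<T> in : inputs) {
      Integer existing = instances.get(in);
      if (existing == null) {
        distinct.add(in);
      }
      instances.put(in, existing == null ? 1 : existing + 1);
    }
    List<T> output = new ArrayList<>();
    for (List<T> in : distinct) {
      int copies = instances.get(in);
      for (T element : in) {
        // Stands in for the DuplicateFn ParDo: emit the element `copies` times.
        for (int i = 0; i < copies; i++) {
          output.add(element);
        }
      }
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> a = Arrays.asList("x", "y");
    List<String> b = Arrays.asList("z");
    System.out.println(flatten(Arrays.asList(a, b, a))); // prints [x, x, y, y, z]
  }
}
```

In this model, two different list objects with equal contents are still treated as separate inputs. That mirrors the idea that only the *same* PCollection is deduplicated.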
Code example source: origin: org.apache.beam/beam-runners-core-construction-java
@Test
public void flattenWithDuplicateInputsNonFlatten() {
  AppliedPTransform application =
      AppliedPTransform
          .<PCollection<Iterable<Integer>>, PCollection<Integer>, Flatten.Iterables<Integer>>of(
              "EmptyFlatten",
              Collections.emptyMap(),
              Collections.singletonMap(
                  new TupleTag<Integer>(),
                  PCollection.createPrimitiveOutputInternal(
                      p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
              /* This isn't actually possible to construct, but for the sake of example */
              Flatten.iterables(),
              p);
  assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
}
Code example source: origin: org.apache.beam/beam-runners-flink_2.10-examples
@Override
public PCollection<KV<URI, String>> expand(PBegin input) {
  Pipeline pipeline = input.getPipeline();
  // Create one TextIO.Read transform for each document
  // and add its output to a PCollectionList
  PCollectionList<KV<URI, String>> urisToLines = PCollectionList.empty(pipeline);
  // TextIO.Read supports:
  //  - file: URIs and paths locally
  //  - gs: URIs on the service
  for (final URI uri : uris) {
    String uriString;
    // Null-safe comparison: URIs without a scheme fall through to toString().
    if ("file".equals(uri.getScheme())) {
      uriString = new File(uri).getPath();
    } else {
      uriString = uri.toString();
    }
    PCollection<KV<URI, String>> oneUriToLines = pipeline
        .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
        .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
    urisToLines = urisToLines.and(oneUriToLines);
  }
  return urisToLines.apply(Flatten.<KV<URI, String>>pCollections());
}
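The pattern above builds one keyed PCollection per document inside a loop, accumulates them with `PCollectionList.and`, and merges them with a single Flatten. The sketch below reproduces that pattern under these assumptions:

- It uses in-memory `Create` inputs instead of `TextIO` reads, with `String` document names instead of `URI` keys. Both substitutions are for illustration only.
- It assumes the Beam 2.x Java SDK with the DirectRunner on the classpath.
- `KeyedDocumentsSketch` and its collector are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

class KeyedDocumentsSketch {
  // Hypothetical in-JVM collector; only meaningful with an in-process runner such as the DirectRunner.
  static final List<String> RESULTS = Collections.synchronizedList(new ArrayList<>());

  static class CollectFn extends DoFn<KV<String, String>, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      RESULTS.add(c.element().getKey() + ":" + c.element().getValue());
    }
  }

  /** docs maps a hypothetical document name to its lines, standing in for the TextIO reads. */
  static List<String> run(Map<String, List<String>> docs) {
    RESULTS.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollectionList<KV<String, String>> namesToLines = PCollectionList.empty(p);
    for (Map.Entry<String, List<String>> doc : docs.entrySet()) {
      String name = doc.getKey();
      PCollection<KV<String, String>> oneDocToLines =
          p.apply("Create(" + name + ")", Create.of(doc.getValue()).withCoder(StringUtf8Coder.of()))
              .apply("WithKeys(" + name + ")", WithKeys.<String, String>of(name))
              // Set the KV coder explicitly rather than relying on inference.
              .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
      namesToLines = namesToLines.and(oneDocToLines);
    }
    namesToLines.apply(Flatten.pCollections()).apply(ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();
    return new ArrayList<>(RESULTS);
  }
}
```

Each step name embeds the document name, which keeps the transform names unique across loop iterations, as the original does with `uriString`.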
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Override
public PCollection<T> expand(PCollection<T> input) {
  return input
      .apply(WithKeys.of(""))
      .apply(GroupByKey.create())
      .apply(Values.create())
      .apply(Flatten.iterables());
}
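The snippet above proceeds in four steps:

1. Puts every element under one constant key.
2. Groups by that key.
3. Drops the key.
4. Uses `Flatten.iterables()` to turn the single grouped `Iterable` back into individual elements.

The elements are unchanged, only regrouped. The sketch below runs that chain on strings, assuming the Beam 2.x Java SDK with the DirectRunner on the classpath. `RegroupSketch` and its collector are hypothetical, and the explicit `setCoder` is a precaution rather than something the original requires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;

class RegroupSketch {
  // Hypothetical in-JVM collector; only meaningful with an in-process runner such as the DirectRunner.
  static final List<String> RESULTS = Collections.synchronizedList(new ArrayList<>());

  static class CollectFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      RESULTS.add(c.element());
    }
  }

  static List<String> run(List<String> values) {
    RESULTS.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    p.apply(Create.of(values).withCoder(StringUtf8Coder.of()))
        // Same constant key for every element, so GroupByKey gathers them into one group.
        .apply(WithKeys.<String, String>of(""))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .apply(GroupByKey.create())
        // Drop the key, leaving one Iterable<String> per window.
        .apply(Values.create())
        // Emit the grouped elements individually again.
        .apply(Flatten.iterables())
        .apply(ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();
    return new ArrayList<>(RESULTS);
  }
}
```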
Code example source: origin: org.apache.beam/beam-examples-java
@Override
public PCollection<KV<URI, String>> expand(PBegin input) {
  Pipeline pipeline = input.getPipeline();
  // Create one TextIO.Read transform for each document
  // and add its output to a PCollectionList
  PCollectionList<KV<URI, String>> urisToLines = PCollectionList.empty(pipeline);
  // TextIO.Read supports:
  //  - file: URIs and paths locally
  //  - gs: URIs on the service
  for (final URI uri : uris) {
    String uriString;
    if ("file".equals(uri.getScheme())) {
      uriString = new File(uri).getPath();
    } else {
      uriString = uri.toString();
    }
    PCollection<KV<URI, String>> oneUriToLines =
        pipeline
            .apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
            .apply("WithKeys(" + uriString + ")", WithKeys.of(uri))
            .setCoder(KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()));
    urisToLines = urisToLines.and(oneUriToLines);
  }
  return urisToLines.apply(Flatten.pCollections());
}
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Override
public PCollectionView<Integer> expand(PCollection<String> input) {
  return input
      .apply(
          ParDo.of(
              new DoFn<String, Integer>() {
                @ProcessElement
                public void toInteger(ProcessContext ctxt) {
                  ctxt.output(Integer.valueOf(ctxt.element()));
                }
              }))
      .apply(Top.largest(1))
      .apply(Flatten.iterables())
      .apply(View.asSingleton());
}
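In the snippet above, `Top.largest(1)` yields a single `List<Integer>`, and `Flatten.iterables()` unwraps it to one `Integer` so that `View.asSingleton()` sees exactly one element. The sketch below builds such a view and then consumes it as a side input, computing each element's distance from the maximum. It assumes the Beam 2.x Java SDK with the DirectRunner on the classpath. `MaxSideInputSketch`, its collector and the distance computation are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class MaxSideInputSketch {
  // Hypothetical in-JVM collector; only meaningful with an in-process runner such as the DirectRunner.
  static final List<Integer> RESULTS = Collections.synchronizedList(new ArrayList<>());

  static class CollectFn extends DoFn<Integer, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      RESULTS.add(c.element());
    }
  }

  /** Emits (max - n) for every input n, reading max from a singleton side input. */
  static List<Integer> run(List<Integer> values) {
    RESULTS.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<Integer> numbers = p.apply(Create.of(values));

    // List<Integer> with one element -> Integer -> singleton view.
    PCollectionView<Integer> maxView =
        numbers.apply(Top.largest(1)).apply(Flatten.iterables()).apply(View.asSingleton());

    numbers
        .apply(
            "DistanceFromMax",
            ParDo.of(
                    new DoFn<Integer, Integer>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        c.output(c.sideInput(maxView) - c.element());
                      }
                    })
                .withSideInputs(maxView))
        .apply("Collect", ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();
    return new ArrayList<>(RESULTS);
  }
}
```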
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Override
public PCollection<T> expand(PBegin input) {
  PCollectionList<T> empty = PCollectionList.empty(input.getPipeline());
  return empty.apply(Flatten.pCollections());
}
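Flattening an empty PCollectionList, as above, leaves no first input to copy a coder from. The caller therefore has to set one, consistent with the default-coder rule stated at the top. The sketch below does that, then counts the (empty) result. `Count.globally()` in the global window should emit its default of 0 for empty input. It assumes the Beam 2.x Java SDK with the DirectRunner on the classpath. `EmptyFlattenSketch` and its collector are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

class EmptyFlattenSketch {
  // Hypothetical in-JVM collector; only meaningful with an in-process runner such as the DirectRunner.
  static final List<Long> RESULTS = Collections.synchronizedList(new ArrayList<>());

  static class CollectFn extends DoFn<Long, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      RESULTS.add(c.element());
    }
  }

  static List<Long> run() {
    RESULTS.clear();
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> empty =
        PCollectionList.<String>empty(p)
            .apply(Flatten.pCollections())
            // No input PCollection exists to supply a coder, so set one explicitly.
            .setCoder(StringUtf8Coder.of());
    empty.apply(Count.globally()).apply(ParDo.of(new CollectFn()));
    p.run().waitUntilFinish();
    return new ArrayList<>(RESULTS);
  }
}
```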
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Override
public PCollectionView<ActualT> expand(PBegin input) {
  final Coder<T> coder = actual.getCoder();
  return actual
      .apply("FilterActuals", rewindowActuals.prepareActuals())
      .apply("GatherPanes", GatherAllPanes.globally())
      .apply("ExtractPane", MapElements.via(extractPane))
      .setCoder(IterableCoder.of(actual.getCoder()))
      .apply(Flatten.iterables())
      .apply("RewindowActuals", rewindowActuals.windowActuals())
      .apply(
          ParDo.of(
              new DoFn<T, T>() {
                @ProcessElement
                public void processElement(ProcessContext context) throws CoderException {
                  context.output(CoderUtils.clone(coder, context.element()));
                }
              }))
      .apply(actualView);
}
Code example source: origin: org.apache.beam/beam-runners-core-construction-java
@Test
public void getInputEmptySucceeds() {
  PTransformReplacement<PCollectionList<Long>, PCollection<Long>> replacement =
      factory.getReplacementTransform(
          AppliedPTransform.of(
              "nonEmptyInput",
              Collections.emptyMap(),
              Collections.emptyMap(),
              Flatten.pCollections(),
              pipeline));
  assertThat(replacement.getInput().getAll(), emptyIterable());
}
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(ValidatesRunner.class)
public void testFlattenIterables() {
  PCollection<Iterable<String>> input =
      p.apply(
          Create.<Iterable<String>>of(LINES).withCoder(IterableCoder.of(StringUtf8Coder.of())));
  PCollection<String> output = input.apply(Flatten.iterables());
  PAssert.that(output).containsInAnyOrder(LINES_ARRAY);
  p.run();
}
Code example source: origin: org.apache.beam/beam-runners-core-construction-java
@Test
public void emptyFlattenWithEmptyFlatten() {
  AppliedPTransform application =
      AppliedPTransform.of(
          "EmptyFlatten",
          Collections.emptyMap(),
          Collections.singletonMap(
              new TupleTag<Integer>(),
              PCollection.createPrimitiveOutputInternal(
                  p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, VarIntCoder.of())),
          Flatten.pCollections(),
          p);
  assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
}
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(ValidatesRunner.class)
public void testFlattenIterablesEmpty() {
  PCollection<Iterable<String>> input =
      p.apply(
          Create.<Iterable<String>>of(NO_LINES)
              .withCoder(IterableCoder.of(StringUtf8Coder.of())));
  PCollection<String> output = input.apply(Flatten.iterables());
  PAssert.that(output).containsInAnyOrder(NO_LINES_ARRAY);
  p.run();
}
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Override
public PCollection<T> expand(PCollection<T> input) {
  applicableTo(input);
  WindowingStrategy<?, ?> outputStrategy =
      getOutputStrategyInternal(input.getWindowingStrategy());
  if (getWindowFn() == null) {
    // A new PCollection must be created in case input is reused in a different location as the
    // two PCollections will, in general, have a different windowing strategy.
    return PCollectionList.of(input)
        .apply(Flatten.pCollections())
        .setWindowingStrategyInternal(outputStrategy);
  } else {
    // This is the AssignWindows primitive
    return input.apply(new Assign<>(this, outputStrategy));
  }
}
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(ValidatesRunner.class)
public void testFlattenIterablesLists() {
  PCollection<List<String>> input =
      p.apply(Create.<List<String>>of(LINES).withCoder(ListCoder.of(StringUtf8Coder.of())));
  PCollection<String> output = input.apply(Flatten.iterables());
  PAssert.that(output).containsInAnyOrder(LINES_ARRAY);
  p.run();
}
Code example source: origin: org.apache.beam/beam-runners-apex
@Test
public void test() throws Exception {
  ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
  options.setRunner(ApexRunner.class);
  Pipeline p = Pipeline.create(options);
  String[][] collections = {{"1"}, {"2"}, {"3"}, {"4"}, {"5"}};
  Set<String> expected = Sets.newHashSet();
  List<PCollection<String>> pcList = new ArrayList<>();
  for (String[] collection : collections) {
    pcList.add(
        p.apply(Create.of(ImmutableList.copyOf(collection)).withCoder(StringUtf8Coder.of())));
    expected.addAll(Arrays.asList(collection));
  }
  PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.pCollections());
  actual.apply(ParDo.of(new EmbeddedCollector()));
  ApexRunnerResult result = (ApexRunnerResult) p.run();
  // TODO: verify translation
  result.getApexDAG();
  long timeout = System.currentTimeMillis() + 30000;
  while (System.currentTimeMillis() < timeout
      && EmbeddedCollector.RESULTS.size() < expected.size()) {
    LOG.info("Waiting for expected results.");
    Thread.sleep(500);
  }
  Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size());
  Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS));
}
Code example source: origin: org.apache.beam/beam-sdks-java-core
@Test
@Category(ValidatesRunner.class)
public void testFlattenIterablesSets() {
  Set<String> linesSet = ImmutableSet.copyOf(LINES);
  PCollection<Set<String>> input =
      p.apply(Create.<Set<String>>of(linesSet).withCoder(SetCoder.of(StringUtf8Coder.of())));
  PCollection<String> output = input.apply(Flatten.iterables());
  PAssert.that(output).containsInAnyOrder(LINES_ARRAY);
  p.run();
}