Usage and code examples of the cz.seznam.euphoria.core.client.operator.FlatMap class

x33g5p2x, reposted on 2022-01-19 under Other

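This article collects code examples of the Java class cz.seznam.euphoria.core.client.operator.FlatMap and shows how FlatMap is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms. They were extracted from selected projects and are intended as practical reference material. Details of the FlatMap class:
Package path: cz.seznam.euphoria.core.client.operator.FlatMap
Class name: FlatMap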

Introduction to FlatMap

A transformation of a dataset from one type into another, allowing user code to generate zero, one, or many output elements for each input element.

The user-supplied map function is expected to be stateless. It receives input items in no specified order, and its results are "flattened" into the output (likewise in no specified order).

Example:

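Dataset<String> strings = ...;
Dataset<Integer> ints = FlatMap.of(strings)
    .using((String s, Collector<Integer> c) -> {
      try {
        c.collect(Integer.parseInt(s));
      } catch (NumberFormatException e) {
        // ~ ignore the input if we failed to parse it
      }
    })
    .output();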

The example above tries to parse each incoming string as an integer, silently skipping strings that cannot be converted. Although Collector#collect(Object) is called at most once per input here, a FlatMap functor may call it any number of times, including not at all, and each call adds one element to the output dataset.
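
To make the zero-or-one behaviour concrete without pulling in Euphoria, here is a minimal plain-Java sketch. It is a model under stated assumptions, not the library's implementation: FlatMapSketch, flatMap, and parseOrSkip are hypothetical names, a java.util.function.Consumer stands in for Euphoria's Collector, and the input is an in-memory List processed sequentially (so, unlike a real FlatMap, output order here simply follows input order).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Plain-Java sketch of FlatMap semantics; NOT the Euphoria API.
final class FlatMapSketch {

  // Applies fn to every input element. fn may emit zero, one, or many
  // outputs through the supplied Consumer (a stand-in for Collector).
  static <I, O> List<O> flatMap(List<I> input, BiConsumer<I, Consumer<O>> fn) {
    List<O> out = new ArrayList<>();
    for (I element : input) {
      fn.accept(element, out::add);
    }
    return out;
  }

  // The "parse or skip" functor from the example above:
  // one output for a valid integer, none otherwise.
  static void parseOrSkip(String s, Consumer<Integer> collector) {
    try {
      collector.accept(Integer.parseInt(s));
    } catch (NumberFormatException e) {
      // ~ ignore the input if we failed to parse it
    }
  }
}
```

For example, FlatMapSketch.flatMap(List.of("1", "x", "42"), FlatMapSketch::parseOrSkip) yields [1, 42]; "x" contributes nothing to the output.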

Builders:

  1. [named] .................. give the operator a name [optional]
  2. of ....................... input dataset
  3. using .................... apply a UnaryFunctor to the input elements
  4. [eventTimeBy] ............ change the event time characteristic of output elements using ExtractEventTime
  5. output ................... build the output dataset
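
The builder steps above form a fixed sequence in which each call returns an object offering the next step. The sketch below models that step-builder shape in plain Java. Every class and method in it is hypothetical: only the step names and the default name "FlatMap" (which the testBuild_ImplicitName example below asserts) come from this page. It runs the functor eagerly over a List and merely records whether an event-time extractor was set, rather than applying it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

// Hypothetical step builder; only the step names mirror the list above.
final class FlatMapBuilderSketch {
  // Name used when named(...) is skipped; testBuild_ImplicitName expects "FlatMap".
  static final String DEFAULT_NAME = "FlatMap";

  // Optional first step.
  static OfStep named(String name) {
    return new OfStep(Objects.requireNonNull(name));
  }

  // Entry point when no name is given.
  static <I> UsingStep<I> of(List<I> input) {
    return new OfStep(DEFAULT_NAME).of(input);
  }

  static final class OfStep {
    private final String name;

    OfStep(String name) { this.name = name; }

    <I> UsingStep<I> of(List<I> input) { return new UsingStep<>(name, input); }
  }

  static final class UsingStep<I> {
    private final String name;
    private final List<I> input;

    UsingStep(String name, List<I> input) { this.name = name; this.input = input; }

    <O> OutputStep<I, O> using(BiConsumer<I, Consumer<O>> functor) {
      return new OutputStep<>(name, input, Objects.requireNonNull(functor));
    }
  }

  static final class OutputStep<I, O> {
    private final String name;
    private final List<I> input;
    private final BiConsumer<I, Consumer<O>> functor;
    private ToLongFunction<I> eventTimeFn; // stays null unless eventTimeBy is called

    OutputStep(String name, List<I> input, BiConsumer<I, Consumer<O>> functor) {
      this.name = name;
      this.input = input;
      this.functor = functor;
    }

    // Optional step: only recorded in this sketch, never applied to the values.
    OutputStep<I, O> eventTimeBy(ToLongFunction<I> extractor) {
      this.eventTimeFn = Objects.requireNonNull(extractor);
      return this;
    }

    // Final step: runs the functor eagerly and reports what was configured.
    Op<O> output() {
      List<O> values = new ArrayList<>();
      for (I element : input) {
        functor.accept(element, values::add);
      }
      return new Op<>(name, eventTimeFn != null, values);
    }
  }

  static final class Op<O> {
    final String name;
    final boolean hasEventTimeExtractor;
    final List<O> values;

    Op(String name, boolean hasEventTimeExtractor, List<O> values) {
      this.name = name;
      this.hasEventTimeExtractor = hasEventTimeExtractor;
      this.values = values;
    }
  }
}
```

The fixed return types are the design point: named(...) can only be followed by of(...), of(...) only by using(...), and eventTimeBy(...) stays optional because OutputStep exposes both it and output().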

Code examples

Code example source: origin: seznam/euphoria

public <S> Dataset<S> flatMap(UnaryFunctor<T, S> f) {
 return new Dataset<>(FlatMap.of(this.wrap).using(requireNonNull(f)).output());
}
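
The method above is a convenience wrapper: it rejects a null functor up front and delegates to the FlatMap builder, returning a new Dataset so calls can be chained. A minimal eager analogue is sketched below, assuming a hypothetical DatasetSketch class backed by a List (not Euphoria's Dataset, which builds a lazy operator graph instead of computing values).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical eager wrapper around an in-memory List (not Euphoria's Dataset).
final class DatasetSketch<T> {
  private final List<T> items;

  DatasetSketch(List<T> items) {
    this.items = List.copyOf(items); // immutable snapshot of the elements
  }

  List<T> items() {
    return items;
  }

  // Same shape as the wrapper above: reject a null functor first, then
  // apply it and wrap the result so further calls can be chained.
  <S> DatasetSketch<S> flatMap(BiConsumer<T, Consumer<S>> f) {
    Objects.requireNonNull(f);
    List<S> out = new ArrayList<>();
    for (T item : items) {
      f.accept(item, out::add);
    }
    return new DatasetSketch<>(out);
  }
}
```

Because flatMap returns another wrapper, a functor that emits each element and its negation turns [1, 2] into [1, -1, 2, -2], and the result can be flat-mapped again.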

Code example source: origin: seznam/euphoria

@Test
public void testBuild() {
 Flow flow = Flow.create("TEST");
 Dataset<String> dataset = Util.createMockDataset(flow, 1);
 Dataset<String> mapped = FlatMap.named("FlatMap1")
   .of(dataset)
   .using((String s, Collector<String> c) -> c.collect(s))
   .output();
 assertEquals(flow, mapped.getFlow());
 assertEquals(1, flow.size());
 FlatMap map = (FlatMap) flow.operators().iterator().next();
 assertEquals(flow, map.getFlow());
 assertEquals("FlatMap1", map.getName());
 assertNotNull(map.getFunctor());
 assertEquals(mapped, map.output());
 assertNull(map.getEventTimeExtractor());
}

Code example source: origin: seznam/euphoria

@Override
public Dataset<OUT> output(OutputHint... outputHints) {
  Flow flow = input.getFlow();
  FlatMap<IN, OUT> map = new FlatMap<>(name, flow, input, functor, evtTimeFn,
      Sets.newHashSet(outputHints));
  flow.add(map);
  return map.output();
}
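
This output() takes the Flow from the input dataset, creates the operator, registers it with flow.add, and returns the operator's output dataset; that is what testBuild above checks via mapped.getFlow(), flow.size(), and map.output(). The model below reproduces only that registration pattern. FlowSketch and its nested Flow, Dataset, and Operator classes are hypothetical stand-ins, not Euphoria types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of operator registration; not Euphoria's classes.
final class FlowSketch {
  static final class Flow {
    private final List<Operator> operators = new ArrayList<>();

    void add(Operator op) { operators.add(op); }

    int size() { return operators.size(); }

    List<Operator> operators() { return Collections.unmodifiableList(operators); }
  }

  static final class Dataset {
    final Flow flow; // every dataset knows the flow it belongs to

    Dataset(Flow flow) { this.flow = flow; }
  }

  static final class Operator {
    final String name;
    final Dataset output; // created together with the operator

    Operator(String name, Flow flow) {
      this.name = name;
      this.output = new Dataset(flow);
    }
  }

  // Mirrors the shape of the builder's output(): take the flow from the input,
  // create the operator, register it, and hand back its output dataset.
  static Dataset output(String name, Dataset input) {
    Flow flow = input.flow;
    Operator op = new Operator(name, flow);
    flow.add(op);
    return op.output;
  }
}
```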

Code example source: origin: seznam/euphoria

@Override
@SuppressWarnings("unchecked")
public JavaRDD<?> translate(FlatMap operator, SparkExecutorContext context) {

  final JavaRDD<?> input = context.getSingleInput(operator);
  final UnaryFunctor<?, ?> mapper = operator.getFunctor();
  final ExtractEventTime<?> evtTimeFn = operator.getEventTimeExtractor();

  LazyAccumulatorProvider accumulators =
      new LazyAccumulatorProvider(context.getAccumulatorFactory(), context.getSettings());
  if (evtTimeFn != null) {
    return input
        .flatMap(new EventTimeAssigningUnaryFunctor(mapper, evtTimeFn, accumulators))
        .setName(operator.getName() + "::event-time-and-apply-udf");
  } else {
    return input
        .flatMap(new UnaryFunctorWrapper(mapper, accumulators))
        .setName(operator.getName() + "::apply-udf");
  }
}
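
Euphoria's UnaryFunctor pushes results into a Collector, whereas Spark's Java RDD.flatMap (Spark 2.x and later) takes a FlatMapFunction whose call method returns an Iterator. One plausible way to bridge the two, assumed here rather than taken from the source of UnaryFunctorWrapper, is to buffer each element's outputs and return an iterator over that buffer. The sketch uses java.util.function.Function in place of Spark's interface; PushToPullSketch is hypothetical, and stageName only reproduces the two RDD names the translator above assigns.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical adapter from push-style (Collector) to pull-style (Iterator).
final class PushToPullSketch {

  // Buffers everything the functor emits for one element, then exposes it
  // as an Iterator: the shape an RDD flatMap function returns.
  static <I, O> Function<I, Iterator<O>> toIteratorFunction(
      BiConsumer<I, Consumer<O>> functor) {
    return element -> {
      List<O> buffer = new ArrayList<>();
      functor.accept(element, buffer::add);
      return buffer.iterator();
    };
  }

  // The two stage names chosen by the translator above.
  static String stageName(String operatorName, boolean hasEventTimeExtractor) {
    return operatorName
        + (hasEventTimeExtractor ? "::event-time-and-apply-udf" : "::apply-udf");
  }
}
```

Buffering per element keeps the adapter simple; the trade-off is that all outputs of a single input element are held in memory at once.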

Code example source: origin: seznam/euphoria

    new KafkaSource(uri.getAuthority(),
        uri.getPath().substring(1), settings));
return FlatMap.of(input)
  .using(new UnaryFunctor<Pair<byte[], byte[]>, Pair<Long, String>>() {
   private final SearchEventsParser parser = new SearchEventsParser();
   // ...

DataSource<String> datasource = new SimpleHadoopTextFileSource(uri.toString());
Dataset<String> in = flow.createInput(datasource);
return FlatMap.named("PARSE-INPUT")
  .of(in)
  .using(new UnaryFunctor<String, Pair<Long, String>>() {
   // ...

Code example source: origin: seznam/euphoria

flatMap.getSingleParentOrNull().get(), flatMap.get());
InputProvider ret = new InputProvider();
final UnaryFunctor mapper = flatMap.get().getFunctor();
final ExtractEventTime eventTimeFn = flatMap.get().getEventTimeExtractor();
for (Supplier s : suppliers) {
 final BlockingQueue<Datum> out = new ArrayBlockingQueue(5000);

Code example source: origin: seznam/euphoria

@Test
public void testBuild_ImplicitName() {
 Flow flow = Flow.create("TEST");
 Dataset<String> dataset = Util.createMockDataset(flow, 1);
 Dataset<String> mapped = FlatMap.of(dataset)
     .using((String s, Collector<String> c) -> c.collect(s))
     .output();
 FlatMap map = (FlatMap) flow.operators().iterator().next();
 assertEquals("FlatMap", map.getName());
}

Code example source: origin: seznam/euphoria

/**
 * Collects Avro records as JSON strings.
 *
 * @param outSink
 * @param inSource
 * @throws Exception
 */
public static void runFlow(
  DataSink<String> outSink,
  DataSource<Pair<AvroKey<GenericData.Record>, NullWritable>> inSource)
  throws Exception {
 Flow flow = Flow.create("simple read avro");
 Dataset<Pair<AvroKey<GenericData.Record>, NullWritable>> input = flow.createInput(inSource);
 final Dataset<String> output =
   FlatMap.named("avro2csv").of(input).using(AvroSourceTest::apply).output();
 output.persist(outSink);
 Executor executor = new LocalExecutor();
 executor.submit(flow).get();
}

Code example source: origin: seznam/euphoria

@Override
public DAG<Operator<?, ?>> getBasicOps() {
 return DAG.of(new FlatMap<>(
   getName(), getFlow(), input,
   (i, c) -> c.collect(i), eventTimeFn, getHints()));
}

Code example source: origin: seznam/euphoria

return FlatMap.named("FORMAT-OUTPUT")
  .of(aggregated)
  .using(((Pair<String, Long> elem, Collector<String> context) -> {

Code example source: origin: seznam/euphoria

@Override
protected Dataset<Integer> getOutput(Dataset<Integer> input) {
 return FlatMap.of(input)
   .using((Integer e, Collector<Integer> c) -> {
    for (int i = 1; i <= e; i++) {
     c.collect(i);
    }
   })
   .output();
}
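
This functor expands each integer e into the sequence 1, 2, ..., e, so an input of 3 yields three outputs and an input of 0 yields none. For comparison, the same transformation written with java.util.stream rather than Euphoria, in a hypothetical ExpandSketch class:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// java.util.stream equivalent of the FlatMap above (not Euphoria code).
final class ExpandSketch {

  // Each element e expands to 1..e; rangeClosed(1, e) is empty when e < 1,
  // so zero or negative elements produce no output at all.
  static List<Integer> expand(List<Integer> input) {
    return input.stream()
        .flatMap(e -> IntStream.rangeClosed(1, e).boxed())
        .collect(Collectors.toList());
  }
}
```

For example, ExpandSketch.expand(List.of(1, 3, 0)) returns [1, 1, 2, 3].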

Code example source: origin: seznam/euphoria

@Test
public void testBuild_EventTimeExtractor() {
 Flow flow = Flow.create("TEST");
 Dataset<String> dataset = Util.createMockDataset(flow, 1);
 Dataset<BigDecimal> mapped = FlatMap.named("FlatMap2")
   .of(dataset)
   .using((String s, Collector<BigDecimal> c) -> c.collect(null))
   .eventTimeBy(Long::parseLong) // ~ consuming the original input elements
   .output();
 assertEquals(flow, mapped.getFlow());
 assertEquals(1, flow.size());
 FlatMap map = (FlatMap) flow.operators().iterator().next();
 assertEquals(flow, map.getFlow());
 assertEquals("FlatMap2", map.getName());
 assertNotNull(map.getFunctor());
 assertEquals(mapped, map.output());
 assertNotNull(map.getEventTimeExtractor());
}
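
In this test the event-time extractor (Long::parseLong) is applied to the original String input, not to the BigDecimal output, as the inline comment notes. A plausible reading, assumed here, is that every output produced from an input element carries the timestamp extracted from that element. The plain-Java sketch below models this with a hypothetical Timestamped holder; it does not reflect how Euphoria stores timestamps internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

// Hypothetical model of eventTimeBy; not Euphoria's implementation.
final class EventTimeSketch {
  static final class Timestamped<T> {
    final long timestamp;
    final T value;

    Timestamped(long timestamp, T value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // The extractor reads the *input* element; every output emitted for that
  // element is stamped with the same timestamp.
  static <I, O> List<Timestamped<O>> flatMapWithEventTime(
      List<I> input, BiConsumer<I, Consumer<O>> fn, ToLongFunction<I> eventTimeFn) {
    List<Timestamped<O>> out = new ArrayList<>();
    for (I element : input) {
      long ts = eventTimeFn.applyAsLong(element);
      fn.accept(element, value -> out.add(new Timestamped<>(ts, value)));
    }
    return out;
  }
}
```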

Code example source: origin: seznam/euphoria

@Override
@SuppressWarnings("unchecked")
public DataStream<?> translate(FlinkOperator<FlatMap> operator,
                               StreamingExecutorContext context) {
  Settings settings = context.getSettings();
  FlinkAccumulatorFactory accumulatorFactory = context.getAccumulatorFactory();

  DataStream input = context.getSingleInputStream(operator);
  UnaryFunctor mapper = operator.getOriginalOperator().getFunctor();
  ExtractEventTime evtTimeFn = operator.getOriginalOperator().getEventTimeExtractor();
  if (evtTimeFn != null) {
    input = input.assignTimestampsAndWatermarks(
        new EventTimeAssigner(context.getAllowedLateness(), evtTimeFn))
        .returns((Class) StreamingElement.class);
  }
  return input
      .flatMap(new StreamingUnaryFunctorWrapper(mapper, accumulatorFactory, settings))
      .returns((Class) StreamingElement.class)
      .name(operator.getName())
      .setParallelism(operator.getParallelism());
}

Code example source: origin: seznam/euphoria

final Dataset<String> words = FlatMap.named("TOKENIZER")
    .of(lines)
    .using((String line, Collector<String> c) ->
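
The TOKENIZER fragment above is cut off before its functor body, so its exact splitting rule is unknown. Assuming it splits each line on whitespace, a tokenizer is the classic flat-map case: one line in, zero or more words out. A plain java.util.stream sketch (TokenizerSketch is hypothetical, not the original code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical whitespace tokenizer; the original TOKENIZER body is unknown.
final class TokenizerSketch {

  // Splits each line on runs of whitespace. A line with leading whitespace
  // yields an empty first token, and an empty line yields [""], so empty
  // tokens are dropped before collecting.
  static List<String> tokenize(List<String> lines) {
    return lines.stream()
        .flatMap(line -> Arrays.stream(line.split("\\s+")))
        .filter(word -> !word.isEmpty())
        .collect(Collectors.toList());
  }
}
```

With this rule, the lines "a b", "  c  ", and "" tokenize to [a, b, c]; the empty line contributes no words.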

Code example source: origin: seznam/euphoria

/** This operator can be implemented using FlatMap. */
@Override
public DAG<Operator<?, ?>> getBasicOps() {
  return DAG.of(new FlatMap<>(getName(), getFlow(), input,
      (elem, collector) -> {
        if (predicate.apply(elem)) {
          collector.collect(elem);
        }
      },
      null,
      getHints()));
}
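
This getBasicOps expresses a filter as a FlatMap: the functor emits the element when the predicate holds and nothing otherwise, and no event-time extractor is set (null). The same idea in plain Java, with java.util.function.Predicate standing in for the operator's predicate and a Consumer for the Collector (FilterSketch and its methods are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical illustration of "filter as flat-map"; not Euphoria code.
final class FilterSketch {

  // Turns a predicate into a flat-map functor that emits the element when
  // the predicate holds (one output) and emits nothing otherwise (zero).
  static <T> BiConsumer<T, Consumer<T>> asFlatMapFunctor(Predicate<T> predicate) {
    return (elem, collector) -> {
      if (predicate.test(elem)) {
        collector.accept(elem);
      }
    };
  }

  // Runs the derived functor over a list, collecting whatever it emits.
  static <T> List<T> filter(List<T> input, Predicate<T> predicate) {
    BiConsumer<T, Consumer<T>> functor = asFlatMapFunctor(predicate);
    List<T> out = new ArrayList<>();
    for (T elem : input) {
      functor.accept(elem, out::add);
    }
    return out;
  }
}
```

For example, FilterSketch.filter(List.of(1, 2, 3, 4), x -> x % 2 == 0) returns [2, 4].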

Code example source: origin: seznam/euphoria

@Test
public void testBuild_WithCounters() {
 Flow flow = Flow.create("TEST");
 Dataset<String> dataset = Util.createMockDataset(flow, 1);
 Dataset<String> mapped = FlatMap.named("FlatMap1")
     .of(dataset)
     .using((String s, Collector<String> c) -> {
      c.getCounter("my-counter").increment();
      c.collect(s);
     })
     .output();
 assertEquals(flow, mapped.getFlow());
 assertEquals(1, flow.size());
 FlatMap map = (FlatMap) flow.operators().iterator().next();
 assertEquals(flow, map.getFlow());
 assertEquals("FlatMap1", map.getName());
 assertNotNull(map.getFunctor());
 assertEquals(mapped, map.output());
}
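
Here the functor increments a named counter once per input element while passing the element through unchanged, so after n elements "my-counter" should read n. The single-threaded sketch below models a collector that also hands out named counters. CountingCollector, Counter, and run are hypothetical and only mimic the getCounter(...).increment() call shape used in the test; they are not Euphoria's accumulator implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical collector-with-counters model; not Euphoria's API.
final class CounterSketch {
  static final class Counter {
    private long value;

    void increment() { value++; }

    long get() { return value; }
  }

  static final class CountingCollector<T> {
    final List<T> outputs = new ArrayList<>();
    private final Map<String, Counter> counters = new HashMap<>();

    void collect(T elem) { outputs.add(elem); }

    // Creates the counter on first use and returns the same instance afterwards.
    Counter getCounter(String name) {
      return counters.computeIfAbsent(name, k -> new Counter());
    }
  }

  // Feeds every input element to fn, sharing one collector across all calls.
  static <I, O> CountingCollector<O> run(List<I> input, BiConsumer<I, CountingCollector<O>> fn) {
    CountingCollector<O> collector = new CountingCollector<>();
    for (I elem : input) {
      fn.accept(elem, collector);
    }
    return collector;
  }
}
```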
