org.apache.flink.api.java.operators.UnsortedGrouping.sum()方法的使用及代码示例

x33g5p2x  于2022-02-01 转载在 其他  
字(7.5k)|赞(0)|评价(0)|浏览(100)

本文整理了Java中org.apache.flink.api.java.operators.UnsortedGrouping.sum()方法的一些代码示例,展示了UnsortedGrouping.sum()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。UnsortedGrouping.sum()方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.UnsortedGrouping
类名称:UnsortedGrouping
方法名:sum

UnsortedGrouping.sum介绍

[英]Syntactic sugar for aggregate (SUM, field).
[中]聚合的语法糖(总和,字段)。

代码示例

代码示例来源:origin: apache/flink

/**
 * Return the degree of all vertices in the graph.
 *
 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 */
public DataSet<Tuple2<K, LongValue>> getDegrees() {
  return outDegrees()
    .union(inDegrees()).name("In- and out-degree")
    .groupBy(0).sum(1).name("Sum");
}

代码示例来源:origin: apache/flink

private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
    env.readTextFile(inFile.getAbsolutePath())
      .flatMap(new Tokenizer())
      .groupBy(0)
      .sum(1)
      .writeAsCsv(outFile.getAbsolutePath());
    return env.createProgramPlan();
  }
}

代码示例来源:origin: apache/flink

.sum(1);

代码示例来源:origin: apache/flink

/**
 * Fix for FLINK-2019.
 *
 * @throws Exception
 */
@Test
public void testJodatimeDateTimeWithKryo() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple2<Integer, DateTime>> ds = env.fromElements(new Tuple2<>(1, DateTime.now()));
  DataSet<Tuple2<Integer, DateTime>> reduceDs = ds.groupBy("f1").sum(0).project(0);
  List<Tuple2<Integer, DateTime>> result = reduceDs.collect();
  String expected = "1\n";
  compareResultAsTuples(result, expected);
}

代码示例来源:origin: apache/flink

@Test
public void testClusterWithIPv6host() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    env.getConfig().disableSysoutLogging();
    // get input data
    DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
    DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String token : value.toLowerCase().split("\\W+")) {
              if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
              }
            }
          }
        })
        .groupBy(0).sum(1);
    List<Tuple2<String, Integer>> result = counts.collect();
    TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES);
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
public void testGroupedAggregate() throws Exception {
  /*
   * Grouped Aggregate
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
      .sum(0)
      .project(1, 0);
  List<Tuple2<Long, Integer>> result = aggregateDs.collect();
  String expected = "1,1\n" +
      "2,5\n" +
      "3,15\n" +
      "4,34\n" +
      "5,65\n" +
      "6,111\n";
  compareResultAsTuples(result, expected);
}

代码示例来源:origin: apache/flink

.groupBy(0).sum(1)
.groupBy(0, 1).sum(2)
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

代码示例来源:origin: apache/flink

.sum(1);

代码示例来源:origin: apache/flink

@Test
public void testFirstNOnGroupedDS() throws Exception {
  /*
   * First-n on grouped data set
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
      .map(new OneMapper2()).groupBy(0).sum(1);
  List<Tuple2<Long, Integer>> result = first.collect();
  String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
  compareResultAsText(result, expected);
}

代码示例来源:origin: apache/flink

.sum(1);

代码示例来源:origin: apache/flink

@Test
public void testHashPartitionByKeyField2() throws Exception {
  /*
   * Test hash partition by key field
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  AggregateOperator<Tuple3<Integer, Long, String>> sum = ds
    .map(new PrefixMapper())
    .partitionByHash(1, 2)
    .groupBy(1, 2)
    .sum(0);
  List<Tuple3<Integer, Long, String>> result = sum.collect();
  String expected = "(1,1,Hi)\n" +
    "(5,2,Hello)\n" +
    "(4,3,Hello)\n" +
    "(5,3,I am )\n" +
    "(6,3,Luke )\n" +
    "(34,4,Comme)\n" +
    "(65,5,Comme)\n" +
    "(111,6,Comme)";
  compareResultAsText(result, expected);
}

代码示例来源:origin: apache/flink

@Test
public void testRangePartitionByKeyField2() throws Exception {
  /*
   * Test range partition by key field
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  AggregateOperator<Tuple3<Integer, Long, String>> sum = ds
    .map(new PrefixMapper())
    .partitionByRange(1, 2)
    .groupBy(1, 2)
    .sum(0);
  List<Tuple3<Integer, Long, String>> result = sum.collect();
  String expected = "(1,1,Hi)\n" +
  "(5,2,Hello)\n" +
  "(4,3,Hello)\n" +
  "(5,3,I am )\n" +
  "(6,3,Luke )\n" +
  "(34,4,Comme)\n" +
  "(65,5,Comme)\n" +
  "(111,6,Comme)";
  compareResultAsText(result, expected);
}

代码示例来源:origin: apache/flink

.groupBy(0, 1).sum(2)
.groupBy(0).sum(1)
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());

代码示例来源:origin: apache/flink

DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
res1.writeAsText(resultPath[1]);
res.writeAsText(resultPath[0]);

代码示例来源:origin: apache/flink

@Test
public void testCustomPartitioningTupleAgg() {
  try {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
        .rebalance().setParallelism(4);
    
    data.groupBy(0).withPartitioner(new TestPartitionerInt())
      .sum(1)
      .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
    
    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);
    
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
    
    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
  }
  catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
  }
}

代码示例来源:origin: apache/flink

.sum(1)
.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

代码示例来源:origin: apache/flink

.sum(1)
.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

代码示例来源:origin: apache/flink

DataSet<Tuple2<Long, Long>> union234 = src4.union(union23);
union123.groupBy(0).sum(1).name("1").output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
union234.groupBy(1).sum(0).name("2").output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

代码示例来源:origin: apache/flink

.groupBy(0).sum(1).name("Count Words")

代码示例来源:origin: com.alibaba.blink/flink-gelly

/**
 * Return the degree of all vertices in the graph.
 *
 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 */
public DataSet<Tuple2<K, LongValue>> getDegrees() {
  return outDegrees()
    .union(inDegrees()).name("In- and out-degree")
    .groupBy(0).sum(1).name("Sum");
}

相关文章