本文整理了Java中org.apache.flink.api.java.operators.UnsortedGrouping.sum()方法的一些代码示例,展示了UnsortedGrouping.sum()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。
UnsortedGrouping.sum()方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.UnsortedGrouping
类名称:UnsortedGrouping
方法名:sum
[英]Syntactic sugar for aggregate (SUM, field).
[中]aggregate(SUM, field)的语法糖,即对指定字段求和的简写形式。
代码示例来源:origin: apache/flink
/**
 * Computes the degree of every vertex in the graph.
 *
 * <p>The out-degree and in-degree data sets are unioned, then grouped on tuple field 0
 * and summed on tuple field 1.
 *
 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 */
public DataSet<Tuple2<K, LongValue>> getDegrees() {
    DataSet<Tuple2<K, LongValue>> inAndOutDegrees = outDegrees()
        .union(inDegrees())
        .name("In- and out-degree");

    return inAndOutDegrees
        .groupBy(0)
        .sum(1)
        .name("Sum");
}
代码示例来源:origin: apache/flink
/**
 * Builds the program plan of a word-count style job without executing it.
 *
 * <p>The job reads {@code inFile} as text, applies {@code Tokenizer}, groups on field 0,
 * sums field 1 and writes the result as CSV to {@code outFile}.
 *
 * @param inFile      file whose absolute path is used as the text input
 * @param outFile     file whose absolute path is used as the CSV output
 * @param parallelism parallelism set on the execution environment
 * @return the plan created by the environment for the assembled program
 */
private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(parallelism);

    final String inputPath = inFile.getAbsolutePath();
    environment
        .readTextFile(inputPath)
        .flatMap(new Tokenizer())
        .groupBy(0)
        .sum(1)
        .writeAsCsv(outFile.getAbsolutePath());

    return environment.createProgramPlan();
}
}
代码示例来源:origin: apache/flink
.sum(1);
代码示例来源:origin: apache/flink
/**
 * Regression test for FLINK-2019.
 *
 * <p>Builds a single {@code (1, DateTime.now())} tuple, groups it on field {@code "f1"}
 * (the {@code DateTime} field), sums field 0, projects field 0 and expects the single
 * result line {@code "1"}.
 *
 * <p>NOTE(review): the method name suggests the DateTime key is serialized via Kryo;
 * nothing in this method configures that explicitly - confirm against the environment defaults.
 *
 * @throws Exception propagated from collecting the result or from the comparison helper
 */
@Test
public void testJodatimeDateTimeWithKryo() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    final DataSet<Tuple2<Integer, DateTime>> input = env.fromElements(new Tuple2<>(1, DateTime.now()));
    final DataSet<Tuple2<Integer, DateTime>> summed = input
        .groupBy("f1")
        .sum(0)
        .project(0);

    final List<Tuple2<Integer, DateTime>> collected = summed.collect();
    compareResultAsTuples(collected, "1\n");
}
代码示例来源:origin: apache/flink
/**
 * Runs a word count over {@code WordCountData.TEXT} with parallelism 4 and compares the
 * collected counts against {@code WordCountData.COUNTS_AS_TUPLES}. Any exception is
 * printed and converted into a test failure.
 *
 * <p>NOTE(review): nothing in this method configures an IPv6 host; presumably the
 * surrounding test fixture does - confirm.
 */
@Test
public void testClusterWithIPv6host() {
    try {
        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(4);
        environment.getConfig().disableSysoutLogging();

        // get input data
        final DataSet<String> lines = environment.fromElements(WordCountData.TEXT.split("\n"));

        // Lower-cases each line, splits it on runs of non-word characters and emits (token, 1)
        // for every non-empty token.
        final FlatMapFunction<String, Tuple2<String, Integer>> tokenizer =
            new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    for (String word : value.toLowerCase().split("\\W+")) {
                        if (word.isEmpty()) {
                            continue;
                        }
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            };

        final DataSet<Tuple2<String, Integer>> wordCounts = lines
            .flatMap(tokenizer)
            .groupBy(0)
            .sum(1);

        final List<Tuple2<String, Integer>> collected = wordCounts.collect();
        TestBaseUtils.compareResultAsText(collected, WordCountData.COUNTS_AS_TUPLES);
    } catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
代码示例来源:origin: apache/flink
/**
 * Grouped aggregate: groups the 3-tuple data set on field 1, sums field 0 and checks the
 * result after projecting it to (field 1, field 0).
 *
 * @throws Exception propagated from collecting the result or from the comparison helper
 */
@Test
public void testGroupedAggregate() throws Exception {
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(environment);

    final DataSet<Tuple2<Long, Integer>> sumsPerGroup = input.groupBy(1).sum(0).project(1, 0);
    final List<Tuple2<Long, Integer>> collected = sumsPerGroup.collect();

    // One "groupKey,sum" line per group.
    final String expected = "1,1\n2,5\n3,15\n4,34\n5,65\n6,111\n";
    compareResultAsTuples(collected, expected);
}
代码示例来源:origin: apache/flink
.groupBy(0).sum(1)
.groupBy(0, 1).sum(2)
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
代码示例来源:origin: apache/flink
.sum(1);
代码示例来源:origin: apache/flink
/**
 * First-n on a grouped data set: applies {@code first(4)} to the data set grouped on
 * field 1, maps the result with {@code OneMapper2}, then groups on field 0 and sums field 1.
 *
 * @throws Exception propagated from collecting the result or from the comparison helper
 */
@Test
public void testFirstNOnGroupedDS() throws Exception {
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(environment);

    final DataSet<Tuple3<Integer, Long, String>> firstFourPerGroup = input.groupBy(1).first(4);
    final DataSet<Tuple2<Long, Integer>> countsPerGroup = firstFourPerGroup
        .map(new OneMapper2())
        .groupBy(0)
        .sum(1);

    final List<Tuple2<Long, Integer>> collected = countsPerGroup.collect();

    // One "(groupKey,sum)" line per group.
    final String expected = "(1,1)\n"
        + "(2,2)\n"
        + "(3,3)\n"
        + "(4,4)\n"
        + "(5,4)\n"
        + "(6,4)\n";
    compareResultAsText(collected, expected);
}
代码示例来源:origin: apache/flink
.sum(1);
代码示例来源:origin: apache/flink
/**
 * Hash partition by key fields: maps the 3-tuple data set with {@code PrefixMapper},
 * hash-partitions on fields 1 and 2, groups on the same fields and sums field 0.
 *
 * @throws Exception propagated from collecting the result or from the comparison helper
 */
@Test
public void testHashPartitionByKeyField2() throws Exception {
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(environment);

    final DataSet<Tuple3<Integer, Long, String>> prefixed = input.map(new PrefixMapper());
    final AggregateOperator<Tuple3<Integer, Long, String>> summed = prefixed
        .partitionByHash(1, 2)
        .groupBy(1, 2)
        .sum(0);

    final List<Tuple3<Integer, Long, String>> collected = summed.collect();

    // One "(sum,field1,field2)" line per (field1, field2) group.
    final String expected =
        "(1,1,Hi)\n(5,2,Hello)\n(4,3,Hello)\n(5,3,I am )\n(6,3,Luke )\n(34,4,Comme)\n(65,5,Comme)\n(111,6,Comme)";
    compareResultAsText(collected, expected);
}
代码示例来源:origin: apache/flink
/**
 * Range partition by key fields: maps the 3-tuple data set with {@code PrefixMapper},
 * range-partitions on fields 1 and 2, groups on the same fields and sums field 0.
 *
 * @throws Exception propagated from collecting the result or from the comparison helper
 */
@Test
public void testRangePartitionByKeyField2() throws Exception {
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(environment);

    final DataSet<Tuple3<Integer, Long, String>> prefixed = input.map(new PrefixMapper());
    final AggregateOperator<Tuple3<Integer, Long, String>> summed = prefixed
        .partitionByRange(1, 2)
        .groupBy(1, 2)
        .sum(0);

    final List<Tuple3<Integer, Long, String>> collected = summed.collect();

    // One "(sum,field1,field2)" line per (field1, field2) group.
    final String expected =
        "(1,1,Hi)\n(5,2,Hello)\n(4,3,Hello)\n(5,3,I am )\n(6,3,Luke )\n(34,4,Comme)\n(65,5,Comme)\n(111,6,Comme)";
    compareResultAsText(collected, expected);
}
代码示例来源:origin: apache/flink
.groupBy(0, 1).sum(2)
.groupBy(0).sum(1)
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
代码示例来源:origin: apache/flink
DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
res1.writeAsText(resultPath[1]);
res.writeAsText(resultPath[0]);
代码示例来源:origin: apache/flink
/**
 * Checks the ship strategies the optimizer picks for a grouped sum that uses a custom
 * partitioner: FORWARD into the sink, PARTITION_CUSTOM into the reducer and FORWARD into
 * the combiner. Any exception is printed and converted into a test failure.
 */
@Test
public void testCustomPartitioningTupleAgg() {
    try {
        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Tuple2<Integer, Integer>> input = environment
            .fromElements(new Tuple2<Integer, Integer>(0, 0))
            .rebalance()
            .setParallelism(4);

        input
            .groupBy(0)
            .withPartitioner(new TestPartitionerInt())
            .sum(1)
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

        final Plan plan = environment.createProgramPlan();
        final OptimizedPlan optimizedPlan = compileNoStats(plan);

        // Walk backwards from the sink: sink <- reducer <- combiner.
        final SinkPlanNode sinkNode = optimizedPlan.getDataSinks().iterator().next();
        final SingleInputPlanNode reducerNode = (SingleInputPlanNode) sinkNode.getInput().getSource();
        final SingleInputPlanNode combinerNode = (SingleInputPlanNode) reducerNode.getInput().getSource();

        assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducerNode.getInput().getShipStrategy());
        assertEquals(ShipStrategyType.FORWARD, combinerNode.getInput().getShipStrategy());
    } catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
代码示例来源:origin: apache/flink
.sum(1)
.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
代码示例来源:origin: apache/flink
.sum(1)
.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
代码示例来源:origin: apache/flink
DataSet<Tuple2<Long, Long>> union234 = src4.union(union23);
union123.groupBy(0).sum(1).name("1").output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
union234.groupBy(1).sum(0).name("2").output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
代码示例来源:origin: apache/flink
.groupBy(0).sum(1).name("Count Words")
代码示例来源:origin: com.alibaba.blink/flink-gelly
/**
 * Computes the degree of every vertex in the graph by unioning the out-degree and
 * in-degree data sets and summing tuple field 1 per value of tuple field 0.
 *
 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 */
public DataSet<Tuple2<K, LongValue>> getDegrees() {
    DataSet<Tuple2<K, LongValue>> combinedDegrees =
        outDegrees().union(inDegrees()).name("In- and out-degree");

    DataSet<Tuple2<K, LongValue>> summedDegrees =
        combinedDegrees.groupBy(0).sum(1).name("Sum");

    return summedDegrees;
}
内容来源于网络,如有侵权,请联系作者删除!