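This article collects code examples for the Java method org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup() and shows how UnsortedGrouping.sortGroup() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms. They were extracted from selected projects and should serve as a useful reference. The details of UnsortedGrouping.sortGroup() are as follows: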
Package: org.apache.flink.api.java.operators.UnsortedGrouping
Class: UnsortedGrouping
Method: sortGroup
Javadoc: Sorts org.apache.flink.api.java.tuple.Tuple elements within a group on the specified field in the specified Order.
Note: Only groups of Tuple elements and Pojos can be sorted.
Groups can be sorted by multiple fields by chaining #sortGroup(int,Order) calls.
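The semantics in the Javadoc can be illustrated without Flink. The following plain-Java analogue models tuples as int[] rows. GroupSortSketch and its methods are hypothetical names. The code only mimics the per-group ordering that groupBy(0).sortGroup(1, Order.ASCENDING).sortGroup(2, Order.DESCENDING) should produce. It does not show how Flink executes the sort. The second sortGroup call only orders records that tie on the first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration of group-sort semantics; not Flink code.
class GroupSortSketch {

    // Groups rows by field 0, then orders each group by field 1 ascending,
    // breaking ties by field 2 descending, like
    // groupBy(0).sortGroup(1, Order.ASCENDING).sortGroup(2, Order.DESCENDING).
    static Map<Integer, List<int[]>> groupAndSort(List<int[]> rows) {
        Map<Integer, List<int[]>> groups = new LinkedHashMap<>();
        for (int[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        Comparator<int[]> primary = Comparator.comparingInt((int[] r) -> r[1]);
        Comparator<int[]> secondary = Comparator.comparingInt((int[] r) -> r[2]).reversed();
        for (List<int[]> group : groups.values()) {
            group.sort(primary.thenComparing(secondary));
        }
        return groups;
    }

    // Renders each group as "key: (f1,f2) (f1,f2) ..." on its own line.
    static String render(Map<Integer, List<int[]>> groups) {
        StringBuilder sb = new StringBuilder();
        groups.forEach((key, group) -> {
            sb.append(key).append(':');
            for (int[] r : group) {
                sb.append(" (").append(r[1]).append(',').append(r[2]).append(')');
            }
            sb.append('\n');
        });
        return sb.toString();
    }

    public static void main(String[] args) {
        List<int[]> rows = List.of(
                new int[] {1, 5, 0}, new int[] {2, 9, 9}, new int[] {1, 3, 1},
                new int[] {1, 5, 7}, new int[] {2, 1, 4});
        System.out.print(render(groupAndSort(rows)));
        // 1: (3,1) (5,7) (5,0)
        // 2: (1,4) (9,9)
    }
}
```

In group 1 the two rows with field 1 equal to 5 are ordered by the secondary key (7 before 0). That matches what chaining a second sortGroup call is documented to do.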
Code example source: apache/flink
private void createSortOperation(PythonOperationInfo info) {
    if (sets.isDataSet(info.parentID)) {
        throw new IllegalArgumentException("sort() can not be applied on a DataSet.");
    } else if (sets.isUnsortedGrouping(info.parentID)) {
        sets.add(info.setID, sets.getUnsortedGrouping(info.parentID).sortGroup(info.field, info.order));
    } else if (sets.isSortedGrouping(info.parentID)) {
        sets.add(info.setID, sets.getSortedGrouping(info.parentID).sortGroup(info.field, info.order));
    }
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields4() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
            env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    // should not work
    tupleDs.groupBy(0)
            .sortGroup(2, Order.ASCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields5() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
            env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    // should not work
    tupleDs.groupBy(0)
            .sortGroup(3, Order.ASCENDING);
}
Code example source: apache/flink
@Test(expected = IndexOutOfBoundsException.class)
public void testGroupSortKeyFields2() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs =
            env.fromCollection(emptyTupleData, tupleTypeInfo);
    // should not work, field index out of bounds
    tupleDs.groupBy(0).sortGroup(5, Order.ASCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testGroupSortByKeyExpression6() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
            env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    // should not work
    tupleDs.groupBy("f0")
            .sortGroup("f3", Order.ASCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testGroupSortByKeyExpression4() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
            env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    // should not work
    tupleDs.groupBy("f0")
            .sortGroup("f2", Order.ASCENDING);
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields3() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
    // should not work: sorted groups on groupings by key selectors
    longDs.groupBy(new KeySelector<Long, Long>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Long getKey(Long value) {
            return value;
        }
    }).sortGroup(0, Order.ASCENDING);
}
Code example source: apache/flink
@Test
public void testGroupSortByKeyExpression1() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
            env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    // should work
    try {
        tupleDs.groupBy("f0").sortGroup("f1", Order.ASCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test(expected = InvalidProgramException.class)
public void testGroupSortByKeyExpression5() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
            env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    // should not work
    tupleDs.groupBy("f0")
            .sortGroup("f1", Order.ASCENDING)
            .sortGroup("f2", Order.ASCENDING);
}
Code example source: apache/flink
@Test
public void testGroupSortByKeyExpression2() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
            env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    // should work
    try {
        tupleDs.groupBy("f0").sortGroup("f2.myString", Order.ASCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testGroupSortKeyFields1() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs =
            env.fromCollection(emptyTupleData, tupleTypeInfo);
    // should work
    try {
        tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testChainedGroupSortKeyFields() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs =
            env.fromCollection(emptyTupleData, tupleTypeInfo);
    // should work
    try {
        tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING).sortGroup(2, Order.DESCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
    DataSet<Edge<K, EV>> edges = input.getEdges();
    // annotate edges with degrees
    DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<>())
            .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>())
            .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());
    // project edges by degrees
    DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<>());
    // project edges by vertex id
    DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<>());
    DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
            // build triads
            .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
            .reduceGroup(new TriadBuilder<>())
            // filter triads
            .join(edgesById, JoinHint.REPARTITION_HASH_SECOND)
            .where(Triad.V2, Triad.V3).equalTo(0, 1)
            .with(new TriadFilter<>());
    return triangles;
}
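The Gelly excerpt above works in four steps. It annotates edges with vertex degrees, groups them by their first vertex with the second vertex sorted ascending, and expands each group into triads (pairs of edges sharing a vertex). It then joins the triads with the edge set to keep only the closed ones. The sketch below is a single-machine, plain-Java version of the same triad-and-close idea. TriangleSketch is a hypothetical name. The orientation rule (lower degree first, ties broken by id) is a common choice and an assumption here; EdgeByDegreeProjector's exact rule may differ. The input is assumed to contain undirected edges with no duplicates or self-loops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical single-machine sketch of triad-based triangle enumeration; not Gelly code.
class TriangleSketch {

    // Encodes an undirected edge as one long, independent of endpoint order.
    private static long edgeKey(int a, int b) {
        int lo = Math.min(a, b), hi = Math.max(a, b);
        return ((long) lo << 32) | (hi & 0xffffffffL);
    }

    // Returns each triangle once, as an ascending int[3] of vertex ids.
    static List<int[]> triangles(int[][] edges) {
        Map<Integer, Integer> degree = new HashMap<>();
        Set<Long> edgeSet = new HashSet<>();
        for (int[] e : edges) {
            degree.merge(e[0], 1, Integer::sum);
            degree.merge(e[1], 1, Integer::sum);
            edgeSet.add(edgeKey(e[0], e[1]));
        }
        // Orient every edge from its "smaller" endpoint: lower degree first, ties by id.
        Map<Integer, List<Integer>> neighbors = new TreeMap<>();
        for (int[] e : edges) {
            int da = degree.get(e[0]), db = degree.get(e[1]);
            boolean aFirst = da < db || (da == db && e[0] < e[1]);
            int from = aFirst ? e[0] : e[1];
            int to = aFirst ? e[1] : e[0];
            neighbors.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        }
        List<int[]> result = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> group : neighbors.entrySet()) {
            List<Integer> sorted = group.getValue();
            sorted.sort(null); // analogous to sortGroup(V2, ASCENDING): deterministic pair order
            // Build triads (apex, x, y) and keep only those closed by an edge x-y.
            for (int i = 0; i < sorted.size(); i++) {
                for (int j = i + 1; j < sorted.size(); j++) {
                    int x = sorted.get(i), y = sorted.get(j);
                    if (edgeSet.contains(edgeKey(x, y))) {
                        int[] t = {group.getKey(), x, y};
                        Arrays.sort(t);
                        result.add(t);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] edges = {{1, 2}, {1, 3}, {2, 3}, {2, 4}, {3, 4}};
        for (int[] t : triangles(edges)) {
            System.out.println(Arrays.toString(t)); // [1, 2, 3] then [2, 3, 4]
        }
    }
}
```

Orienting edges by a total order on vertices means each triangle is emitted only from its smallest vertex. The other two vertices are both out-neighbors of that vertex, so no de-duplication step is needed.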
Code example source: apache/flink
@Test
public void testIdentityWithGroupByAndSort() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
    DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
            .groupBy(1)
            .sortGroup(1, Order.DESCENDING)
            // reduce partially
            .combineGroup(new IdentityFunction())
            .groupBy(1)
            .sortGroup(1, Order.DESCENDING)
            // fully reduce
            .reduceGroup(new IdentityFunction());
    List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
    compareResultAsTuples(result, identityResult);
}
Code example source: apache/flink
@Test
public void testGroupSortByKeyExpression3() {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
            env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    // should work
    try {
        tupleDs.groupBy("f0")
                .sortGroup("f2.myString", Order.ASCENDING)
                .sortGroup("f1", Order.DESCENDING);
    } catch (Exception e) {
        Assert.fail();
    }
}
Code example source: apache/flink
@Test
public void testIntBasedDefinitionOnGroupSortForFullNestedTuple() throws Exception {
    /*
     * Test int-based definition on group sort, for (full) nested Tuple
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds =
            CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
    DataSet<String> reduceDs = ds.groupBy("f1")
            .sortGroup(0, Order.DESCENDING)
            .reduceGroup(new NestedTupleReducer());
    List<String> result = reduceDs.collect();
    String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
            "b--(2,2)-\n" +
            "c--(4,9)-(3,6)-(3,3)-\n";
    compareResultAsText(result, expected);
}
Code example source: apache/flink
@Test
public void testStringBasedDefinitionOnGroupSortForPartialNestedTuple() throws Exception {
    /*
     * Test string-based definition on group sort, for (partial) nested Tuple DESC
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds =
            CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
    // f0.f0 is first integer
    DataSet<String> reduceDs = ds.groupBy("f1")
            .sortGroup("f0.f0", Order.DESCENDING)
            .reduceGroup(new NestedTupleReducer());
    List<String> result = reduceDs.collect();
    String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
            "b--(2,2)-\n" +
            "c--(4,9)-(3,3)-(3,6)-\n";
    compareResultAsText(result, expected);
}
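The two tests above differ only in group "c": (3,6) precedes (3,3) when the whole nested tuple is the sort key, but follows it when only f0.f0 is. The plain-Java sketch below reproduces both orders with comparators. It is a sketch under stated assumptions. NestedTupleSortSketch is a hypothetical name, and the arrival order (3,3), (4,9), (3,6) is assumed, since the actual contents of getGroupSortedNestedTupleDataSet are not shown. Java's List.sort is stable, so tied keys keep their input order. The sketch makes no claim about the stability of Flink's internal sort. The test simply runs with parallelism 1 and expects this order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-Java illustration; not Flink code.
class NestedTupleSortSketch {

    // Renders pairs in the "(a,b)-" style used by the expected strings above.
    static String render(List<int[]> pairs) {
        StringBuilder sb = new StringBuilder();
        for (int[] p : pairs) {
            sb.append('(').append(p[0]).append(',').append(p[1]).append(")-");
        }
        return sb.toString();
    }

    // Like sortGroup(0, DESCENDING) on a nested Tuple2: the whole tuple is the key,
    // compared field by field (f0.f0 first, then f0.f1), then reversed.
    static String sortByWholeTupleDesc(List<int[]> group) {
        List<int[]> copy = new ArrayList<>(group);
        copy.sort(Comparator.comparingInt((int[] p) -> p[0]).thenComparingInt(p -> p[1]).reversed());
        return render(copy);
    }

    // Like sortGroup("f0.f0", DESCENDING): only the first integer is the key.
    // List.sort is stable, so ties keep their input order.
    static String sortByFirstFieldDesc(List<int[]> group) {
        List<int[]> copy = new ArrayList<>(group);
        copy.sort(Comparator.comparingInt((int[] p) -> p[0]).reversed());
        return render(copy);
    }

    public static void main(String[] args) {
        // Assumed arrival order for group "c"; the real test data may differ.
        List<int[]> groupC = List.of(new int[] {3, 3}, new int[] {4, 9}, new int[] {3, 6});
        System.out.println(sortByWholeTupleDesc(groupC)); // (4,9)-(3,6)-(3,3)-
        System.out.println(sortByFirstFieldDesc(groupC)); // (4,9)-(3,3)-(3,6)-
    }
}
```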
Code example source: apache/flink
@Test
public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos() throws Exception {
    /*
     * Test string-based definition on group sort, for two grouping keys with Pojos
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataSet<PojoContainingTupleAndWritable> ds =
            CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
    // sort descending by theTuple.f0, then by theTuple.f1
    DataSet<String> reduceDs = ds.groupBy("hadoopFan")
            .sortGroup("theTuple.f0", Order.DESCENDING)
            .sortGroup("theTuple.f1", Order.DESCENDING)
            .reduceGroup(new GroupReducer5());
    List<String> result = reduceDs.collect();
    String expected = "1---(10,100)-\n" +
            "2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
    compareResultAsText(result, expected);
}
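In the expected output for group 2, theTuple.f0 is the primary key and theTuple.f1 only orders records that tie on it. The plain-Java sketch below rebuilds that order from the five values shown in the expected string. The input order is arbitrary, and ChainedSortSketch is a hypothetical name. The two chained descending sortGroup calls correspond to one comparator composed with thenComparing.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-Java illustration; not Flink code.
class ChainedSortSketch {

    // Mirrors sortGroup("theTuple.f0", DESCENDING).sortGroup("theTuple.f1", DESCENDING):
    // the first call is the primary key, the second only orders ties of the first.
    static String sortChainedDesc(List<int[]> group) {
        Comparator<int[]> byF0Desc = Comparator.comparingInt((int[] t) -> t[0]).reversed();
        Comparator<int[]> byF1Desc = Comparator.comparingInt((int[] t) -> t[1]).reversed();
        List<int[]> copy = new ArrayList<>(group);
        copy.sort(byF0Desc.thenComparing(byF1Desc));
        StringBuilder sb = new StringBuilder();
        for (int[] t : copy) {
            sb.append('(').append(t[0]).append(',').append(t[1]).append(")-");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Values from group 2 of the expected output, in an arbitrary input order.
        List<int[]> group2 = List.of(new int[] {20, 200}, new int[] {30, 200},
                new int[] {20, 201}, new int[] {30, 600}, new int[] {30, 400});
        System.out.println(sortChainedDesc(group2)); // (30,600)-(30,400)-(30,200)-(20,201)-(20,200)-
    }
}
```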
Code example source: apache/flink
@Test
public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeys() throws Exception {
    /*
     * Test string-based definition on group sort, for two grouping keys
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds =
            CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
    // f0.f0 is first integer
    DataSet<String> reduceDs = ds.groupBy("f1")
            .sortGroup("f0.f0", Order.DESCENDING)
            .sortGroup("f0.f1", Order.DESCENDING)
            .reduceGroup(new NestedTupleReducer());
    List<String> result = reduceDs.collect();
    String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
            "b--(2,2)-\n" +
            "c--(4,9)-(3,6)-(3,3)-\n";
    compareResultAsText(result, expected);
}
Code example source: apache/flink
@Test
public void testSortedGroupReduceWithTypeInformationTypeHint() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
    DataSet<Integer> resultDs = ds
            .groupBy(0)
            .sortGroup(0, Order.ASCENDING)
            .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
            .returns(BasicTypeInfo.INT_TYPE_INFO);
    List<Integer> result = resultDs.collect();
    String expectedResult = "2\n" +
            "3\n" +
            "1\n";
    compareResultAsText(result, expectedResult);
}
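In this last test, `.returns(BasicTypeInfo.INT_TYPE_INFO)` states the reducer's output type explicitly. A likely reason the test needs it is that GroupReducer is instantiated directly as a generic class, not as an anonymous subclass. Java erases the type arguments of such an instantiation, so Integer cannot be read back from the object at runtime. The sketch below only demonstrates erasure itself and does not call Flink. Holder is a hypothetical stand-in for a generic function class.

```java
// Plain-Java demonstration of type erasure; not Flink code.
class ErasureSketch {

    // A generic class instantiated with concrete type arguments, similar in shape to
    // new GroupReducer<Tuple3<...>, Integer>() in the test above.
    static class Holder<T> {
    }

    static boolean sameRuntimeClass() {
        Holder<Integer> ints = new Holder<>();
        Holder<String> strings = new Holder<>();
        // Both objects share one Class: the Integer/String arguments are erased.
        return ints.getClass() == strings.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass()); // true
    }
}
```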