Usage and code examples of the org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup() method


This article collects code examples of the Java method org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup() and shows how UnsortedGrouping.sortGroup() is used in practice. The examples are taken from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as a useful reference. Details of the UnsortedGrouping.sortGroup() method:
Package: org.apache.flink.api.java.operators
Class name: UnsortedGrouping
Method name: sortGroup

UnsortedGrouping.sortGroup overview

Sorts org.apache.flink.api.java.tuple.Tuple elements within a group on the specified field in the specified Order.

Note: Only groups of Tuple elements and Pojos can be sorted.

Groups can be sorted by multiple fields by chaining #sortGroup(int,Order) calls.
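
Before the collected examples, here is a minimal self-contained sketch of the typical groupBy(...).sortGroup(...).reduceGroup(...) pattern on the (legacy) Flink DataSet API. The sample data, field indices, and the anonymous GroupReduceFunction are made up for illustration and are not taken from the examples below.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class SortGroupSketch {
  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Made-up sample data: (userId, timestamp, event)
    DataSet<Tuple3<Integer, Long, String>> events = env.fromElements(
        Tuple3.of(1, 30L, "click"),
        Tuple3.of(1, 10L, "login"),
        Tuple3.of(2, 20L, "login"),
        Tuple3.of(1, 20L, "scroll"),
        Tuple3.of(2, 40L, "logout"));

    // Group by field 0 (userId) and sort each group by field 1 (timestamp);
    // chaining another sortGroup(...) call would add a secondary sort key.
    DataSet<String> sessions = events
        .groupBy(0)
        .sortGroup(1, Order.ASCENDING)
        .reduceGroup(new GroupReduceFunction<Tuple3<Integer, Long, String>, String>() {
          @Override
          public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<String> out) {
            // Records arrive in ascending timestamp order within each group.
            StringBuilder sb = new StringBuilder();
            for (Tuple3<Integer, Long, String> t : values) {
              if (sb.length() == 0) {
                sb.append(t.f0).append(": ");
              }
              sb.append(t.f2).append(' ');
            }
            out.collect(sb.toString().trim());
          }
        });

    sessions.print();
  }
}

Note that the sort order applies only within each group, which is exactly what group-at-a-time functions such as reduceGroup and combineGroup in the examples below rely on.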

Code examples

Code example source: origin: apache/flink

private void createSortOperation(PythonOperationInfo info) {
  if (sets.isDataSet(info.parentID)) {
    throw new IllegalArgumentException("sort() can not be applied on a DataSet.");
  } else if (sets.isUnsortedGrouping(info.parentID)) {
    // group sort on an UnsortedGrouping yields a SortedGrouping
    sets.add(info.setID, sets.getUnsortedGrouping(info.parentID).sortGroup(info.field, info.order));
  } else if (sets.isSortedGrouping(info.parentID)) {
    // further sortGroup() calls on a SortedGrouping append secondary sort keys
    sets.add(info.setID, sets.getSortedGrouping(info.parentID).sortGroup(info.field, info.order));
  }
}

Code example source: origin: apache/flink

@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields4() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should not work
  tupleDs.groupBy(0)
      .sortGroup(2, Order.ASCENDING);
}

Code example source: origin: apache/flink

@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields5() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should not work
  tupleDs.groupBy(0)
      .sortGroup(3, Order.ASCENDING);
}

Code example source: origin: apache/flink

@Test(expected = IndexOutOfBoundsException.class)
public void testGroupSortKeyFields2() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
  // should not work, field index out of bounds
  tupleDs.groupBy(0).sortGroup(5, Order.ASCENDING);
}

Code example source: origin: apache/flink

@Test(expected = InvalidProgramException.class)
public void testGroupSortByKeyExpression6() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should not work
  tupleDs.groupBy("f0")
      .sortGroup("f3", Order.ASCENDING);
}

Code example source: origin: apache/flink

@Test(expected = InvalidProgramException.class)
public void testGroupSortByKeyExpression4() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should not work
  tupleDs.groupBy("f0")
      .sortGroup("f2", Order.ASCENDING);
}

Code example source: origin: apache/flink

@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields3() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
  // should not work: sorted groups on groupings by key selectors
  longDs.groupBy(new KeySelector<Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long getKey(Long value) {
      return value;
    }
  }).sortGroup(0, Order.ASCENDING);
}

Code example source: origin: apache/flink

@Test
public void testGroupSortByKeyExpression1() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should work
  try {
    tupleDs.groupBy("f0").sortGroup("f1", Order.ASCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: origin: apache/flink

@Test(expected = InvalidProgramException.class)
public void testGroupSortByKeyExpression5() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should not work
  tupleDs.groupBy("f0")
      .sortGroup("f1", Order.ASCENDING)
      .sortGroup("f2", Order.ASCENDING);
}

Code example source: origin: apache/flink

@Test
public void testGroupSortByKeyExpression2() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should work
  try {
    tupleDs.groupBy("f0").sortGroup("f2.myString", Order.ASCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: origin: apache/flink

@Test
public void testGroupSortKeyFields1() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
  // should work
  try {
    tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: origin: apache/flink

@Test
public void testChainedGroupSortKeyFields() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
  // should work
  try {
    tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING).sortGroup(2, Order.DESCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: origin: apache/flink

@Override
public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
  DataSet<Edge<K, EV>> edges = input.getEdges();
  // annotate edges with degrees
  DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<>())
      .groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<>())
      .groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<>());
  // project edges by degrees
  DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<>());
  // project edges by vertex id
  DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<>());
  DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
      // build triads
      .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
      .reduceGroup(new TriadBuilder<>())
      // filter triads
      .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<>());
  return triangles;
}

Code example source: origin: apache/flink

@Test
public void testIdentityWithGroupByAndSort() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
  DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
      .groupBy(1)
      .sortGroup(1, Order.DESCENDING)
      // reduce partially
      .combineGroup(new IdentityFunction())
      .groupBy(1)
      .sortGroup(1, Order.DESCENDING)
      // fully reduce
      .reduceGroup(new IdentityFunction());
  List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
  compareResultAsTuples(result, identityResult);
}

Code example source: origin: apache/flink

@Test
public void testGroupSortByKeyExpression3() {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs =
      env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
  // should work
  try {
    tupleDs.groupBy("f0")
        .sortGroup("f2.myString", Order.ASCENDING)
        .sortGroup("f1", Order.DESCENDING);
  } catch (Exception e) {
    Assert.fail();
  }
}

Code example source: origin: apache/flink

@Test
public void testIntBasedDefinitionOnGroupSortForFullNestedTuple() throws Exception {
  /*
   * Test int-based definition on group sort, for (full) nested Tuple
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
  DataSet<String> reduceDs = ds.groupBy("f1").sortGroup(0, Order.DESCENDING).reduceGroup(new NestedTupleReducer());
  List<String> result = reduceDs.collect();
  String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
      "b--(2,2)-\n" +
      "c--(4,9)-(3,6)-(3,3)-\n";
  compareResultAsText(result, expected);
}

Code example source: origin: apache/flink

@Test
public void testStringBasedDefinitionOnGroupSortForPartialNestedTuple() throws Exception {
  /*
   * Test string-based definition on group sort, for (partial) nested Tuple DESC
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
  // f0.f0 is first integer
  DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).reduceGroup(new NestedTupleReducer());
  List<String> result = reduceDs.collect();
  String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
      "b--(2,2)-\n" +
      "c--(4,9)-(3,3)-(3,6)-\n";
  compareResultAsText(result, expected);
}

Code example source: origin: apache/flink

@Test
public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos() throws Exception {
  /*
   * Test string-based definition on group sort, for two grouping keys with Pojos
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
  // f0.f0 is first integer
  DataSet<String> reduceDs = ds.groupBy("hadoopFan").sortGroup("theTuple.f0", Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING)
      .reduceGroup(new GroupReducer5());
  List<String> result = reduceDs.collect();
  String expected = "1---(10,100)-\n"
      +
      "2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
  compareResultAsText(result, expected);
}

Code example source: origin: apache/flink

@Test
public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeys() throws Exception {
  /*
   * Test string-based definition on group sort, for two grouping keys
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);
  DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
  // f0.f0 is first integer
  DataSet<String> reduceDs = ds.groupBy("f1").sortGroup("f0.f0", Order.DESCENDING).sortGroup("f0.f1", Order.DESCENDING).reduceGroup(new NestedTupleReducer());
  List<String> result = reduceDs.collect();
  String expected = "a--(2,1)-(1,3)-(1,2)-\n" +
      "b--(2,2)-\n" +
      "c--(4,9)-(3,6)-(3,3)-\n";
  compareResultAsText(result, expected);
}

Code example source: origin: apache/flink

@Test
public void testSortedGroupReduceWithTypeInformationTypeHint() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableSysoutLogging();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
  DataSet<Integer> resultDs = ds
    .groupBy(0)
    .sortGroup(0, Order.ASCENDING)
    .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
    .returns(BasicTypeInfo.INT_TYPE_INFO);
  List<Integer> result = resultDs.collect();
  String expectedResult = "2\n" +
    "3\n" +
    "1\n";
  compareResultAsText(result, expectedResult);
}
