在我的flink java程序中,我使用groupby操作符,如下所示:
dataSet.groupBy(new KeySelector<myObject, Tuple2<Tuple2<Integer, Integer>, Integer>>() {
private static final long serialVersionUID = 5L;
Tuple2<Tuple2<Integer, Integer>, Integer> groupingKey = new Tuple2<Tuple2<Integer, Integer>, Integer>();
public Tuple2<Tuple2<Integer, Integer>, Integer> getKey(myObject s) {
groupingKey.setField(s.getPosition(), 0);
groupingKey.setField(s.getBand(), 1);
return groupingKey;
}
})
.reduceGroup(reduceFunction);
``` `getPosition()` 返回一个 `Tuple2<Integer, Integer>` 以及 `getBand()` 返回一个 `int` .
我想根据这两个值对数据集进行分组。如果我有6个位置和4个波段,我想得到24个不同的组,并使用 `groupReduce` -为每个小组独立运作。但目前我的结果组似乎包含了乐队和位置的各种值。我在办公室里查过这个 `groupReduce` 功能:
if (this.band == null) {
this.band = myObject.getBand();
}
if (this.band != myObject.getBand()) {
System.out.println("The band should be " + this.band + " but is: " + myObject.getBand());
此外,在我的结果文件中也有一些值表明分组有问题。在我的情况下,分组是否可能不起作用?或者这仅仅是我代码中另一个潜在错误的结果?
1条答案
按热度按时间myzjeezk1#
我想你的支票
GroupReduceFunction
工作不正常。这个GroupReduceFunction.reduce()
可以为不同的组调用多次。this.band
是的成员变量GroupReduceFunction
我假设你没有重置this.band
在比赛结束时reduce()
方法。因此,
this.band
威尔null
只在第一次呼叫reduce()
. 在第二次通话开始时this.band
将被初始化,并且不会设置为当前组的频带。因此,以下检查将失败。