我的目标是为flink1.10中的流处理模块提供一个接口。管道在其他操作符中包含aggregatefunction。所有操作符都有泛型类型,但问题出在aggregatefunction中,它无法确定输出类型。
注意:实际的管道有一个slidingeventtimewindow赋值器和一个与aggregatefunction一起传递的windowfunction,但是使用下面的代码可以更容易地再现错误。
这是一个复制错误的简单测试用例:
@Test
public void aggregateFunction_genericType() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String,Integer>> source = env.fromElements(Tuple2.of("0",1), Tuple2.of("0",2), Tuple2.of("0",3));
ConfigAPI cfg = new ConfigAPI();
source
.keyBy(k -> k.f0)
.countWindow(5, 1)
.aggregate(new GenericAggregateFunc<>(cfg))
.print();
env.execute();
}
如您所见,配置类作为参数传递给自定义aggregatefunction。这是用户将实现的。
public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String,Integer>> {
@Override
public Tuple2<String, Integer> createAcc() {
return new Tuple2<>("0", 0);
}
@Override
public Tuple2<String, Integer> addAccumulators(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
acc.f1 += in.f1;
return acc;
}
}
提供的接口是:
public interface BaseConfigAPI<In, Acc> {
Acc createAcc();
Acc addAccumulators(In in, Acc acc);
// other methods to override
}
常规聚集函数:
public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {
private BaseConfigAPI<In, Acc> cfg;
GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
this.cfg = cfg;
}
@Override
public Acc createAccumulator() {
return cfg.createAcc();
}
@Override
public Acc add(In in, Acc acc) {
return cfg.addAccumulators(in, acc);
}
@Override
public Acc getResult(Acc acc) {
return acc;
}
@Override
public Acc merge(Acc acc, Acc acc1) {
return null;
}
}
输出日志:
org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem.
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
Otherwise the type has to be specified explicitly using type information.
解决方案1(不起作用):起初我认为这是“无法确定返回类型”的常见情况,所以我尝试添加 .returns(Types.TUPLE(Types.STRING, Types.INT))
之后 .aggregate(...)
但没有成功。
解决方案2(工作):我创建了一个具有泛型类型的 Package 器类,名为 Accumulator<Acc>
然后作为类型传递给 AggregateFunction<In, Accumulator<Acc>, Accumulator<Acc>>
似乎在起作用。
这看起来不是很优雅,但它不是很符合接口的其余部分。这个问题还有别的解决办法吗?
编辑:感谢@dedupper提供的时间和洞察力,我想我找到了解决方案。
解决方案3(工作):我创建了一个新接口来扩展 BaseConfigAPI
以及 AggregateFunction
以以下方式:
public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}
public interface BaseConfigAPI extends Serializable {
//These will be implemented directly from AggregateFunction interface
//Acc createAcc();
//Acc addAccumulators(In in, Acc acc);
//other methods to override
}
现在用户只能实现 MergedConfigAPI<In, Acc, Out>
并将其作为参数传递给 .aggregate(...)
功能。
更新:我测试了@dedupper的第三个解决方案,但也没有成功。异常似乎是由 Acc
而不是 Out
类型。仔细看看 .aggregate
接线员,我意识到有一个超负荷的 aggregate
方法,该方法再接受2个参数。一 TypeInformation<ACC> accumulatorType
和一个 TypeInformation<R> returnType
.
这就是最简单的解决方案如何在没有任何代码重构的情况下出现的。
解决方案4(工作):
@Test
public void aggregateFunction_genericType() throws Exception {
...
.aggregate(
new GenericAggregateFunc<>(cfg),
Types.TUPLE(Types.STRING, Types.INT),
Types.TUPLE(Types.STRING, Types.INT))
...
}
注:从flink 1.10.1开始 aggregate
方法用@publicevolving注解。
1条答案
按热度按时间ct2axkht1#
“能否用泛型类型实现flink的aggregatefunction?”
对。你可以。就像你自己做的那样。您的错误是您如何使用它(如“使用站点泛型”)而不是如何实现它的结果。
·····这个问题还有其他解决办法吗?····
我提出以下三个候选解决方案,按简单程度的升序排列…
以上是最简单的,因为您不必重构原始的
GenericAgregateFunc
; 只需在菱形中填入要示例化泛型类的特定类型参数。还有一个稍微不那么简单的解决方案…
尽管这一次涉及到一个小的重构,但在我看来,它比第一次提出的解决方案更简化了整个应用程序。
flink已经为您处理了“复杂”的泛型多态性。您所要做的就是,将插件插入到flink,只需示例化它们的内置泛型
AggregateFunction<IN, ACC, OUT>
使用您想要示例化它的特定类型参数。这些类型参数属于Tuple2<String, Integer>
对你来说。所以你仍然“使用泛型”第二个解决方案,但你这样做的方式要简单得多。
另一个选项更接近您的原始实现,但有几个小的重构…
另外,要强制用户的配置实现与您的函数兼容的接口的前提条件…
在我的实验中,我证实了
Out
将参数类型设置为BaseConfigAPI
也使它兼容。我确实想到了一个更复杂的替代方案。但由于简单总是好的,我将把更复杂的解决方案留给其他人提出。