我使用的是flink v.1.4.0。
我已经实现了一个模块,作为我正在开发的包的一部分,它的作用是对流进行重复数据消除。该模块非常简单:
public class RemoveDuplicateFilter<T> extends RichFlatMapFunction<T, T> {
static final ValueStateDescriptor<Boolean> SEEN_DESCRIPTOR = new ValueStateDescriptor<>("seen", Boolean.class);
private ValueState<Boolean> seen;
@Override
public void open(Configuration configuration) {
RuntimeContext runtimeContext = this.getRuntimeContext();
seen = runtimeContext.getState(SEEN_DESCRIPTOR);
}
@Override
public void flatMap(T value, Collector<T> out) throws Exception {
Boolean hasBeenSeen = seen.value();
if(hasBeenSeen == null || !hasBeenSeen) {
out.collect(value);
seen.update(true);
}
}
问题是:如何在不必示例化实际代码的情况下测试此代码 Flink
价值状态?i、 e.使用mockito?
我试过很多方法,但基本上,当涉及到打电话时:
RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
...
when(runtimeContext.getState(SEEN_DESCRIPTOR)).thenReturn(seen);
呼叫总是失败。我试过换新的 SEEN_DESCRIPTOR
与 Matchers.any()
但还是没有运气。
有什么建议吗?
1条答案
按热度按时间hxzsmxv21#
可以使用FlinkSpitter对函数进行单元测试。