单元测试flink函数

nhaq1z21  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(382)

我使用的是flink v.1.4.0。
我已经实现了一个模块,作为我正在开发的包的一部分,它的作用是对流进行重复数据消除。该模块非常简单:

  1. public class RemoveDuplicateFilter<T> extends RichFlatMapFunction<T, T> {
  2. static final ValueStateDescriptor<Boolean> SEEN_DESCRIPTOR = new ValueStateDescriptor<>("seen", Boolean.class);
  3. private ValueState<Boolean> seen;
  4. @Override
  5. public void open(Configuration configuration) {
  6. RuntimeContext runtimeContext = this.getRuntimeContext();
  7. seen = runtimeContext.getState(SEEN_DESCRIPTOR);
  8. }
  9. @Override
  10. public void flatMap(T value, Collector<T> out) throws Exception {
  11. Boolean hasBeenSeen = seen.value();
  12. if(hasBeenSeen == null || !hasBeenSeen) {
  13. out.collect(value);
  14. seen.update(true);
  15. }
  16. }

问题是:如何在不必示例化实际代码的情况下测试此代码 Flink 价值状态?i、 e.使用mockito?
我试过很多方法,但基本上,当涉及到打电话时:

  1. RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
  2. ...
  3. when(runtimeContext.getState(SEEN_DESCRIPTOR)).thenReturn(seen);

呼叫总是失败。我试过换新的 SEEN_DESCRIPTORMatchers.any() 但还是没有运气。
有什么建议吗?

hxzsmxv2

hxzsmxv21#

可以使用FlinkSpitter对函数进行单元测试。

相关问题