java—如何对flink processfunction进行单元测试?

s6fujrry  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(439)

我有一个简单的processfunction,它接受字符串作为输入,并给出一个字符串作为输出。如何使用junit进行单元测试?因为processelement方法是void方法,不返回任何值。

public class SampleProcessFunction extends ProcessFunction<String, String>{
    @Override
    public void processElement(String content, Context context, Collector<String> collector) throws Exception {
        String output = content + "output";
        collector.collect(output);
    }
}
k75qkfdt

k75qkfdt1#

如果要在对象上使用,可以使用:

Mockito.atLeastOnce();
    Mockito.doReturn(myCustomObject);
lh80um4z

lh80um4z2#

为了单元测试这个方法,定义预期的行为。在本例中,预期的行为是对 Collector::collect 方法 content + "output" 作为论据。因此,可以使用模拟收集器对其进行测试。
以下是使用mockito框架的示例:

...

private final Collector<String> collectorMock = Mockito.mock(Collector.class);
private final Context contextMock = Mockito.mock(Context.class);

private final SampleProcessFunction sampleProcessFunction = new SampleProcessFunction();

@Test
public void testProcessElement_shouldInvokeCollector_whenAnyValuePassed() throws Exception {
    // given
    final String content = "hello ";

    // when
    sampleProcessFunction.processElement(content, contextMock, collectorMock);

    // then
    Mockito.verify(collectorMock).collect(content + "output"); // verifies that collector method was invoked with "hello output" exactly once
}

...

相关问题