Apache Flink: How do I update a source function from within a unit test?

Posted by r6hnlfcb on 2021-06-21 in Flink

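I need my Flink job to read data from a local instance of a source function, and to update whenever that instance's data is changed from the unit test code itself (rather than from a stream).
Pseudocode: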

StreamExecutionEnvironment env = ...getExecutionEnvironment();
StockSource src = new StockSource(); // the Source Function instance
env.addSource(src);
results = Pipeline(env); // does some calculations and returns the calculated data
env.execute();

// Test 1
When: src.sendData("TWTR", 120.6);
Assert: results.eurRate == 98.87;

// Test 2
When: src.sendData("GOOG", 300);
Assert: results.eurRate == 245.95;

Is this possible in Flink?

bt1cpqcv

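What you can do is write your job so that the source and sink are pluggable, and then implement suitable sources and sinks for testing. In other words, something like this: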

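import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// IncrementMapFunction and RandomLongSource are user-defined helper classes that are not shown in this answer.
public class TestableStreamingJob {
  private final SourceFunction<Long> source;
  private final SinkFunction<Long> sink;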

  public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
    this.source = source;
    this.sink = sink;
  }

  public void execute() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Declare the element type explicitly so Flink does not have to infer it from the source.
    DataStream<Long> longStream = env.addSource(source).returns(TypeInformation.of(Long.class));

    longStream
      .map(new IncrementMapFunction())
      .addSink(sink);

    env.execute();
  }

  public static void main(String[] args) throws Exception {
    TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
    job.execute();
  }
}

You can then test it like this:

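import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

// ParallelCollectionSource is a user-defined test source that is not shown in this answer.
public class TestableStreamingJobTest {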
  @ClassRule
  public static MiniClusterWithClientResource flinkCluster =
      new MiniClusterWithClientResource(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberSlotsPerTaskManager(2)
              .setNumberTaskManagers(1)
              .build());

  @Test
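  public void testCompletePipeline() throws Exception {
    // Clear the shared static list so results from other tests do not leak into this one.
    SinkCollectingLongs.result.clear();

    ParallelSourceFunction<Long> source = new ParallelCollectionSource(Arrays.asList(1L, 10L, -10L));
    SinkCollectingLongs sink = new SinkCollectingLongs();
    TestableStreamingJob job = new TestableStreamingJob(source, sink);

    // execute() blocks until the job finishes, i.e. once the finite test source is exhausted.
    job.execute();

    assertThat(SinkCollectingLongs.result).containsExactlyInAnyOrder(2L, 11L, -9L);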
  }
}

The sink used for testing looks like this:

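import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class SinkCollectingLongs implements SinkFunction<Long> {

  // Static and synchronized: Flink serializes the sink and runs copies of it in
  // (possibly several) parallel tasks, so the test reads results from this shared list.
  public static final List<Long> result =
      Collections.synchronizedList(new ArrayList<>());

  @Override
  public void invoke(Long value, Context context) throws Exception {
    result.add(value);
  }
}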

The TestableStreamingJob example is taken from https://github.com/knaufk/flink-testing-pyramid; see that repository for details.
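One detail that is easy to miss is why the collecting sink stores its results in a static field. Flink serializes user functions when a job is submitted, so the sink instance that actually receives records is a copy of the one created in the test. The following is a minimal plain-Java sketch of that effect, with no Flink dependency; StaticSinkDemo, CollectingSink, and copyViaSerialization are hypothetical names invented for this illustration, and the serialization round trip only approximates what happens when a function is shipped to a task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StaticSinkDemo {

  // Hypothetical stand-in for a Flink sink; for this demo it only needs to be Serializable.
  static class CollectingSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Static: shared by every instance in this JVM, including deserialized copies.
    static final List<Long> STATIC_RESULT = Collections.synchronizedList(new ArrayList<>());

    // Instance field: every deserialized copy gets its own list.
    final List<Long> instanceResult = new ArrayList<>();

    void invoke(Long value) {
      STATIC_RESULT.add(value);
      instanceResult.add(value);
    }
  }

  // Round-trips an object through Java serialization and returns the resulting copy.
  static <T extends Serializable> T copyViaSerialization(T original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    CollectingSink sinkInTest = new CollectingSink();                    // the object the test holds
    CollectingSink sinkInJob = copyViaSerialization(sinkInTest);         // the copy that receives records

    sinkInJob.invoke(2L);
    sinkInJob.invoke(11L);

    System.out.println(CollectingSink.STATIC_RESULT); // prints [2, 11]
    System.out.println(sinkInTest.instanceResult);    // prints []
  }
}
```

The instance list on the test's own object stays empty, while the static list sees both values. This sharing only works while the test and the job run in the same JVM, which is the case with a local mini cluster, and it is also the reason a static list should be cleared between tests.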
