如何创建一个假数据流,每隔一段时间进入apachebeam管道?

nxagd54h  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(368)

我正在尝试创建小的apachebeam流式程序来测试想法,我认为获取数据最简单的方法就是使用框架结构,比如 Create.of 制造假数据。那样的话,我就不需要设置更多的内容,比如设置gcp发布/子主题作为源并发布到它。
问题是我想尝试一些基于时间的东西,比如窗口和使用状态和计时器。我能把这些放在一起:

public class TestPipeline {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        p.apply(Create.of(1, 2, 3))
            .apply(ParDo.of(new DoFn<Integer, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.output(c.element().toString());
                }
            }))
            .apply(TextIO.write().to("myfile.txt"));

        p.run().waitUntilFinish();
    }
}

这实现了我在管道开始时发送三条数据的目标,但它同时发送了所有数据。我更希望能设置为每10秒发送一次数据,等等。
我遵循了apache flink的教程(https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/walkthroughs/datastream_api.html)这是我想要完成的一个例子。我深入研究了那个教程中的代码,但我无法确切地找出flink框架的哪一部分实现了这一点。

i34xakig

i34xakig1#

最后我用的是 TestStream 班级。我找到了 UnBoundedSource 类太难扩展到我的用例。博客帖子https://beam.apache.org/blog/2016/10/20/test-stream.html 帮助我了解如何在考试中使用这个类。

btqmn9zl

btqmn9zl2#

退房 SyntheticBoundedSource 以及 SyntheticUnboundedSource 上课!
它们可以让你参数化很多关于数据生成的信息,从键/值大小,记录释放之间的延迟等等 SyntheticSourceOptions 选项界面,因此也可以是检查可能的自定义设置的好地方。

相关问题