Flink 为什么有界流会打印出无序序列?

nbysray5  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(117)

在下面的代码中:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromElements(
        Tuple2.of("01", 1),
        Tuple2.of("02", 2),
        Tuple2.of("03", 3),
        Tuple2.of("04", 4),
        Tuple2.of("05", 5)
        );  
    dataStream.print();

输出显示:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1227405543]
08/26/2023 19:00:45     Job execution switched to status RUNNING.
08/26/2023 19:00:45     Source: Collection Source(1/1) switched to SCHEDULED
08/26/2023 19:00:45     Source: Collection Source(1/1) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to SCHEDULED 
08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to DEPLOYING 
08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to RUNNING 
08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to RUNNING
08/26/2023 19:00:45     Source: Collection Source(1/1) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to RUNNING
5> (05,5)
1> (01,1)
3> (03,3)
2> (02,2)
4> (04,4)
08/26/2023 19:00:45     Source: Collection Source(1/1) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to FINISHED 
08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to FINISHED
08/26/2023 19:00:45     Job execution switched to status FINISHED.

1.为什么输出显示的是无序的流?如何在排序中启用顺序?

5> (05,5)
1> (01,1)
3> (03,3)
2> (02,2)
4> (04,4)

1.根据跟踪,为什么启用了多个接收器(几乎8个)?

mftmpeh8

mftmpeh81#

Flink中有一个并行的概念,你可以把它想象成多个线程同时工作。当代码中没有明确的并行度时,CPU数量将用作默认并行度。这也解释了为什么有8个Flume。
所以在你的代码中,这五个元组是在不同的线程中执行的,所以多个线程同时工作,最终的输出是不有序的。
当这些元组在同一个线程中执行时,它们的输出结果是有序的,也就是你定义的元素的顺序。你可以通过这样设置env.setParallelism(1);来将全局并行度设置为1,但是这样会降低处理的效率。
如果要实现多个并行度下的排序,只能在sink期间更改并行度,并在同一线程中处理它们。

相关问题