In the following code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromElements(
Tuple2.of("01", 1),
Tuple2.of("02", 2),
Tuple2.of("03", 3),
Tuple2.of("04", 4),
Tuple2.of("05", 5)
);
dataStream.print();
the output shows:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1227405543]
08/26/2023 19:00:45 Job execution switched to status RUNNING.
08/26/2023 19:00:45 Source: Collection Source(1/1) switched to SCHEDULED
08/26/2023 19:00:45 Source: Collection Source(1/1) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(1/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(1/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(2/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(2/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(3/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(3/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(4/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(4/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(5/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(5/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(6/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(6/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(7/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(7/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(8/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(8/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(3/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(4/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(2/8) switched to RUNNING
08/26/2023 19:00:45 Source: Collection Source(1/1) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(1/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(5/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(6/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(8/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(7/8) switched to RUNNING
5> (05,5)
1> (01,1)
3> (03,3)
2> (02,2)
4> (04,4)
08/26/2023 19:00:45 Source: Collection Source(1/1) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(6/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(5/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(8/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(7/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(1/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(3/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(2/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(4/8) switched to FINISHED
08/26/2023 19:00:45 Job execution switched to status FINISHED.
1. Why does the output show the stream out of order? How can I make the output ordered?
5> (05,5)
1> (01,1)
3> (03,3)
2> (02,2)
4> (04,4)
2. According to the trace, why were multiple sinks (8 of them) started?
1 Answer
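Flink has a concept of parallelism, which you can think of as multiple threads working at the same time. When the code does not set a parallelism explicitly and the job runs locally, the number of CPU cores is used as the default parallelism. This also explains why there are 8 sinks.
So in your code the five tuples are handled by different sink subtasks, each running in its own thread (the number before `>` in each output line is the index of the subtask that printed it). Because these threads work at the same time, the final output is not ordered.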
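The effect described above can be reproduced without Flink. The following is a minimal plain-Java sketch, not Flink's actual implementation: it assumes records are dealt to the sink subtasks round-robin starting at the first subtask (which matches the `N>` prefixes in the question's output), and the class and method names (`ParallelSinkSketch`, `distribute`, `runSinks`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ParallelSinkSketch {

    // Deal the records to the subtasks in round-robin order (assumed strategy).
    public static List<List<String>> distribute(List<String> records, int parallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            perSubtask.get(i % parallelism).add(records.get(i));
        }
        return perSubtask;
    }

    // Start one thread per subtask; each one "prints" its records with an "N> " prefix
    // into a shared list. The interleaving between threads is up to the scheduler.
    public static List<String> runSinks(List<String> records, int parallelism) {
        List<String> output = Collections.synchronizedList(new ArrayList<>());
        List<List<String>> perSubtask = distribute(records, parallelism);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            final int subtask = i + 1;
            final List<String> mine = perSubtask.get(i);
            Thread t = new Thread(() -> {
                for (String record : mine) {
                    output.add(subtask + "> " + record);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sinks", e);
            }
        }
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("(01,1)", "(02,2)", "(03,3)", "(04,4)", "(05,5)");
        System.out.println(runSinks(records, 8)); // order may differ between runs
        System.out.println(runSinks(records, 1)); // always in source order
    }
}
```

With 8 subtasks only five of them receive a record, and nothing coordinates which thread writes first. With a single subtask there is only one writer, so the source order is preserved.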
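When all of the tuples are processed in the same thread, the output is ordered, that is, it follows the order in which you defined the elements. You can achieve this by setting
env.setParallelism(1);
which sets the global parallelism to 1, but this reduces processing throughput. To get ordered output while keeping a higher parallelism elsewhere, the only option is to change the parallelism of the sink so that all records are printed by the same thread.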
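Changing the parallelism of the sink alone could look like the sketch below. It reuses the code from the question and calls `setParallelism(1)` on the object returned by `print()`. The class name `OrderedPrintJob` and the job name passed to `execute` are arbitrary placeholders.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OrderedPrintJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromElements(
                Tuple2.of("01", 1),
                Tuple2.of("02", 2),
                Tuple2.of("03", 3),
                Tuple2.of("04", 4),
                Tuple2.of("05", 5)
        );

        // Only the print sink runs with parallelism 1;
        // the environment's default parallelism is left unchanged.
        dataStream.print().setParallelism(1);

        env.execute("ordered-print"); // placeholder job name
    }
}
```

This keeps the order here because the collection source also runs with parallelism 1, so a single producer feeds a single sink. If an operator with higher parallelism sits between the source and the sink, records from its different subtasks can still reach the single sink in any order.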