我有一个Java Flink(版本1.15)应用程序,它在AWS Kinesis Flink Runtime中运行异步I/O操作,并行度设置为12。该操作从FlinkKinesisConsumer读取消息流,并在异步操作中处理它。
FlinkKinesis消费者:
DataStream<> inputMessages =
env.addSource(new FlinkKinesisConsumer<>(kinesisStream, new Deserializer(),
streamSourceProperties));
异步操作:
DataStream<> processedStream =
AsyncDataStream.unorderedWait(inputMessages, new AsyncIOFunction(environment), 1000,
TimeUnit.MILLISECONDS, 100);
在Flink Jmeter 板中,我看到FlinkKinesisConsumer和Async操作有12个子任务。然而,数据仅在子任务中的一个中处理。
下面的屏幕截图显示了在一个子任务中处理的记录,而其他子任务没有加载。
Kinesis消费者:
异步I/O操作:
为什么这种分配不平等?如果负载很高,这会对性能产生影响吗?另外,如何使处理在子任务之间均匀分布?
我看到了这个问题Equally distribute operators with single parallelism in a multi-parallel Flink application,但我仍然不确定这个问题的答案。
1条答案
按热度按时间wrrgggsh1#
看起来您的Kinesis配置只有一个分片,因此FlinkKinesisSource仅用一个子任务读取它,即使并行度大于该值。
请阅读文档中的这一部分:
这里最好的方法是有多个分片(最好的设置是你的parralelism计数)。如果你不能做到这一点,你可以在Flink从Kinesis获取记录后,使用keyBy或随机分区将记录随机分配到其他子任务,但这种方法效率较低,并且对缓冲区的网络负载更大。