我正在使用队列中的跟踪日志(apachepulsar)。我使用5 keyedprcoessfunction,最后将有效负载下沉到postgres db。我需要为每个keyedprocessfunction按customerid订购。现在我通过
Datasource.keyBy(fooKeyFunction).process(processA).keyBy(fooKeyFunction).process(processB).keyBy(fooKeyFunction).process(processC).keyBy(fooKeyFunction).process(processE).keyBy(fooKeyFunction).sink(fooSink).
processfunctionc非常耗时,最坏情况下需要30秒才能完成。这会导致背压。我试着给processfunctionc分配更多的插槽,但是我的吞吐量从来没有保持不变。它通常保持<4条消息/秒。
每个processfunction的当前插槽为
processFunctionA: 3
processFunctionB: 30
processFunctionc: 80
processFunctionD: 10
processFunctionC: 10
在flinkui中,它显示从进程b开始的背压,这意味着c非常慢。有没有一种方法可以使用在源本身应用分区逻辑,并将每个任务的相同插槽分配给每个processfunction。例如:
dataSoruce.magicKeyBy(fooKeyFunction).setParallelism(80).process(processA).process(processB).process(processC).process(processE).sink(fooSink).
这将导致背压只发生在少数任务中,而不会扭曲由多个keyby引起的背压。
我能想到的另一种方法是将我的所有processfunction和sink组合成单个processfunction,并在sink本身中应用所有这些逻辑。
1条答案
按热度按时间qeeaahzv1#
我认为不存在这样的事情。最接近的是
DataStreamUtils.reinterpretAsKeyedStream
,重新创建KeyedStream
没有在操作符之间发送任何数据,因为它使用的分区器只在本地转发数据。这或多或少是您想要的,但它仍然添加了分区操作符,并在引擎盖下重新创建KeyedStream
,但它应该更简单,更快,也许它会解决你所面临的问题。如果这不能解决这个问题,那么我认为最好的解决方案是对操作符进行分组,以便按照您的建议将背压最小化,即将所有操作符合并到一个更大的操作符中,这应该将背压最小化。