优先级聚合器

dvtswwa3  于 2021-07-26  发布在  Java
关注(0)|答案(1)|浏览(517)

我有以下应用程序要求:
从rabbitmq接收消息,然后根据一些更复杂的规则进行聚合,例如基于 types 属性(具有预先给定的类型时间Map)并基于消息已在队列中等待的现有时间( old 财产)
所有消息应以一定的可变消息速率释放,例如1msg/sec到100msg/sec。此速率由服务控制和设置,该服务将监视rabbitmq队列大小(一个与此组件无关的队列,位于管道的最上游),如果队列中有太多消息,则会降低速率。
正如您在图中所看到的,一个用例:已经聚合了三条消息并等待下一秒发布(因为当前速率是 1msg/sec )但就在那时, MSG 带着 id:10 ,它更新了 AGGREGATED 2 ,使其成为第一条优先信息。所以在下一个滴答声,而不是释放 AGGREGATED 3 ,我们发布 AGGREGATED 2 因为它现在有更高的优先级。

现在,问题是-我可以使用SpringIntegrationAggregator吗,因为我不知道它是否支持聚合期间消息的优先级排序?我知道 groupTimeout ,但这只是调整单个消息组,而不是更改其他组的优先级。有没有可能 MessageGroupStoreReaper 当新消息到达时,是否会按优先级调整所有其他聚合消息?
更新
我做了一些类似这样的实现——目前看来还可以——它在消息到达时聚合消息,comparator根据我的自定义逻辑对消息进行排序。
你认为这可能有一些问题(并发性等)?我可以在日志中看到,轮询器在某些场合被调用了不止一次。这正常吗?

2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL

还有,这是评论吗 doit 方法,在运行时增加最大轮询消息数的正确方法?

@Bean
    public MessageChannel aggregatingChannel(){
        return new QueueChannel(new PriorityAggregatingQueue<>((m1, m2) -> {//aggr here},
                Comparator.comparingInt(x -> x),
                (m) -> {
                    ExampleDTO d = (ExampleDTO) m.getPayload();
                    return d.getId();
                }
        ));
    }

    class PriorityAggregatingQueue<K> extends AbstractQueue<Message<?>> {
        private final Log logger = LogFactory.getLog(getClass());
        private final BiFunction<Message<?>, Message<?>, Message<?>> accumulator;
        private final Function<Message<?>, K> keyExtractor;
        private final NavigableMap<K, Message<?>> keyToAggregatedMessage;

        public PriorityAggregatingQueue(BiFunction<Message<?>, Message<?>, Message<?>> accumulator,
                                        Comparator<? super K> comparator,
                                        Function<Message<?>, K> keyExtractor) {
            this.accumulator = accumulator;
            this.keyExtractor = keyExtractor;
            keyToAggregatedMessage = new ConcurrentSkipListMap<>(comparator);
        }

        @Override
        public Iterator<Message<?>> iterator() {
            return keyToAggregatedMessage.values().iterator();
        }

        @Override
        public int size() {
            return keyToAggregatedMessage.size();
        }

        @Override
        public boolean offer(Message<?> m) {
            logger.info("OFFER");
            return keyToAggregatedMessage.compute(keyExtractor.apply(m), (k,old) -> accumulator.apply(old, m)) != null;
        }

        @Override
        public Message<?> poll() {
            logger.info("POLL");
            Map.Entry<K, Message<?>> m = keyToAggregatedMessage.pollLastEntry();
            return m != null ? m.getValue() : null;
        }

        @Override
        public Message<?> peek() {
            Map.Entry<K, Message<?>> m = keyToAggregatedMessage.lastEntry();
            return m!= null ? m.getValue() : null;
        }
    }

//    @Scheduled(fixedDelay = 10*1000)
//    public void doit(){
//        System.out.println("INCREASE POLL");
//        pollerMetadata().setMaxMessagesPerPoll(pollerMetadata().getMaxMessagesPerPoll() * 2);
//    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata pollerMetadata(){
        PollerMetadata metadata = new PollerMetadata();
        metadata.setTrigger(new DynamicPeriodicTrigger(Duration.ofSeconds(30)));
        metadata.setMaxMessagesPerPoll(1);
        return metadata;
    }

    @Bean
    public IntegrationFlow aggregatingFlow(
            AmqpInboundChannelAdapter aggregatorInboundChannel,
            AmqpOutboundEndpoint aggregatorOutboundChannel,
            MessageChannel wtChannel,
            MessageChannel aggregatingChannel,
            PollerMetadata pollerMetadata
    ) {
    return IntegrationFlows.from(aggregatorInboundChannel)
        .wireTap(wtChannel)
        .channel(aggregatingChannel)
        .handle(aggregatorOutboundChannel)
        .get();
    }
2j4z5cfb

2j4z5cfb1#

好吧,如果有一个新的消息需要组来完成它到达聚合器,那么这样的组会立即被释放(如果您的 ReleaseStrategy 尽管如此)。处于超时状态的组的其他成员将继续等待计划。
有可能提出一种智能算法来依赖于一个单一的公共调度 MessageGroupStoreReaper 决定是否需要释放这个部分组或者直接丢弃它。再说一次:the ReleaseStrategy 应该给我们一个释放或不释放的线索,即使部分。当丢弃发生时,我们希望将这些消息保留在聚合器中,我们需要在延迟一段时间后将它们重新发送回聚合器。过期后,组将从存储中删除,当我们已将其发送到丢弃通道时会发生这种情况,因此最好延迟这些组,并让聚合器清理这些组,以便在延迟后,我们可以安全地将它们作为新组的一部分发送回聚合器一个新的过期期。
您可能还可以在releases normal组中迭代存储区中的所有消息,以便在下一个过期时间调整它们的头中的某个时间键。
我知道这是一件很难的事,但确实没有任何现成的解决方案,因为它不是为了影响我们刚刚处理过的其他群体而设计的。。。

相关问题