rabbitmq在具有优先级队列的spring引导中的应用

ftf50wuq  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(392)

我的问题是,我试图用rabbitmq实现优先级队列,但它总是随机的。即使我设置priority@rabbitlistener(queues=queue\u messages,priority=“10”)。
我给两个问题发了100条信息:

public void sendRequest() {
        for (int i = 0; i < 100; i++) {
            try {
                rabbitTemplate.convertAndSend(ProducerConfig.QUEUE_MESSAGES2,
                        new MessageDTO("Subject Two", "content2"), message -> {
                            message.getMessageProperties().setPriority(Integer.valueOf(10));
                            return message;
                        });

                rabbitTemplate.convertAndSend(ProducerConfig.QUEUE_MESSAGES,
                        new MessageDTO("Subject One", "content1"), message -> {
                            message.getMessageProperties().setPriority(Integer.valueOf(1));
                            return message;
                        });
                System.out.println("messages has been send");
            } catch (AmqpException ex) {
                System.out.println(ex.getMessage());
            }
        }
    }

所以我有两个听众:

@RabbitListener(queues = QUEUE_MESSAGES, priority = "1")
    public void receiveMessage(MessageDTO message) throws BusinessException, InterruptedException {
        try {
            System.out.println(message.getSubject());
        } catch (Exception ex) {
            System.out.println("exception" + ex.getMessage());
        }
    }

    @RabbitListener(queues = QUEUE_MESSAGES2, priority = "10")
    public void receiveMessage2(MessageDTO message) throws BusinessException, InterruptedException {
        try {
            System.out.println(message.getSubject());
        } catch (Exception ex) {
            System.out.println("exception" + ex.getMessage());
        }
    }

我的输出是这样随机的:

Subject One
Subject Two
Subject One
Subject Two
Subject One
Subject Two
Subject One
Subject Two
Subject One
Subject Two
Subject One
Subject Two
Subject One
Subject Two
Subject One

我需要从第一个队列接收所有消息,然后从第二个队列接收消息。有人能帮忙吗?
我已经在application.properties中尝试过了:

spring.rabbitmq.listener.simple.prefetch=1

我的版本是:rabbitmq 3.8.12 erlang 23.2.6

edit我在producer config中将priority设置为queue,在sending request priority设置为messages,但这对producer config没有帮助:

@Bean
    public Declarables fanoutBindings() {
        Queue messageQueue = QueueBuilder.durable(QUEUE_MESSAGES)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
                .withArgument("x-priority", Integer.valueOf(1))
                .build();
        Queue messageQueue2 = QueueBuilder.durable(QUEUE_MESSAGES2)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
                .withArgument("x-priority", Integer.valueOf(10))
                .build();
        Queue deadLetterQueue = QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
        Queue parkingLotQueue = QueueBuilder.durable(QUEUE_PARKING_LOT).build();
        FanoutExchange deadLetterExchange = new FanoutExchange(DLX_EXCHANGE_MESSAGES);

        return new Declarables(
                messageQueue,
                parkingLotQueue,
                deadLetterQueue,
                messageQueue2,
                deadLetterExchange,
                BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange));
    }
6ovsh4lw

6ovsh4lw1#

这个 priority 上的属性 @RabbitListener 消费者优先。具有较高优先级的使用者将在其处于活动状态时接收消息,而具有较低优先级的使用者仅在具有较高优先级的使用者阻止时接收消息。这假设这些消费者是从同一个队列消费的,这不是您的情况。
如果要实现优先级消息,则需要定义具有最大优先级的优先级队列,并在发送消息时设置priority属性(没有优先级的消息将被视为0优先级)。

相关问题