RabbitMQ:从队列中为多个消费者分发固定消息

ddarikpa  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(3)|浏览(178)

有没有一种方法可以限制RabbitMQ队列只从队列向消费者分发固定数量的消息?
我有2个消费者Q1和Q2,以及10个消费者。每个消费者可以处理来自Q1和Q2的消息。在任何给定的时间,只有2个消费者应该处理来自Q2的消息。所有10个消费者可以同时处理来自Q1的消息。
RabbitMQ中是否有任何我们可以指定的配置,以便RabbitMQ仅从Q2向任何空闲消费者推送2条消息,并且仅在确认后推送下2条消息,即使其他消费者是空闲的并且准备消费。
关于这个问题的更多背景:
为什么一次只处理两条消息?:Q2消息正在进行Web服务调用,并且Web服务端点(第三方)只能同时服务2条消息。
我们可以使用并发吗?:如果我们使用ListenerContainer(Spring AMQP),则容器是每个消费者。我们可以限制一个消费者一次可以接收多少消息,但是当我们有10个消费者时,如果队列中有消息,每个消费者将获得其份额。
我们可以只配置2个消费者监听Q2吗?:我知道我们可以通过在第二季度只配置2个消费者来实现这一点,但我试图避免这种情况。如果由于某种原因这2个消费者停机,Q2的处理将停止。如果配置了10个消费者,我们可以保证处理会一直进行到最后一个消费者关闭。
看看RabbitMQ中是否有我们可以使用的配置或任何建议的解决方案。
提前感谢!

mrphzbgm

mrphzbgm1#

我很确定consumer prefetch会实现你想要的。但是,Q2只能有一个消费者才能实现这一点。没有办法在多个消费者之间进行协调--你必须自己做,并且可以使用RabbitMQ来进行协调。

mspsb9vt

mspsb9vt2#

我觉得你太沉迷于问题的定义了。你真正需要的是微不足道的,所以让我们把它分解一下。
给定两个队列,Q1Q2

  • 10名消费者
  • 每个消费者都可以处理来自Q1和Q2的消息。
  • 在任何给定时间,只有2个消费者应该处理来自Q2的消息。
  • 所有10个消费者可以同时处理来自Q1的消息。
    对问题陈述的评论

首先,假设队列是独立的。一个独立的进程P将有队列Q,因此Q1服务于进程P1。这是一个严格的数学要求-您不能为单个进程P定义两个队列。
因此,第二个约束在数学上是不正确的,原因与您无法编写一个有效的函数来互换地接受stringbool类型的参数相同。它必须接受一个或另一个,因为它们不是兼容的类型,或者它必须接受类型的单个公共祖先而不考虑子类型。这是Liskov Substitution Principle的变体。

  • 重新定义问题 *

系统中共有12个消费者:

  • Q1有10个消费者
  • Q2有2个消费者
  • [重要]队列之间不共享消费者

RabbitMQ中是否有任何我们可以指定的配置,以便RabbitMQ仅从Q2向任何空闲消费者推送2条消息,并且仅在确认后推送下2条消息,即使其他消费者是空闲的并且准备消费。
根据问题的新定义,您有两个选择:
1.使用Basic.Get-在消费者处理完最后一条消息后立即从队列中拉取下一条消息。
1.使用consumer prefetch,限制为1。这将立即为每个消费者传递第一条和第二条消息,然后在确认该消费者的下一条消息时一次传递一条附加消息。这有点复杂,但如果您的延迟余量小于10毫秒,则可能有意义。

注意,通过正确定义问题空间,我们已经消除了试图弄清楚如何确保在任何时候只有两个消费者处理Q2消息的基本问题。

oyxsuwqo

oyxsuwqo3#

尝试3.8+版本的新功能Single Active Consumer。
单个活动消费者允许在一个时间内从队列中消耗一个消费者,并在活动消费者被取消或死亡的情况下故障转移到另一个注册消费者。当消息必须按照它们到达队列的相同顺序进行消费和处理时,只使用一个消费者是很有用的。在声明队列时,可以启用单个活动消费者,并将x-single-active-consumer参数设置为true
https://www.rabbitmq.com/consumers.html#single-active-consumer
e.g. with the Java client:

相关问题