我使用的是RabbitMQ(3.8.14版本)和Mastransit(8.0.1版本)。
我需要保证每个消息都得到Ack,然后另一个数据包开始消费。(实际上,在我的队列中,我生成了一个唯一的增量密钥,然后我必须确保此时只有一个数据包被消费。)
值得注意的是,我的rabbit MQ被配置为具有三个节点的集群。
为此,我将预取计数配置为“1”值。
但是在第一条消息的确认之前的负载测试中,第二条消息被消耗了。尽管给出了关于“预取计数”的描述。
样本代码:
public class Caller
{
///ctor inections
private static int number= 0;
public async Task publish()
{
await _publishEndpoint.Publish(new GeneratedChipsEvent { messageNumber = ++number}); ;
}
}
public class GeneratedChipsEvent
{
public int messageNumber { get; init; }
}
public class GeneratedChipsEventConsumer : IConsumer<GeneratedChipsEvent>
{
///ctor inections
public async Task Consume(ConsumeContext<GeneratedChipsEvent> context)
{
_logger.LogInformation($"starting consuming the messsage:{context.Message.messageNumber}");
await Task.Delay(System.TimeSpan.FromSeconds(3));
_logger.LogInformation($"ending consuming the messsage:{context.Message.messageNumber}");
} }
登录到我的本地计算机(非群集),等待第一条确认消息,然后开始使用第二条消息:
[10:06:08 INF]开始使用消息:1
[10:06:11 INF]正在结束使用邮件:1
[10:06:11 INF]开始使用邮件:2
[10:06:14 INF]正在结束使用邮件:2
[10:06:14 INF]开始使用邮件:3
[10:06:17 INF]结束使用邮件:3
登录服务器(作为群集),不等待ack第一条消息:
[10:16:33 INF]开始使用邮件:1
[10:16:33 INF]开始使用邮件:2
[10:16:33 INF]开始使用邮件:3
[10:16:36 INF]结束使用邮件:1
[10:16:36 INF]结束使用邮件:2
[10:16:36 INF]正在结束使用消息:3
1条答案
按热度按时间wbgh16ku1#
如果您正在运行服务的三个示例,您将看到一次使用三条消息,每个示例一条。预取计数是针对每个示例的,它不是全局锁。
要一次只激活一个消费者,可以在接收端点配置器上指定:
e.SingleActiveConsumer = true;