MassTransit / RabbitMQ -为什么会跳过这么多消息?

mxg2im7a  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(3)|浏览(398)

我正在MassTransit/RabbitMQ的生产者/消费者场景中使用2个.NET核心控制台应用程序。我需要确保即使没有消费者启动并运行,来自生产者的消息仍能成功排队。这似乎不适用于Publish()-消息消失了,所以我使用了“发送”消息至少会排队,但是如果没有任何消费者运行,消息就会全部进入“_skipped”队列。
这是我的第一个问题这是否是基于需求的正确方法(即使没有使用方启动并运行,来自生成方的消息仍会成功排队)?
使用Send(),我的消费者确实可以工作,但是仍然有许多消息从缝隙中掉出来,被转储到“_skipped”队列中。消费者的逻辑是最小的(只是在那一刻记录消息),所以它不是一个长时间运行的过程。
这就是我的第二个问题为什么仍有这么多消息被转储到“_skipped”队列中?
这就引出了我的第三个问题这是否意味着我的消费者也需要监听“_skipped”队列?
我不确定您需要查看什么代码来回答这个问题,但这里有一个RabbitMQ管理UI的屏幕截图:

生成器配置:

static IHostBuilder CreateHostBuilder(string[] args)
    {
        return Host.CreateDefaultBuilder()
                      .ConfigureServices((hostContext, services) =>
                      {
                          services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));

                          services.AddMassTransit(cfg =>
                          {
                              cfg.AddBus(ConfigureBus);
                          });

                          services.AddHostedService<CardMessageProducer>();
                      })
                      .UseConsoleLifetime()
                      .UseSerilog();
    }

    static IBusControl ConfigureBus(IServiceProvider provider)
    {
        var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
            {
                h.Username(options.RabbitMQ_Username);
                h.Password(options.RabbitMQ_Password);
            });

            cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
            {
                EndpointConvention.Map<CardMessage>(e.InputAddress);
            });
        });
    }

生产商代码:

Bus.Send(message);

消费者配置:

static IHostBuilder CreateHostBuilder(string[] args)
    {
        return Host.CreateDefaultBuilder()
                      .ConfigureServices((hostContext, services) =>
                      {
                          services.AddSingleton<CardMessageConsumer>();

                          services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));

                          services.AddMassTransit(cfg =>
                          {
                              cfg.AddBus(ConfigureBus);
                          });

                          services.AddHostedService<MassTransitHostedService>();
                      })
                      .UseConsoleLifetime()
                      .UseSerilog();
    }

    static IBusControl ConfigureBus(IServiceProvider provider)
    {
        var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
            {
                h.Username(options.RabbitMQ_Username);
                h.Password(options.RabbitMQ_Password);
            });

            cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
            {
                e.Consumer<CardMessageConsumer>(provider);
            });

            //cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName + "_skipped", e =>
            //{
            //    e.Consumer<CardMessageConsumer>(provider);
            //});
        });
    }

消费者代码:

class CardMessageConsumer : IConsumer<CardMessage>
{
    private readonly ILogger<CardMessageConsumer> logger;
    private readonly ApplicationConfiguration configuration;
    private long counter;

    public CardMessageConsumer(ILogger<CardMessageConsumer> logger, IOptions<ApplicationConfiguration> options)
    {
        this.logger = logger;
        this.configuration = options.Value;
    }

    public async Task Consume(ConsumeContext<CardMessage> context)
    {
        this.counter++;

        this.logger.LogTrace($"Message #{this.counter} consumed: {context.Message}");
    }
}
zu0ti5jz

zu0ti5jz1#

在MassTransit中,_skipped队列是dead letter queue概念的实现。消息到达那里是因为它们没有被使用。
带有RMQ的MassTransit始终将消息传递到 exchange,而不是 queue。默认情况下,每个MassTransit端点创建(如果没有现有队列)一个具有端点名称的队列,一个具有相同名称的交换,并将它们绑定在一起。(或处理程序),该消息类型的交换(使用消息类型作为交换名称)并且端点交换绑定到消息类型交换。当您使用Publish时,消息被发布到消息类型交换,并使用终结点绑定进行相应的传递当您使用Send时,消息类型交换没有被使用,因此消息直接到达目的交换。every MassTransit端点只期望获得它可以使用的消息。如果它不知道如何使用消息-则将消息移动到死信队列。这与有害消息队列一样,是消息传递的基本模式。
如果您需要将消息排队以便稍后使用,最好的方法是设置连接,但端点本身(我指的是应用程序)不应运行。一旦应用程序启动,它将使用所有排队的消息。

o4tp2gmn

o4tp2gmn2#

当消费者启动总线bus.Start()时,它所做的一件事就是为传输创建所有的交换和队列。如果您要求发布/发送在消费者之前发生,您唯一的选择就是运行DeployTopologyOnly。不幸的是,官方文档中没有介绍此功能,但单元测试在这里:https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.RabbitMqTransport.Tests/BuildTopology_Specs.cs
当消息发送到不知道如何处理的使用者时,会出现跳过的队列。
例如,如果你有一个消费者,它可以处理IConsumer<MyMessageA>,它在接收端点名“my-queue-a”上。但是你的消息生产者处理Send<MyMessageB>(Uri("my-queue-a")...),这是一个问题。消费者只理解A,它不知道如何处理B。所以它只是把它移到一个跳过的队列中,然后继续。

9cbw7uwe

9cbw7uwe3#

在我的例子中,同一个队列同时监听多个使用者

相关问题