我正在MassTransit/RabbitMQ的生产者/消费者场景中使用2个.NET核心控制台应用程序。我需要确保即使没有消费者启动并运行,来自生产者的消息仍能成功排队。这似乎不适用于Publish()-消息消失了,所以我使用了“发送”消息至少会排队,但是如果没有任何消费者运行,消息就会全部进入“_skipped”队列。
这是我的第一个问题这是否是基于需求的正确方法(即使没有使用方启动并运行,来自生成方的消息仍会成功排队)?
使用Send(),我的消费者确实可以工作,但是仍然有许多消息从缝隙中掉出来,被转储到“_skipped”队列中。消费者的逻辑是最小的(只是在那一刻记录消息),所以它不是一个长时间运行的过程。
这就是我的第二个问题为什么仍有这么多消息被转储到“_skipped”队列中?
这就引出了我的第三个问题这是否意味着我的消费者也需要监听“_skipped”队列?
我不确定您需要查看什么代码来回答这个问题,但这里有一个RabbitMQ管理UI的屏幕截图:
生成器配置:
static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));
services.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
});
services.AddHostedService<CardMessageProducer>();
})
.UseConsoleLifetime()
.UseSerilog();
}
static IBusControl ConfigureBus(IServiceProvider provider)
{
var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
{
h.Username(options.RabbitMQ_Username);
h.Password(options.RabbitMQ_Password);
});
cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
{
EndpointConvention.Map<CardMessage>(e.InputAddress);
});
});
}
生产商代码:
Bus.Send(message);
消费者配置:
static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<CardMessageConsumer>();
services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));
services.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
});
services.AddHostedService<MassTransitHostedService>();
})
.UseConsoleLifetime()
.UseSerilog();
}
static IBusControl ConfigureBus(IServiceProvider provider)
{
var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
{
h.Username(options.RabbitMQ_Username);
h.Password(options.RabbitMQ_Password);
});
cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
{
e.Consumer<CardMessageConsumer>(provider);
});
//cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName + "_skipped", e =>
//{
// e.Consumer<CardMessageConsumer>(provider);
//});
});
}
消费者代码:
class CardMessageConsumer : IConsumer<CardMessage>
{
private readonly ILogger<CardMessageConsumer> logger;
private readonly ApplicationConfiguration configuration;
private long counter;
public CardMessageConsumer(ILogger<CardMessageConsumer> logger, IOptions<ApplicationConfiguration> options)
{
this.logger = logger;
this.configuration = options.Value;
}
public async Task Consume(ConsumeContext<CardMessage> context)
{
this.counter++;
this.logger.LogTrace($"Message #{this.counter} consumed: {context.Message}");
}
}
3条答案
按热度按时间zu0ti5jz1#
在MassTransit中,
_skipped
队列是dead letter queue概念的实现。消息到达那里是因为它们没有被使用。带有RMQ的MassTransit始终将消息传递到 exchange,而不是 queue。默认情况下,每个MassTransit端点创建(如果没有现有队列)一个具有端点名称的队列,一个具有相同名称的交换,并将它们绑定在一起。(或处理程序),该消息类型的交换(使用消息类型作为交换名称)并且端点交换绑定到消息类型交换。当您使用
Publish
时,消息被发布到消息类型交换,并使用终结点绑定进行相应的传递当您使用Send
时,消息类型交换没有被使用,因此消息直接到达目的交换。every MassTransit端点只期望获得它可以使用的消息。如果它不知道如何使用消息-则将消息移动到死信队列。这与有害消息队列一样,是消息传递的基本模式。如果您需要将消息排队以便稍后使用,最好的方法是设置连接,但端点本身(我指的是应用程序)不应运行。一旦应用程序启动,它将使用所有排队的消息。
o4tp2gmn2#
当消费者启动总线
bus.Start()
时,它所做的一件事就是为传输创建所有的交换和队列。如果您要求发布/发送在消费者之前发生,您唯一的选择就是运行DeployTopologyOnly。不幸的是,官方文档中没有介绍此功能,但单元测试在这里:https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.RabbitMqTransport.Tests/BuildTopology_Specs.cs当消息发送到不知道如何处理的使用者时,会出现跳过的队列。
例如,如果你有一个消费者,它可以处理
IConsumer<MyMessageA>
,它在接收端点名“my-queue-a”上。但是你的消息生产者处理Send<MyMessageB>(Uri("my-queue-a")...)
,这是一个问题。消费者只理解A,它不知道如何处理B。所以它只是把它移到一个跳过的队列中,然后继续。9cbw7uwe3#
在我的例子中,同一个队列同时监听多个使用者