我使用以下设置通过Docker Desktop运行RabbitMQ:
rabbitmq:
container_name: rabbitmq
restart: always
ports:
- "5672:5672"
- "15672:15672"
第二个端口号用于RabbitMQ Jmeter 板。并且,我有一个基本的REST API端点,它应该发布RabbitMQ消息,如下所示:
private readonly IMediator _mediator;
private readonly IPublishEndpoint _publish;
public FlightController(IMediator mediator, IPublishEndpoint publish)
{
_mediator = mediator;
_publish = publish;
}
[HttpPost(Name = "CheckoutCrew")]
[ProducesResponseType((int)HttpStatusCode.Accepted)]
public async Task<IActionResult> CheckoutCrew([FromBody] ScheduleFlightCommand command)
{
var crewIds = new List<string>() { command.SeniorCrewId, command.Crew1Id, command.Crew2Id, command.Crew3Id };
var hasSchedule = true;
var crewCheckoutEvent = new CrewCheckoutEvent() { EmployeeNumbers = crewIds, HasSchedule = hasSchedule };
await _publish.Publish(crewCheckoutEvent);
return Accepted();
}
以下代码代表RabbitMQ的相关配置:
services.AddMassTransit(config => {
config.UsingRabbitMq((ctx, cfg) => {
cfg.Host(Configuration["EventBusSettings:HostAddress"]);
cfg.UseHealthCheck(ctx);
});
});
services.AddMassTransitHostedService();
这条Configuration["EventBusSettings:HostAddress"]
线指向appsettings.json
:
"EventBusSettings": {
"HostAddress": "amqp://guest:guest@localhost:5672"
}
在我运行了我的API(名为Flight.API)之后,我通过DockerDesktop检查RabbitMQ日志,并看到以下内容:
2022-03-31 12:52:41.794701+00:00 [info] <0.1020.0> accepting AMQP connection <0.1020.0> (xxx.xx.x.x:45292 -> xxx.xx.x.x:5672)
2022-03-31 12:52:41.817563+00:00 [info] <0.1020.0> Connection <0.1020.0> (xxx.xx.x.x:45292 -> xxx.xx.x.x:5672) has a client-provided name: Flight.API
2022-03-31 12:52:41.820704+00:00 [info] <0.1020.0> connection <0.1020.0> (xxx.xx.x.x:45292 -> xxx.xx.x.x:5672 - Flight.API): user 'guest' authenticated and granted access to vhost '/'
一切都很好,不是吗?
我也用try...catch
Package 了.Publish
方法,但它也没有抛出任何异常。当我的端点返回Accepted
时,我去检查RabbitMQ Jmeter 板,但它显示Connections: 0
,Channels: 0
等。消息速率部分也处于空闲状态。
我看不出我错过了什么。
(我现在没有消费者,但应该还是能看到一些 * 生命迹象 * 吧?那些连接和频道的计数器不应该在我成功发布了负载之后一直停留在0)
先谢谢你了。
在添加使用者类后编辑
RabbitMQ管理屏幕上仍然没有变化。一切都是默认值,空的,或者空闲的。下面是我对消费者项目的配置:
services.AddMassTransit(config => {
config.AddConsumer<CrewChecoutConsumer>();
config.UsingRabbitMq((ctx, cfg) => {
cfg.Host(Configuration["EventBusSettings:HostAddress"]);
cfg.UseHealthCheck(ctx);
cfg.ReceiveEndpoint(EventBusConstants.CrewCheckoutQueue, config => {
config.ConfigureConsumer<CrewChecoutConsumer>(ctx);
});
});
});
services.AddMassTransitHostedService();
services.AddScoped<CrewChecoutConsumer>();
使用者项目上的appsettings.json
档案会进行相应的变更:
"EventBusSettings": {
"HostAddress": "amqp://guest:guest@localhost:5672"
}
下面是我的完整消费者类:
public class CrewChecoutConsumer : IConsumer<CrewCheckoutEvent>
{
private readonly IMapper _mapper;
private readonly IMediator _mediator;
public CrewChecoutConsumer(IMapper mapper, IMediator mediator)
{
_mapper = mapper;
_mediator = mediator;
}
public async Task Consume(ConsumeContext<CrewCheckoutEvent> context)
{
foreach (var employeeNumber in context.Message.EmployeeNumbers)
{
var query = new GetSingleCrewQuery(employeeNumber);
var crew = await _mediator.Send(query);
crew.HasSchedule = context.Message.HasSchedule;
var updateCrewCommand = new UpdateCrewCommand();
_mapper.Map(crew, updateCrewCommand, typeof(CrewModel), typeof(UpdateCrewCommand));
var result = await _mediator.Send(updateCrewCommand);
}
}
}
1条答案
按热度按时间qacovj5a1#
如果没有任何使用者,则在将消息传递到已发布的消息交换时,您将看到的唯一内容是该交换上的消息速率,但由于没有绑定到该消息类型交换的接收端点(队列)而将其丢弃。
在拥有使用者之前,您不会在任何队列中看到任何消息。
此外,还应将控制器的
CancellationToken
传递给Publish
调用。