我是新手Masstransit,将使用Azure服务总线应用持久未来和路由滑动模式。具有控制器的Web应用程序定义如下:
app.MapPost("/orders/submit", async (SubmitOrderDto submitOrderDto, IRequestClient<SubmitOrder> client, ILogger<Program> logger, CancellationToken cancellationToken) =>
{
try
{
Response response = await client.GetResponse<OrderCompleted, OrderFaulted>(new
{
submitOrderDto.OrderId
}, cancellationToken);
return response switch
{
(_, OrderCompleted completed) => Results.Ok(new
{
completed.OrderId,
completed.Status,
completed.Created,
completed.Completed,
}),
(_, OrderFaulted faulted) => Results.BadRequest(new
{
faulted.OrderId,
faulted.Description,
faulted.Created,
faulted.Faulted
}),
_ => Results.BadRequest()
};
}
catch (Exception ex)
{
return Results.Accepted(value:new
{
submitOrderDto.OrderId,
ex.Message
});
}
})
.Produces<OrderCompleted>()
.Produces<OrderFaulted>(StatusCodes.Status400BadRequest)
.Produces(StatusCodes.Status202Accepted)
.ProducesValidationProblem(StatusCodes.Status409Conflict)
.WithName("SubmitOrder").WithTags("OrderServiceAPI");
有Future,Planner和Activities类。我希望如果有请求提交,那么它将由Activities类验证,然后将错误返回给客户端。如果通过验证,则可以继续该过程。
但我得到的Masstransit异常如下:
等待响应超时,RequestId:40040000-4446-107b-e828-08db2e658cb0 at MassTransit.Clients.RequestClient 1.GetResponseInternal[T1,T2](SendRequestCallback request, CancellationToken cancellationToken, RequestTimeout timeout, RequestPipeConfiguratorCallback
1 callback)in /_/src/MassTransit/Clients/RequestClient.cs:line 212
如果我切换到使用RabbitMq,请求会按照预期进行验证。但是当与Azure服务总线一起使用时,它总是会出现如上所述的超时异常。在程序中,我注入了如下所示的Masstransit并使用了Masstransit v8包
var builder = WebApplication.CreateBuilder(args);
builder.Services.TryAddScoped<IItineraryPlanner<SubmitOrder>, OrderItineraryPlannerAzure>();
builder.Services.AddMassTransit(x =>
{
// Azure
x.SetKebabCaseEndpointNameFormatter();
x.AddServiceBusMessageScheduler();
x.AddRequestClient<SubmitOrder>();
//-------------
x.ApplyCustomMassTransitConfiguration();
x.AddDelayedMessageScheduler();
x.AddActivitiesFromNamespaceContaining<OrderActivity>();
x.AddFuturesFromNamespaceContaining<OrderFuture>();
x.AddSagaRepository<FutureState>()
.InMemoryRepository();
//x.UsingRabbitMq((context, cfg) =>
//{
// cfg.AutoStart = true;
// cfg.ApplyCustomBusConfiguration();
// cfg.UseDelayedMessageScheduler();
// cfg.ConfigureEndpoints(context);
//});
x.UsingAzureServiceBus((context, cfg) =>
{
using var scope = context.CreateScope();
cfg.Host("Endpoint=sb://{connectionString}");
cfg.UseTimeout(timeoutConfigurator =>
{
timeoutConfigurator.Timeout = TimeSpan.FromMinutes(10);
});
cfg.PrefetchCount = 100;
cfg.LockDuration = TimeSpan.FromMinutes(5);
cfg.MaxConcurrentCalls = 100;
cfg.MaxDeliveryCount = 5;
cfg.DefaultMessageTimeToLive = TimeSpan.FromDays(7);
cfg.AutoStart = true;
cfg.RequiresSession = true;
cfg.UseServiceBusMessageScheduler();
// deploy topology configuration when service starting
cfg.DeployTopologyOnly = true;
cfg.UseDelayedMessageScheduler();
cfg.ConfigureEndpoints(context);
});
我想我需要为请求客户端创建消费者,但我不知道如何在Futures模式中定义它。或者我错过了任何步骤?
1条答案
按热度按时间mf98qq941#
好吧,你真的过度指定了Azure服务总线的总线配置,这样做会配置一些会破坏的东西。
删除:
此外,在此之上,删除:
因为您已经添加了服务总线消息调度程序。
总线配置所需的一切:
如果你想在你的接收端点上配置任何属性,你需要添加一个回调函数,而不是配置总线端点属性。