与Azure服务总线一起使用时获取MassTransit.RequestTimeoutException

klr1opcd  于 2023-03-31  发布在  其他
关注(0)|答案(1)|浏览(128)

我是新手Masstransit,将使用Azure服务总线应用持久未来和路由滑动模式。具有控制器的Web应用程序定义如下:

app.MapPost("/orders/submit", async (SubmitOrderDto submitOrderDto, IRequestClient<SubmitOrder> client, ILogger<Program> logger, CancellationToken cancellationToken) =>
            {
                try
                {
                    Response response = await client.GetResponse<OrderCompleted, OrderFaulted>(new
                    {
                        submitOrderDto.OrderId
                    }, cancellationToken);
                    return response switch
                    {
                        (_, OrderCompleted completed) => Results.Ok(new
                        {
                            completed.OrderId,
                            completed.Status,
                            completed.Created,
                            completed.Completed,
                        }),
                        (_, OrderFaulted faulted) => Results.BadRequest(new
                        {
                            faulted.OrderId,
                            faulted.Description,
                            faulted.Created,
                            faulted.Faulted
                        }),
                        _ => Results.BadRequest()
                    };
                }
                catch (Exception ex)
                {
                    return Results.Accepted(value:new
                    {
                        submitOrderDto.OrderId, 
                        ex.Message
                    });
                }
            })
            .Produces<OrderCompleted>()
            .Produces<OrderFaulted>(StatusCodes.Status400BadRequest)
            .Produces(StatusCodes.Status202Accepted)
            .ProducesValidationProblem(StatusCodes.Status409Conflict)
            .WithName("SubmitOrder").WithTags("OrderServiceAPI");

有Future,Planner和Activities类。我希望如果有请求提交,那么它将由Activities类验证,然后将错误返回给客户端。如果通过验证,则可以继续该过程。
但我得到的Masstransit异常如下:
等待响应超时,RequestId:40040000-4446-107b-e828-08db2e658cb0 at MassTransit.Clients.RequestClient 1.GetResponseInternal[T1,T2](SendRequestCallback request, CancellationToken cancellationToken, RequestTimeout timeout, RequestPipeConfiguratorCallback 1 callback)in /_/src/MassTransit/Clients/RequestClient.cs:line 212
如果我切换到使用RabbitMq,请求会按照预期进行验证。但是当与Azure服务总线一起使用时,它总是会出现如上所述的超时异常。在程序中,我注入了如下所示的Masstransit并使用了Masstransit v8包

var builder = WebApplication.CreateBuilder(args);
builder.Services.TryAddScoped<IItineraryPlanner<SubmitOrder>, OrderItineraryPlannerAzure>();

builder.Services.AddMassTransit(x =>
{

 // Azure
    x.SetKebabCaseEndpointNameFormatter();
    x.AddServiceBusMessageScheduler();
    x.AddRequestClient<SubmitOrder>();

    //-------------
    x.ApplyCustomMassTransitConfiguration();
    x.AddDelayedMessageScheduler();
    x.AddActivitiesFromNamespaceContaining<OrderActivity>();
    x.AddFuturesFromNamespaceContaining<OrderFuture>();

    x.AddSagaRepository<FutureState>()
        .InMemoryRepository();

    //x.UsingRabbitMq((context, cfg) =>
    //{
    //    cfg.AutoStart = true;
    //    cfg.ApplyCustomBusConfiguration();

    //    cfg.UseDelayedMessageScheduler();
    //    cfg.ConfigureEndpoints(context);
    //});

    x.UsingAzureServiceBus((context, cfg) =>
    {
        using var scope = context.CreateScope();  
 cfg.Host("Endpoint=sb://{connectionString}");

        cfg.UseTimeout(timeoutConfigurator =>
        {
            timeoutConfigurator.Timeout = TimeSpan.FromMinutes(10);
        });
        cfg.PrefetchCount = 100;
        cfg.LockDuration = TimeSpan.FromMinutes(5);
        cfg.MaxConcurrentCalls = 100;
        cfg.MaxDeliveryCount = 5;
        cfg.DefaultMessageTimeToLive = TimeSpan.FromDays(7);
        cfg.AutoStart = true;
        cfg.RequiresSession = true;

        cfg.UseServiceBusMessageScheduler();
        // deploy topology configuration when service starting
        cfg.DeployTopologyOnly = true;
        cfg.UseDelayedMessageScheduler();
        cfg.ConfigureEndpoints(context);
    });

我想我需要为请求客户端创建消费者,但我不知道如何在Futures模式中定义它。或者我错过了任何步骤?

mf98qq94

mf98qq941#

好吧,你真的过度指定了Azure服务总线的总线配置,这样做会配置一些会破坏的东西。
删除:

cfg.RequiresSession = true;
cfg.DeployTopologyOnly = true;
cfg.UseDelayedMessageScheduler();

此外,在此之上,删除:

x.AddDelayedMessageScheduler();

因为您已经添加了服务总线消息调度程序。
总线配置所需的一切:

x.UsingAzureServiceBus((context, cfg) =>
{
    cfg.Host("Endpoint=sb://{connectionString}");

    cfg.UseServiceBusMessageScheduler();

    cfg.UseTimeout(timeoutConfigurator =>
    {
        timeoutConfigurator.Timeout = TimeSpan.FromMinutes(10);
    });

    cfg.PrefetchCount = 100;
    cfg.ConcurrentMessageLimit = 100;
    cfg.AutoStart = true;

    cfg.ConfigureEndpoints(context);
});

如果你想在你的接收端点上配置任何属性,你需要添加一个回调函数,而不是配置总线端点属性。

x.AddConfigureEndpointsCallback((context,name,cfg) =>
{
    if (cfg is IServiceBusReceiveEndpointConfigurator sb)
    {
        sb.LockDuration = TimeSpan.FromMinutes(5);
        sb.MaxDeliveryCount = 5;
        sb.DefaultMessageTimeToLive = TimeSpan.FromDays(7);
    }
});

相关问题