rabbitmq C# MassTransit -在运行时应用程序中创建新队列后使用消息引发System.ObjectDisposedException异常

2wnc66cl  于 12个月前  发布在  RabbitMQ
关注(0)|答案(2)|浏览(235)

我尝试在应用程序运行时创建新队列后使用新消息。
1.在我的应用中创建新项目时,服务发布新ItemCreatedMessage

  1. ItemCreatedMessageCreateItemQueuesConsumer消费者消费
    2.1.在CreateItemQueuesConsumer中,我将ConnectReceiveEndpoint配置为设置队列名称,并使用第二个参数IReceiveEndpointConfigurator将消费者与消费者工厂方法连接起来
    1.我的队列成功创建,绑定到我的交易所,我可以在RabbitMQ控制台中看到它们
    1.但是,当我将新消息发布到我的队列时,该消息将由我在工厂方法中配置的消费者使用,消费者抛出System.ObjectDisposedException
    这是我的以下代码:
// CreateMachineQueuesConsumer.cs

public async Task Consume(ConsumeContext<MachineCreatedMessage> context)
    {
        var message = context.Message;
        var machineId = message.MachineId;
        var cancellationToken = context.CancellationToken;

        using var provider = this.serviceProvider.CreateScope();

        var createMachineStatusQueue = this.bus.ConnectReceiveEndpoint(
            string.Format("update_machine_status__{0}", machineId),
            x =>
            {
                if (x is IRabbitMqReceiveEndpointConfigurator c)
                {
                    c.ConfigureConsumeTopology = false;
                    c.ConcurrentMessageLimit = 1;
                    c.PrefetchCount = 1;

                    c.Consumer(() =>
                    {
                        return provider.ServiceProvider
                            .GetRequiredService<UpdateMachineStatusConsumer>();
                    });

                    c.Bind("machine_status_cycle", s =>
                    {
                        s.RoutingKey = machineId.ToString();
                        s.ExchangeType = ExchangeType.Direct;
                    });
                }
            });

            var anotherQueues = ...

字符串
当我重新加载我的应用程序时,消息会被使用,因为我有一个后台服务,它会在我的应用程序构建时创建新队列。

jdzmm42g

jdzmm42g1#

要连接接收端点,您应该在消费者中使用IReceiveEndpointConnector

public class ThisConsumer :
    IConsumer<MachineCreatedMessage>
{
    public ThisConsumer(IReceiveEndpointConnector connector)
    {
        _connector = connector;
    }

    public async Task Consume(ConsumeContext<MachineCreatedMessage> context)
    {
        var message = context.Message;
        var machineId = message.MachineId;
        var cancellationToken = context.CancellationToken;

        var queueHandle = _connector.ConnectReceiveEndpoint(string.Format("update_machine_status__{0}", machineId), 
            (context, cfg) =>
            {
                cfg.ConfigureConsumeTopology = false;
                cfg.ConcurrentMessageLimit = 1;
                cfg.PrefetchCount = 1;

                cfg.ConfigureConsumer<UpdateMachineStatusConsumer(context);

                if (cfg is IRabbitMqReceiveEndpointConfigurator c)
                {
                    c.Bind("machine_status_cycle", s =>
                    {
                        s.RoutingKey = machineId.ToString();
                        s.ExchangeType = ExchangeType.Direct;
                    });
                }
            });

        // optional, don't have to wait
        await queueHandle.Ready;
    }
}

字符串
此外,在AddMassTransit中,请确保同时调用:
x.AddConsumer<ThisConsumer>();x.AddConsumer<UpdateMachineStatusConsumer>();
这在文档中有所涉及。

jslywgbw

jslywgbw2#

如果我不得不从你提供的代码中猜测,嫌疑人应该是这样的代码:

c.Consumer(() =>
                {
                    return provider.ServiceProvider
                        .GetRequiredService<UpdateMachineStatusConsumer>();
                });

字符串
在lambda x =>的作用域中,这反过来又引用了一个在两者作用域之外的一次性变量。尝试将“provider”服务定位器移动到访问它的作用域中:

x =>
        {
            if (x is IRabbitMqReceiveEndpointConfigurator c)
            {
                using var provider = this.serviceProvider.CreateScope(); // <- scoped within when this code is actually executed.
                c.ConfigureConsumeTopology = false;
                c.ConcurrentMessageLimit = 1;
                c.PrefetchCount = 1;

                c.Consumer(() =>
                {
                    return provider.ServiceProvider
                        .GetRequiredService<UpdateMachineStatusConsumer>();
                });

                c.Bind("machine_status_cycle", s =>
                {
                    s.RoutingKey = machineId.ToString();
                    s.ExchangeType = ExchangeType.Direct;
                });
            }
        });

相关问题