RabbitMQ MassTransit - 动态添加消费者(consumer)时出现配置异常

i5desfxk  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(171)

我正在使用 MassTransit 8.0.5 和 RabbitMQ,在我的 .NET 6 微服务中实现服务总线。我从 Service-A 发布了一条消息,可以看到按命名空间创建的交换机(exchange),但没有任何队列。当我启动消费者服务 Service-B 时,问题出现了:它抛出了以下配置异常。

以下是我的配置:

/// <summary>
/// Registers MassTransit with the RabbitMQ transport and tries to attach, via reflection,
/// every consumer of an <c>IntegrationEvent</c>-derived message to the queue named by
/// <c>RabbitSettings.QueueName</c>.
/// </summary>
/// <remarks>
/// This is the version that exhibits the reported configuration exception. Note that
/// <c>ReceiveEndpoint</c> is invoked inside the per-event-type loop, so it runs once for
/// every event type that has consumers, each time with the same queue name.
/// </remarks>
/// <param name="services">The service collection to register MassTransit into.</param>
/// <param name="assembly">Assembly passed to <c>AddConsumers</c> for consumer registration.</param>
/// <returns>The same <paramref name="services"/> instance.</returns>
public static IServiceCollection AddMassTransit(this IServiceCollection services, Assembly assembly)
    {
        // Builds a provider from the registrations made so far; it is used below only to
        // read RabbitSettings inside the UsingRabbitMq callback.
        var serviceProvider = services.BuildServiceProvider();

        services.AddMassTransit(configure =>
        {
            configure.SetKebabCaseEndpointNameFormatter();
            configure.AddConsumers(assembly);

            configure.UsingRabbitMq((context, configurator) =>
            {
                // GetService returns null when the options type is not registered, in which
                // case .Value throws a NullReferenceException.
                var rabbitSettings = serviceProvider.GetService<IOptions<RabbitSettings>>().Value;
                var host = new Uri("rabbitmq://" + rabbitSettings.EventBusConnection);

                configurator.Host(host, h =>
                {
                    h.Username(rabbitSettings.EventBusUserName);
                    h.Password(rabbitSettings.EventBusPassword);
                });

                // Event types: only types whose *direct* base type is IntegrationEvent,
                // scanned across every assembly currently loaded in the AppDomain.
                var types = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
                    .Where(x => x.BaseType == typeof(IntegrationEvent));

                foreach (var type in types)
                {
                    // Consumers of this event type: everything assignable to IConsumer<type>.
                    // The full assembly scan is repeated for each event type.
                    var consumers = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
                        .Where(x => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(type))).ToList();

                    if (consumers.Any())
                    {
                        // rabbitSettings.QueueName => service-b
                        // The queue name does not vary with the loop variable, so a second
                        // event type with consumers configures the same endpoint name again.
                        // NOTE(review): this repeated registration is reported to be the cause
                        // of the configuration exception - confirm against the exception text.
                        configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
                            {
                                e.UseConsumeFilter(typeof(InboxFilter<>), context);
                                foreach (var consumer in consumers)
                                {
                                    // Called on the bus configurator (not on the endpoint 'e')
                                    // once per consumer, each time excluding that consumer.
                                    configurator.ConfigureEndpoints(context, x => x.Exclude(consumer));

                                    // Looks up the first generic method named "Consumer" that has
                                    // an IServiceProvider parameter; null if no such method exists.
                                    var methodInfo = typeof(DependencyInjectionReceiveEndpointExtensions)
                                        .GetMethods()
                                        .Where(x => x.GetParameters()
                                            .Any(p => p.ParameterType == typeof(IServiceProvider)))
                                        .FirstOrDefault(x => x.Name == "Consumer" && x.IsGenericMethod);

                                    // Closes the method over the consumer type and invokes it with
                                    // (e, context, null). Presumably this is a static extension method
                                    // (so the Invoke target is ignored) and the trailing null is an
                                    // optional configuration callback - TODO confirm.
                                    // If the lookup failed, the consumer is silently not attached.
                                    var generic = methodInfo?.MakeGenericMethod(consumer);
                                    generic?.Invoke(e, new object[] { e, context, null });
                                }
                            });
                    }
                }
            });
        });

        return services;
    }

IntegrationEvent 是我所有集成事件的基类,并且已从拓扑中排除。我正在尝试动态添加消费者(consumer),但找不到问题所在。
任何帮助都将不胜感激。
编辑:可行的解决方案

/// <summary>
/// Registers MassTransit with the RabbitMQ transport and binds every consumer of an
/// <c>IntegrationEvent</c>-derived message to a single receive endpoint whose queue name
/// comes from <c>RabbitSettings.QueueName</c>.
/// </summary>
/// <remarks>
/// <c>RabbitSettings</c> is resolved from the bus registration context, i.e. from the
/// application's own container when the bus is created. No intermediate service provider
/// is built from the partially populated collection.
/// </remarks>
/// <param name="services">The service collection to register MassTransit into.</param>
/// <returns>The same <paramref name="services"/> instance.</returns>
public static IServiceCollection AddCustomMassTransit(this IServiceCollection services)
    {
        services.AddMassTransit(configure =>
        {
            configure.SetKebabCaseEndpointNameFormatter();

            // Scan the loaded assemblies once: event types are the direct subclasses of
            // IntegrationEvent, consumer types are anything assignable to IConsumer<TEvent>
            // for at least one of those event types.
            var allTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes()).ToArray();
            var eventTypes = allTypes.Where(x => x.BaseType == typeof(IntegrationEvent)).ToArray();
            Type[] consumerTypes = allTypes.Where(x => eventTypes.Any(et => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(et)))).ToArray();

            configure.AddConsumers(consumerTypes);

            configure.UsingRabbitMq((context, configurator) =>
            {
                // Resolve from the real container instead of a throwaway provider created with
                // services.BuildServiceProvider(). GetRequiredService throws a descriptive
                // InvalidOperationException when the options are not registered, rather than
                // a NullReferenceException on .Value.
                var rabbitSettings = context.GetRequiredService<IOptions<RabbitSettings>>().Value;
                var host = new Uri("rabbitmq://" + rabbitSettings.EventBusConnection);

                configurator.Host(host, h =>
                {
                    h.Username(rabbitSettings.EventBusUserName);
                    h.Password(rabbitSettings.EventBusPassword);
                });

                configurator.UseCircuitBreaker(cb =>
                {
                    cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                    cb.TripThreshold = 15;
                    cb.ActiveThreshold = 10;
                    cb.ResetInterval = TimeSpan.FromMinutes(5);
                });

                configurator.UseMessageRetry(r =>
                {
                    // These exception types are excluded from the retry policy.
                    r.Ignore(typeof(ArgumentException),
                             typeof(ArgumentNullException),
                             typeof(ArgumentOutOfRangeException),
                             typeof(IndexOutOfRangeException),
                             typeof(DivideByZeroException),
                             typeof(InvalidCastException));

                    // Retry intervals of 1, 2, 4, 8 and 16 seconds.
                    r.Intervals(new[] { 1, 2, 4, 8, 16 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
                });

                // ReceiveEndpoint is called at most once for the queue; ConfigureConsumers
                // attaches all consumers registered above to that endpoint.
                if (consumerTypes.Length > 0)
                {
                    configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
                    {
                        e.UseConsumeFilter(typeof(InboxFilter<>), context);
                        e.ConfigureConsumers(context);
                    });
                }
            });
        });

        return services;
    }
7fhtutme

7fhtutme1#

答案很简单,您不能使用相同的队列名称调用ReceiveEndpoint()两次。
更好的回答是,你的配置一团糟,所有的反射都是不必要的。

/// <summary>
/// Registers MassTransit with the RabbitMQ transport and binds every consumer of an
/// <c>IntegrationEvent</c>-derived message to a single receive endpoint whose queue name
/// comes from <c>RabbitSettings.QueueName</c>.
/// </summary>
/// <param name="services">The service collection to register MassTransit into.</param>
/// <param name="assembly">
/// Not used by this implementation (consumers are discovered from all loaded assemblies);
/// retained so existing callers keep compiling.
/// </param>
/// <returns>The same <paramref name="services"/> instance.</returns>
public static IServiceCollection AddMassTransit(this IServiceCollection services, Assembly assembly)
{
    services.AddMassTransit(configure =>
    {
        configure.SetKebabCaseEndpointNameFormatter();

        // Scan the loaded assemblies once and materialize the results, so the queries are
        // not re-evaluated each time they are enumerated below.
        var allTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes()).ToArray();

        // Event types: direct subclasses of IntegrationEvent.
        var eventTypes = allTypes.Where(x => x.BaseType == typeof(IntegrationEvent)).ToArray();

        // Consumer types: anything assignable to IConsumer<TEvent> for one of the event types.
        // Must be a Type[]: AddConsumers takes 'params Type[]', which a List<Type> does not bind to.
        Type[] consumerTypes = allTypes.Where(x => eventTypes.Any(et => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(et)))).ToArray();

        configure.AddConsumers(consumerTypes);

        configure.UsingRabbitMq((context, configurator) =>
        {
            // Resolve from the real container instead of a throwaway provider created with
            // services.BuildServiceProvider(). GetRequiredService throws a descriptive
            // InvalidOperationException when the options are not registered, rather than
            // a NullReferenceException on .Value.
            var rabbitSettings = context.GetRequiredService<IOptions<RabbitSettings>>().Value;
            var host = new Uri("rabbitmq://" + rabbitSettings.EventBusConnection);

            configurator.Host(host, h =>
            {
                h.Username(rabbitSettings.EventBusUserName);
                h.Password(rabbitSettings.EventBusPassword);
            });

            if (consumerTypes.Length > 0)
            {
                // rabbitSettings.QueueName => service-b
                // ReceiveEndpoint is called at most once for the queue; ConfigureConsumers
                // attaches all consumers registered above to that endpoint.
                configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
                {
                    e.UseConsumeFilter(typeof(InboxFilter<>), context);
                    e.ConfigureConsumers(context);
                });
            }
        });
    });

    return services;
}

相关问题