我正在使用 MassTransit 8.0.5 和 RabbitMQ 在我的 .NET 6 微服务中实现服务总线。我从 Service-A 发布了一条消息,可以看到按命名空间创建的交换机(exchange),但没有任何队列。问题出现在我启动消费者 Service-B 的时候:它抛出了以下配置异常。
以下是我的配置:
/// <summary>
/// Registers MassTransit with RabbitMQ, using kebab-case endpoint names, adding the consumers
/// found in <paramref name="assembly"/>, and wiring consumers of IntegrationEvent-derived
/// messages onto the queue named by <c>RabbitSettings.QueueName</c> via reflection.
/// </summary>
/// <param name="services">The service collection MassTransit is added to.</param>
/// <param name="assembly">Assembly passed to <c>AddConsumers</c> for consumer registration.</param>
/// <returns>The same <paramref name="services"/> instance.</returns>
public static IServiceCollection AddMassTransit(this IServiceCollection services, Assembly assembly)
{
// Builds a separate provider from whatever is registered so far; it is only used below
// to read RabbitSettings inside the UsingRabbitMq callback.
var serviceProvider = services.BuildServiceProvider();
services.AddMassTransit(configure =>
{
configure.SetKebabCaseEndpointNameFormatter();
configure.AddConsumers(assembly);
configure.UsingRabbitMq((context, configurator) =>
{
var rabbitSettings = serviceProvider.GetService<IOptions<RabbitSettings>>().Value;
var host = new Uri("rabbitmq://" + rabbitSettings.EventBusConnection);
configurator.Host(host, h =>
{
h.Username(rabbitSettings.EventBusUserName);
h.Password(rabbitSettings.EventBusPassword);
});
// Every type in every loaded assembly whose direct base type is IntegrationEvent.
var types = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
.Where(x => x.BaseType == typeof(IntegrationEvent));
foreach (var type in types)
{
// Every type assignable to IConsumer<type> for the current event type.
var consumers = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
.Where(x => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(type))).ToList();
if (consumers.Any())
{
// NOTE(review): this sits inside the per-event-type loop, so ReceiveEndpoint is called
// with the same queue name once for every event type that has consumers. Registering the
// same endpoint name more than once is what triggers the configuration exception.
// rabbitSettings.QueueName => service-b
configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
{
e.UseConsumeFilter(typeof(InboxFilter<>), context);
foreach (var consumer in consumers)
{
// NOTE(review): ConfigureEndpoints is invoked on the bus configurator from inside the
// endpoint callback, once per consumer, each time excluding only that consumer.
configurator.ConfigureEndpoints(context, x => x.Exclude(consumer));
// Looks up a generic method named "Consumer" that has an IServiceProvider parameter,
// closes it over the consumer type, and invokes it with (e, context, null).
// If no such method is found, the null-conditional calls below silently do nothing.
var methodInfo = typeof(DependencyInjectionReceiveEndpointExtensions)
.GetMethods()
.Where(x => x.GetParameters()
.Any(p => p.ParameterType == typeof(IServiceProvider)))
.FirstOrDefault(x => x.Name == "Consumer" && x.IsGenericMethod);
var generic = methodInfo?.MakeGenericMethod(consumer);
generic?.Invoke(e, new object[] { e, context, null });
}
});
}
}
});
});
return services;
}
从拓扑中排除的 IntegrationEvent 是我所有集成事件的基类。我正在尝试动态添加消费者,但找不到问题所在。
任何帮助都将不胜感激。
编辑:可行的解决方案
/// <summary>
/// Registers MassTransit on RabbitMQ and attaches every discovered integration-event consumer
/// to a single receive endpoint named by <c>RabbitSettings.QueueName</c>.
/// </summary>
/// <remarks>
/// Consumers are discovered by scanning all assemblies loaded in the current AppDomain for types
/// assignable to <c>IConsumer&lt;T&gt;</c>, where <c>T</c> directly derives from <c>IntegrationEvent</c>.
/// RabbitMQ settings are resolved from the bus registration context when the bus is built,
/// so no intermediate service provider is created during registration.
/// </remarks>
/// <param name="services">The service collection MassTransit is added to.</param>
/// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
public static IServiceCollection AddCustomMassTransit(this IServiceCollection services)
{
    services.AddMassTransit(configure =>
    {
        configure.SetKebabCaseEndpointNameFormatter();

        var allTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes()).ToArray();
        var eventTypes = allTypes.Where(x => x.BaseType == typeof(IntegrationEvent)).ToArray();

        // Close IConsumer<> over each event type once, instead of once per (type, event) pair.
        var consumerInterfaces = eventTypes
            .Select(et => typeof(IConsumer<>).MakeGenericType(et))
            .ToArray();
        Type[] consumerTypes = allTypes
            .Where(x => consumerInterfaces.Any(ci => x.IsAssignableTo(ci)))
            .ToArray();
        configure.AddConsumers(consumerTypes);

        configure.UsingRabbitMq((context, configurator) =>
        {
            // Resolve from the application's own container via the registration context.
            // Calling services.BuildServiceProvider() during registration would create a second,
            // never-disposed container with its own copies of singletons.
            // GetRequiredService fails with a descriptive error instead of a null dereference.
            var rabbitSettings = context.GetRequiredService<IOptions<RabbitSettings>>().Value;

            var host = new Uri("rabbitmq://" + rabbitSettings.EventBusConnection);
            configurator.Host(host, h =>
            {
                h.Username(rabbitSettings.EventBusUserName);
                h.Password(rabbitSettings.EventBusPassword);
            });

            configurator.UseCircuitBreaker(cb =>
            {
                cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                cb.TripThreshold = 15;
                cb.ActiveThreshold = 10;
                cb.ResetInterval = TimeSpan.FromMinutes(5);
            });

            configurator.UseMessageRetry(r =>
            {
                // These indicate programming errors; retrying cannot make them succeed.
                r.Ignore(typeof(ArgumentException),
                    typeof(ArgumentNullException),
                    typeof(ArgumentOutOfRangeException),
                    typeof(IndexOutOfRangeException),
                    typeof(DivideByZeroException),
                    typeof(InvalidCastException));
                r.Intervals(new[] { 1, 2, 4, 8, 16 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
            });

            // Declare the queue only when this service actually consumes something.
            // ReceiveEndpoint is called exactly once for the queue name.
            if (consumerTypes.Length > 0)
            {
                configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
                {
                    e.UseConsumeFilter(typeof(InboxFilter<>), context);
                    e.ConfigureConsumers(context);
                });
            }
        });
    });

    return services;
}
1条答案
按热度按时间7fhtutme1#
答案很简单,您不能使用相同的队列名称调用
ReceiveEndpoint()
两次。更好的回答是,你的配置一团糟,所有的反射都是不必要的。