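Versions:
MassTransit 7.0.4
MassTransit.RabbitMQ 7.0.4
MassTransit.Extensions.DependencyInjection 7.0.4
I have created a MassTransit publisher and consumer, similar to the example in the MassTransit video, but the consumer is never invoked.
Here is the initial setup of the .NET host: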
namespace RedeliveryTest
{
    internal class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<RedeliveryTest.MessageConsumer>(typeof(RedeliveryTest.MessageConsumerDefinition));
                        x.SetKebabCaseEndpointNameFormatter();
                        x.UsingRabbitMq((context, cfg) =>
                        {
                            cfg.Host("rabbitmq://localhost");
                            cfg.ConfigureEndpoints(context);
                        });
                    });
                    services.AddHostedService<Worker>();
                });
    }
}
The worker that publishes the message contract:
namespace RedeliveryTest
{
    public class Worker : BackgroundService
    {
        readonly IBus _bus;

        public Worker(IBus bus, ILogger<Worker> logger)
        {
            _bus = bus;
        }

        protected async override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await _bus.Publish(new Message { Text = $"The time is {DateTimeOffset.Now}" }, stoppingToken);
                await Task.Delay(1000, stoppingToken);
            }
        }
    }
}
The consumer code:
namespace RedeliveryTest
{
    public class Message
    {
        public string Text { get; set; }
    }

    public class MessageConsumer :
        IConsumer<Message>
    {
        readonly ILogger<MessageConsumer> _logger;

        public MessageConsumer()
        {
        }

        public Task Consume(ConsumeContext<Message> context)
        {
            return Task.CompletedTask;
        }
    }

    public class MessageConsumerDefinition :
        ConsumerDefinition<MessageConsumer>
    {
        public MessageConsumerDefinition()
        {
            EndpointName = $"test-message-queue";
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<MessageConsumer> consumerConfigurator)
        {
            endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));
            endpointConfigurator.UseInMemoryOutbox();
        }
    }
}
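On the RabbitMQ management page (http://localhost:15672/), I can see that the publisher is active under the "Exchanges" tab, but the queue "test-message-queue" is not created.
When I create a second console application that hosts only the consumer, that consumer does receive the messages published by the first console application.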
namespace ConsuleConsumer
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                x.AddConsumer<RedeliveryTest.MessageConsumer>(typeof(RedeliveryTest.MessageConsumerDefinition));
                x.SetKebabCaseEndpointNameFormatter();
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host("rabbitmq://localhost");
                    cfg.ConfigureEndpoints(context);
                });
            });
            var serviceProvider = services.BuildServiceProvider();
            var bus = serviceProvider.GetRequiredService<IBusControl>();
            // The bus is started explicitly here, which creates the receive endpoint and its queue.
            await bus.StartAsync();
            Console.WriteLine("Press any key to exit");
            await Task.Run(() => Console.ReadKey());
            await bus.StopAsync();
        }
    }
}
namespace RedeliveryTest
{
    public class Message
    {
        public string Text { get; set; }
    }

    public class MessageConsumer :
        IConsumer<Message>
    {
        public MessageConsumer() { }

        public Task Consume(ConsumeContext<Message> context)
        {
            return Task.CompletedTask;
        }
    }

    public class MessageConsumerDefinition :
        ConsumerDefinition<MessageConsumer>
    {
        public MessageConsumerDefinition()
        {
            EndpointName = $"test-message-queue";
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<MessageConsumer> consumerConfigurator)
        {
            endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));
            endpointConfigurator.UseInMemoryOutbox();
        }
    }
}
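With the second console application running, I can now see the queue on the RabbitMQ management page.
Question: why does the consumer not seem to work when it is hosted in the .NET Generic Host?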
1 Answer
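Because you are using an older version of MassTransit, you need to add the hosted service that starts the bus. Without it the bus is never started, so the receive endpoint and its queue are never created. See this answer, or in summary:
Before...
services.AddHostedService<Worker>();
add:
services.AddMassTransitHostedService();
Note that this is not required if you use MassTransit v8.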
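To show where the extra registration goes, here is a minimal sketch of the question's CreateHostBuilder with the fix applied. It reuses Worker, MessageConsumer and MessageConsumerDefinition from the question unchanged. It assumes that, on MassTransit 7.x, AddMassTransitHostedService comes from the MassTransit.AspNetCore package, which is not in the question's version list and would need to be referenced.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace RedeliveryTest
{
    internal class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<MessageConsumer>(typeof(MessageConsumerDefinition));
                        x.SetKebabCaseEndpointNameFormatter();
                        x.UsingRabbitMq((context, cfg) =>
                        {
                            cfg.Host("rabbitmq://localhost");
                            cfg.ConfigureEndpoints(context);
                        });
                    });

                    // Starts the bus when the host starts and stops it on shutdown.
                    // Assumption: the MassTransit.AspNetCore package is referenced (v7 only).
                    services.AddMassTransitHostedService();

                    // Registered after the bus hosted service, as the answer suggests.
                    services.AddHostedService<Worker>();
                });
    }
}
```

The generic host starts hosted services in registration order, so registering the bus's hosted service first means the bus should already be running when Worker begins publishing. This mirrors what the second console application does by hand with bus.StartAsync() and bus.StopAsync().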