RabbitMQ通道创建指南

nr7wwzry  于 2023-05-06  发布在  RabbitMQ
关注(0)|答案(6)|浏览(229)

我正在编写一个简单的类,我的应用程序将使用它来使用RabbitMQ发送和接收消息。我已经阅读了尽可能多的关于RabbitMQ的how-to,博客文章,白色和类似的东西。大多数示例都将连接和通道 Package 在一个using块中,并且与之相矛盾的是,您可能应该将它们实现为单例。具体来说,关于通道,我已经看到评论说,你不应该有一个以上的线程同时使用一个通道。
我正在用C#编写我的库。它是一个单例,在第一次示例化时连接了一个静态连接。
我考虑过为通道做同样的事情,但我打算使用相同的库来允许发布/订阅多个交换/队列。发布和订阅都可以从多个线程完成。
最后我的问题是:如何创建渠道?每条信息?是否每个消费者都有一个唯一的专用通道,发布者是否同步访问单个唯一通道?你明白我的意思。请记住,我打算使用一个服务器,有几十个消费者/发布者,没有更多。

rdlzhqv9

rdlzhqv91#

编辑(2016-1-26):通道不是线程安全的。在AprilMay 2015之间的文档已经发生了变化。新案文:
通道示例不能在线程之间共享。应用程序应该更喜欢每个线程使用一个通道,而不是在多个线程之间共享同一个通道。虽然通道上的某些操作可以安全地并发调用,但有些操作则不然,会导致线路上的帧交错不正确。线程之间共享通道也会干扰 * Publisher Confirm。
从你的问题来看,你似乎没有一个预定义的、固定数量的线程来发布/订阅RabbitMQ(在这种情况下,你可以考虑创建一个通道作为线程初始化的一部分,或者使用ThreadLocal<IModel>)。
如果并发的RabbitMQ操作很少或者消息大小总是很小,您可以简单地在所有RabbitMQ发布/订阅操作周围放置lock(channel)。如果您需要使用任意线程以交错方式传输多个请求(这是通道的首要用途),您可能需要创建一个通道池,例如:a ConcurrentQueue<IModel>,其中将未使用的通道入队,并在需要时将其出列。通道创建开销非常低,从性能测试中我感觉 * 通道创建 * 的过程不涉及任何网络IO,即似乎在客户机第一次使用时,RabbitMQ服务器中会自动创建一个通道。编辑:谢谢庞,There is no need to open a channel per operation and doing so would be very inefficient, since opening a channel is a network roundtrip.

(2016-1-26之前):Java和.net实现的大部分过时的细节:

Re:通道和多线程,由于它对实现的依赖,这有点令人困惑。

Java实现:通道是线程安全的:

通道示例可安全地供多个线程使用。
But
当通道在多个线程之间共享时,未正确处理确认

.net实现Channels are not thread safe

如果有多个线程需要访问特定的IModel示例,则应用程序应自行实施互斥。
IModel操作序列化不正确的症状包括但不限于:
·在线路上发送的无效帧序列
·正在抛出NotSupportedException...
因此,除了Robin的有用答案(无论是否线程安全都适用)之外,在.net实现中,您不能只是共享通道

mlmc2os5

mlmc2os52#

有了ASP.NETCore,就有了可以利用的ObjectPool。创建IPooledObjectPolicy

using Microsoft.Extensions.ObjectPool;  
    using Microsoft.Extensions.Options;  
    using RabbitMQ.Client;  

    public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>  
    {  
        private readonly RabbitOptions _options;  

        private readonly IConnection _connection;  

        public RabbitModelPooledObjectPolicy(IOptions<RabbitOptions> optionsAccs)  
        {  
            _options = optionsAccs.Value;  
            _connection = GetConnection();  
        }  

        private IConnection GetConnection()  
        {  
            var factory = new ConnectionFactory()  
            {  
                HostName = _options.HostName,  
                UserName = _options.UserName,  
                Password = _options.Password,  
                Port = _options.Port,  
                VirtualHost = _options.VHost,  
            };  

            return factory.CreateConnection();  
        }  

        public IModel Create()  
        {  
            return _connection.CreateModel();  
        }  

        public bool Return(IModel obj)  
        {  
            if (obj.IsOpen)  
            {  
                return true;  
            }  
            else  
            {  
                obj?.Dispose();  
                return false;  
            }  
        }  
    }

然后为ObjectPool配置依赖注入

services.AddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
services.AddSingleton(s =>
{
   var provider = s.GetRequiredService<ObjectPoolProvider>();
   return provider.Create(new RabbitModelPooledObjectPolicy());
});

然后可以注入ObjectPool<IModel>,并使用它

var channel = pool.Get();
try
{
    channel.BasicPublish(...);
}
finally
{
    pool.Return(channel);
}

资料来源:
https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/
https://developpaper.com/detailed-explanation-of-object-pools-various-usages-in-net-core/

hwamh0ep

hwamh0ep3#

它阐明了aqmp内部结构。目前,我的理解是:
答:我可以从每个应用程序(作为静态共享资源)保持到服务器的单个共享TCP连接
我应该为每个“任务”创建一个通道(一个用于监听队列X,一个用于发布到交换机Y,等等)。假设这些“任务”可以并行执行)
C.或者我可以在一个应用程序中使用一个通道,同时确保对它的访问是同步的-使用一些锁定机制,假设使用(锁定)通道的实际时间跨度相对非常短。

koaltpgm

koaltpgm4#

我不能评论C#实现的细节,但知道Amqp通道被设计为共享单个TCP连接可能会有所帮助,即以实现多路复用。一个通道一次只能发送或接收一条消息,但一个连接可以同时在不同的通道上接收消息。想象一下,你有两个1GB的大文件,你通过Amqp发送给一个消费者,消息可能会被分成10 K块,并以交错的方式发送。你可以在设置连接时控制默认的Amqp消息大小,这与你是否以及何时可能遇到交织有关系; AFAIK此功能旨在帮助防止当多个使用者共享一个连接并且一个使用者接收到大量消息时出现资源不足的情况。
HTH。

gzszwxb4

gzszwxb45#

你是正确的,通道不是线程安全的,不应该被多个线程访问。
如果您计划使用不同的队列,则可以在单个通道中使用多个队列。但是如果你计划使用多个交换机(不知道为什么你需要多个交换机),你将被迫在单例中跟踪多个交换机和通道,或者将这个责任推卸给调用者。
我会构建你的单例,比如它有一个观察者模式,并在那里保存所有RabbitMQ的东西,并引用你的调用者。然后,您还需要马歇尔调用编组到处理此问题的单个线程,否则可能会出现通道对象问题的风险。

sr4lhrrt

sr4lhrrt6#

使用者的并发注意事项库用户需要考虑许多与并发相关的主题。应避免多个线程同时使用IModel示例。应用程序代码应该为IModel示例维护一个清晰的线程所有权概念。这是对出版商的一个硬性要求:共享用于并发发布的信道(IModel示例)将导致协议级的不正确的帧交织。通道示例不能被在其上发布的线程共享
RabbitMQ消费者并发注意事项

相关问题