.net 如何在ZeroMQ中使用多个线程发布一个套接字

x8goxv8g  于 2023-05-23  发布在  .NET
关注(0)|答案(2)|浏览(126)

我正在使用NetMQ,ZeroMQ的c#库,并按如下方式实现了它。
这里的问题是,在PublishAsync的情况下,多个线程从外部生成和调用数据。
但是,由于发布是在一个名为_pubThread的线程上执行的,因此似乎存在延迟。
请告诉我如何解决这种情况,以及为什么我不能在ZeroMQ的一个套接字中使用多个线程。

public class NetMqManager
{
    private NetMQQueue<(string, string)> _queue = new NetMQQueue<(string, string)>();

    private readonly Thread _subThread;

    private readonly Thread _pubThread;

    private readonly SubscriberSocket _subscriber;

    private readonly PublisherSocket _publisher;

    private readonly ZeroMqEndPoint _endPoint;

    public NetMqManager(ZeroMqEndPoint endPoint)
    {
        _endPoint = endPoint;

        _publisher = new PublisherSocket();
        _publisher.Options.SendHighWatermark = 1500;
        _publisher.SendReady += Publisher_SendReady;
        _pubThread = new Thread(() =>
        {
            var poller = new NetMQPoller { _publisher };
            poller.Run();
        });

        _subscriber = new SubscriberSocket();
        _subscriber.Options.SendHighWatermark = 1500;
        _subscriber.ReceiveReady += Subscriber_ReceiveReady;
        _subThread = new Thread(() =>
        {
            var poller = new NetMQPoller { _subscriber };
            poller.Run();
        });
    }

    public async Task RunAsync()
    {
        await Task.Run(() =>
        {
            _publisher.Bind($"tcp://*:{_endPoint.PubPort}");
            _subscriber.Bind($"tcp://*:{_endPoint.SubPort}");

            _pubThread.Start();
            _subThread.Start();
        });
    }

    public async void PublishAsync(string topic, string payload)
    {
        await Task.Run(() => _queue.Enqueue((topic, payload)));
    }

    public async void SubscribeAsync(string topic)
    {
        _subscriber.Subscribe(topic);
    }

    private void Publisher_SendReady(object? sender, NetMQSocketEventArgs e)
    {
        var (topic, payload) = _queue.Dequeue();
        _publisher.SendMoreFrame(topic).SendFrame(payload);
    }

    private void Subscriber_ReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        var topic = e.Socket.ReceiveFrameString();
        var payload = e.Socket.ReceiveFrameString();

        Console.WriteLine($"Topic: {topic}, Payload: {payload}");
    }
}
nbysray5

nbysray51#

Q:“请告诉我如何解决这种情况,以及为什么我不能在ZeroMQ中的一个套接字中使用多个线程。"

A部分:"... ZeroMQ中的一个套接字中有多个线程”
ZeroMQ本机API可以指定在Context()示例(核心引擎元素)内使用多少I/O线程,此外还可以在最高级别的细节中指定每个Socket示例与已准备好的Context()示例的I/O线程池之间的关系。
如果您的NetMQ-wrapper将其传递给用户级代码,请相应地使用它来提高I/O性能。
B部分:应避免的性能损失
如果您的用户级代码需要提高线程间性能,请避免使用与使用tcp://-Transport Class 相关的非常昂贵的设置/解码,因为数据仅在同一进程的线程之间流动-inproc://-Transport Class 应该是任何附加开销负载最少的。
C部分:脆弱还是自力更生?
如果您的代码依赖于多帧ZeroMQ消息组合,则会浪费本机PUB/SUB-Archetype 中存在的所有开销处理,其中对所有激活的订阅执行纯左对齐的字节式字符串匹配,因此,依赖于多帧合成是另一个严重浪费资源的来源,也是另一个主要的低效率(虽然这可能是NetMQ Package 器设计妥协的一些不必要的副作用- IIRC这是我在6-8年前遇到的一个案例)。
如果你的代码也努力成为一个健壮的,自我恢复的,一个人永远不应该使用一个盲目的假设来编码,只有公平和诚实的消息传递参与者在房间里。在inproc://的情况下没有这么多,但是如果使用任何“网络开放”* 传输类 *,如{ tcp:// | udp:// | pgm:// | epgm:// | tipc:// | norm:// | vmci:// },当不兼容的消息到达时,应该总是处理情况。在这里,依赖于每个消息中有(总是和只有)两个帧将在第一个空(零帧),单帧或3+帧消息到达时将代码推入死锁或异常。你的.poll()/.recv()-方法必须科普接收循环中实际存在的帧的数量,而不是把自己射到自己的腿上。
最后一点,也是最重要的一点,《零之禅》:
Martin SUSTRIK & Pieter HINTJENS的福音传道从始至终都是清晰而健全的-从不分享,从不阻挡。
虽然许多努力都花在了所谓的线程安全的框架现代化上,但设计准则(恕我直言)仍然倾向于用户级代码,“共享”套接字示例是一件坏事。更好的方法是使用线程间链接(如inproc://-s)将manySocketLessThreads-to-oneSocketOwner移动到这些智能信令/消息传递元平面互连的任何一端,并专注于主要工作逻辑。
更好的性能,
对于关注点分离更好,
更适合调试

pw9qyyiw

pw9qyyiw2#

我非常确定,对于需要发布多个线程的问题,最好的解决办法是

  • 为每个PUB线程提供自己的套接字,并绑定到自己唯一的连接字符串。
  • 在每个SUB客户端中,将SUB套接字多重连接到所有发布者连接字符串。
  • 这样,每个SUBsciber通过单个套接字监听所有PUBLisher线程。每个发布者线程都有自己的专用套接字,因此不必与任何其他发布者线程协调使用它。

这是ZeroMQ套接字的一个经常被遗忘的方面;它们可以被绑定和连接不止一次。参见the manual。人们经常像对待BSD套接字一样对待ZMQ套接字,但它们并不相同!你永远不会想到一个BSD套接字可以通过TCP、inproc、IPC等多种传输方式连接到多个服务器。但它是如此有用的功能!

相关问题