我正在使用NetMQ,ZeroMQ的c#库,并按如下方式实现了它。
这里的问题是,在PublishAsync的情况下,多个线程从外部生成和调用数据。
但是,由于发布是在一个名为_pubThread的线程上执行的,因此似乎存在延迟。
请告诉我如何解决这种情况,以及为什么我不能在ZeroMQ的一个套接字中使用多个线程。
public class NetMqManager
{
private NetMQQueue<(string, string)> _queue = new NetMQQueue<(string, string)>();
private readonly Thread _subThread;
private readonly Thread _pubThread;
private readonly SubscriberSocket _subscriber;
private readonly PublisherSocket _publisher;
private readonly ZeroMqEndPoint _endPoint;
public NetMqManager(ZeroMqEndPoint endPoint)
{
_endPoint = endPoint;
_publisher = new PublisherSocket();
_publisher.Options.SendHighWatermark = 1500;
_publisher.SendReady += Publisher_SendReady;
_pubThread = new Thread(() =>
{
var poller = new NetMQPoller { _publisher };
poller.Run();
});
_subscriber = new SubscriberSocket();
_subscriber.Options.SendHighWatermark = 1500;
_subscriber.ReceiveReady += Subscriber_ReceiveReady;
_subThread = new Thread(() =>
{
var poller = new NetMQPoller { _subscriber };
poller.Run();
});
}
public async Task RunAsync()
{
await Task.Run(() =>
{
_publisher.Bind($"tcp://*:{_endPoint.PubPort}");
_subscriber.Bind($"tcp://*:{_endPoint.SubPort}");
_pubThread.Start();
_subThread.Start();
});
}
public async void PublishAsync(string topic, string payload)
{
await Task.Run(() => _queue.Enqueue((topic, payload)));
}
public async void SubscribeAsync(string topic)
{
_subscriber.Subscribe(topic);
}
private void Publisher_SendReady(object? sender, NetMQSocketEventArgs e)
{
var (topic, payload) = _queue.Dequeue();
_publisher.SendMoreFrame(topic).SendFrame(payload);
}
private void Subscriber_ReceiveReady(object? sender, NetMQSocketEventArgs e)
{
var topic = e.Socket.ReceiveFrameString();
var payload = e.Socket.ReceiveFrameString();
Console.WriteLine($"Topic: {topic}, Payload: {payload}");
}
}
2条答案
按热度按时间nbysray51#
Q:“请告诉我如何解决这种情况,以及为什么我不能在ZeroMQ中的一个套接字中使用多个线程。"
A部分:"... ZeroMQ中的一个套接字中有多个线程”
ZeroMQ本机API可以指定在
Context()
示例(核心引擎元素)内使用多少I/O线程,此外还可以在最高级别的细节中指定每个Socket
示例与已准备好的Context()
示例的I/O线程池之间的关系。如果您的NetMQ-wrapper将其传递给用户级代码,请相应地使用它来提高I/O性能。
B部分:应避免的性能损失
如果您的用户级代码需要提高线程间性能,请避免使用与使用
tcp://
-Transport Class 相关的非常昂贵的设置/解码,因为数据仅在同一进程的线程之间流动-inproc://
-Transport Class 应该是任何附加开销负载最少的。C部分:脆弱还是自力更生?
如果您的代码依赖于多帧ZeroMQ消息组合,则会浪费本机
PUB/SUB
-Archetype 中存在的所有开销处理,其中对所有激活的订阅执行纯左对齐的字节式字符串匹配,因此,依赖于多帧合成是另一个严重浪费资源的来源,也是另一个主要的低效率(虽然这可能是NetMQ Package 器设计妥协的一些不必要的副作用- IIRC这是我在6-8年前遇到的一个案例)。如果你的代码也努力成为一个健壮的,自我恢复的,一个人永远不应该使用一个盲目的假设来编码,只有公平和诚实的消息传递参与者在房间里。在
inproc://
的情况下没有这么多,但是如果使用任何“网络开放”* 传输类 *,如{ tcp:// | udp:// | pgm:// | epgm:// | tipc:// | norm:// | vmci:// }
,当不兼容的消息到达时,应该总是处理情况。在这里,依赖于每个消息中有(总是和只有)两个帧将在第一个空(零帧),单帧或3+帧消息到达时将代码推入死锁或异常。你的.poll()/.recv()
-方法必须科普接收循环中实际存在的帧的数量,而不是把自己射到自己的腿上。最后一点,也是最重要的一点,《零之禅》:
Martin SUSTRIK & Pieter HINTJENS的福音传道从始至终都是清晰而健全的-从不分享,从不阻挡。
虽然许多努力都花在了所谓的线程安全的框架现代化上,但设计准则(恕我直言)仍然倾向于用户级代码,“共享”套接字示例是一件坏事。更好的方法是使用线程间链接(如
inproc://
-s)将manySocketLessThreads-to-oneSocketOwner移动到这些智能信令/消息传递元平面互连的任何一端,并专注于主要工作逻辑。更好的性能,
对于关注点分离更好,
更适合调试
pw9qyyiw2#
我非常确定,对于需要发布多个线程的问题,最好的解决办法是
这是ZeroMQ套接字的一个经常被遗忘的方面;它们可以被绑定和连接不止一次。参见the manual。人们经常像对待BSD套接字一样对待ZMQ套接字,但它们并不相同!你永远不会想到一个BSD套接字可以通过TCP、inproc、IPC等多种传输方式连接到多个服务器。但它是如此有用的功能!