我们使用BlockingCollection
在实时应用程序中实现生产者-消费者模式:
BlockingCollection<T> collection = new BlockingCollection<T>();
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
// Starting up consumer
Task.Run(() => consumer(this.cancellationTokenSource.Token));
…
void Producer(T item)
{
collection.Add(item);
}
…
void consumer()
{
while (true)
{
var item = this.blockingCollection.Take(token);
process (item);
}
}
可以肯定的是,这是实际生产代码的一个非常简化的版本。有时候,当应用程序处于重负载下时,我们会观察到 * 消耗部分落后于 * 生产部分。由于应用程序逻辑非常复杂,它涉及与网络上的其他应用程序以及SQL数据库的交互。许多地方都可能出现延误;它们可能发生在对process()的调用中,这可能在原则上解释了为什么消费部分会很慢。
抛开上述所有考虑,使用BlockingCollection
是否有一些“固有”的东西可以解释这种现象?在.Net中是否有更有效的选择来实现生产者-消费者模式?
2条答案
按热度按时间mspsb9vt1#
首先,BlockingCollection不是生产者/消费者场景的最佳选择。至少有两个更好的选项(数据流,通道),选择取决于实际的应用程序场景-这是从问题中遗漏的。
通过使用async streams和IAsyncEnmerable,也可以创建一个生产者/消费者管道 * 而不需要 * 缓冲区。
异步流
在这种情况下,生产者可以是异步迭代器。消费者将接收IAsyncEnumerable并对其进行迭代直到完成。它还可以生成自己的IAsyncEnumerable输出,该输出可以传递给管道中的下一个方法:
生产者可以是:
消费者:
在这种情况下没有缓冲,生产者不能发出新消息,直到消费者消费当前消息。可以使用Parallel. ForEachAsync对消费者进行并行化。最后,System.Linq.Async提供了对异步流的LINQ操作,允许我们编写例如:
数据流- ActionBlock
Dataflow块可用于构建整个处理流水线,其中每个块从前一个块接收消息(数据),对其进行处理并将其传递到下一个块。大多数块具有输入缓冲器和适当的输出缓冲器。每个块使用一个辅助任务,但可以配置为使用更多。应用程序代码不必处理这些任务。
在最简单的情况下,单个ActionBlock可以处理一个或多个生产者发布到它的消息,充当消费者:
在这个例子中,块使用4个工作者任务,并且如果超过5个项目在其输入缓冲区中等待,超出当前正在处理的项目,则将阻塞。
BufferBlock作为生产者/消费者队列
BufferBlock是一个 inactive 块,它被其他块用作缓冲区。它可以用作异步生产者/消费者集合,如How to: Implement a producer-consumer dataflow pattern所示。在这种情况下,代码需要显式地接收消息。线程由开发人员决定。:
并行消费者
在.NET 6中,使用者可以通过使用
await foreach
和ReceiveAllAsync
来简化:并使用
Parallel.ForEachAsync
并发处理:默认情况下,
Parallel.ForeachAsync
将使用与内核数量相同的工作任务频道
通道类似于Go的通道。它们是专门为生产者/消费者场景构建的,允许在比Dataflow库更低的级别上创建管道。如果数据流库是今天构建的,它将构建在通道之上。
通道不能直接访问,只能通过其Reader或Writer接口访问。这是有意的,并且允许方法的容易流水线化。一个非常常见的模式是生产者方法创建一个它拥有的通道,并只返回一个ChannelReader。消费方法接受该读取器作为输入。通过这种方式,制作者可以控制通道的生存期,而不必担心其他制作者是否正在向其写入。
有了通道,生产者看起来会像这样:
不常见的
.ContinueWith(t=>writer.TryComplete(t.Exception));
用于向writer发送完成信号。这也将提示读者完成。这样,完成就从一个方法传播到下一个方法。也会传播任何异常writer.TryComplete(t.Exception))
不会阻塞或执行任何重要的工作,因此它在哪个线程上执行并不重要。这意味着不需要在worker任务上使用await
,否则会重新抛出任何异常,从而使代码复杂化。消费方法只需要
ChannelReader
作为源。一个方法可以从一个通道读取数据,并使用生产者模式将新数据发布到另一个通道:
8e2ybdfx2#
你可以使用Dataflow library。我不确定它是否比BlockingCollection性能更好。正如其他人所说,没有人能保证你的消费速度比生产速度快,所以总是有可能落后。