.net BlockingCollection替代方案?

ctrmrzij  于 2023-06-07  发布在  .NET
关注(0)|答案(2)|浏览(104)

我们使用BlockingCollection在实时应用程序中实现生产者-消费者模式:

BlockingCollection<T> collection = new BlockingCollection<T>();
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

// Starting up consumer
Task.Run(() => consumer(this.cancellationTokenSource.Token));
…
void Producer(T item)
{
    collection.Add(item);
}
…
void consumer()
{
    while (true)
    {
           var item = this.blockingCollection.Take(token);
           process (item);
    }
}

可以肯定的是,这是实际生产代码的一个非常简化的版本。有时候,当应用程序处于重负载下时,我们会观察到 * 消耗部分落后于 * 生产部分。由于应用程序逻辑非常复杂,它涉及与网络上的其他应用程序以及SQL数据库的交互。许多地方都可能出现延误;它们可能发生在对process()的调用中,这可能在原则上解释了为什么消费部分会很慢。
抛开上述所有考虑,使用BlockingCollection是否有一些“固有”的东西可以解释这种现象?在.Net中是否有更有效的选择来实现生产者-消费者模式?

mspsb9vt

mspsb9vt1#

首先,BlockingCollection不是生产者/消费者场景的最佳选择。至少有两个更好的选项(数据流,通道),选择取决于实际的应用程序场景-这是从问题中遗漏的。
通过使用async streams和IAsyncEnmerable,也可以创建一个生产者/消费者管道 * 而不需要 * 缓冲区。

异步流

在这种情况下,生产者可以是异步迭代器。消费者将接收IAsyncEnumerable并对其进行迭代直到完成。它还可以生成自己的IAsyncEnumerable输出,该输出可以传递给管道中的下一个方法:
生产者可以是:

public static async IAsyncEnumerable<Message> ProducerAsync(CancellationToken token)
{
    while(!token.IsCancellationRequested)
    {
        var msg=await Task.Run(()=>SomeHeavyWork());
        yield return msg;
    }
}

消费者:

async Task ConsumeAsync(IAsyncEnumerable<Message> source)
{
    await foreach(var msg in source)
    {
        await consumeMessage(msg);
    }
}

在这种情况下没有缓冲,生产者不能发出新消息,直到消费者消费当前消息。可以使用Parallel. ForEachAsync对消费者进行并行化。最后,System.Linq.Async提供了对异步流的LINQ操作,允许我们编写例如:

List<OtherMsg> results=await ProducerAsync(cts.Token)
                                 .Select(msg=>consumeAndReturn(msg))
                                 .ToListAsync();

数据流- ActionBlock

Dataflow块可用于构建整个处理流水线,其中每个块从前一个块接收消息(数据),对其进行处理并将其传递到下一个块。大多数块具有输入缓冲器和适当的输出缓冲器。每个块使用一个辅助任务,但可以配置为使用更多。应用程序代码不必处理这些任务。
在最简单的情况下,单个ActionBlock可以处理一个或多个生产者发布到它的消息,充当消费者:

async Task ConsumeAsync<Message>(Message message)
{
    //Do something with the message
}

...
ExecutionDataflowBlockOptions _options= new () {
    MaxDegreeOfParallelism=4,
    BoundedCapacity=5
};

ActionBlock<Message> _block=new ActionBlock(ConsumeAsync,_options);

async Task ProduceAsync(CancellationToken token)
{
    while(!token.IsCancellationRequested)
    {
        var msg=await produceNewMessageAsync();
        await _block.SendAsync(msg);
    }
   _block.Complete();
   await _block.Completion;
}

在这个例子中,块使用4个工作者任务,并且如果超过5个项目在其输入缓冲区中等待,超出当前正在处理的项目,则将阻塞。

BufferBlock作为生产者/消费者队列

BufferBlock是一个 inactive 块,它被其他块用作缓冲区。它可以用作异步生产者/消费者集合,如How to: Implement a producer-consumer dataflow pattern所示。在这种情况下,代码需要显式地接收消息。线程由开发人员决定。:

static void Produce(ITargetBlock<byte[]> target)
{
    var rand = new Random();

    for (int i = 0; i < 100; ++ i)
    {
        var buffer = new byte[1024];
        rand.NextBytes(buffer);
        target.Post(buffer);
    }

    target.Complete();
}

static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
    int bytesProcessed = 0;

    while (await source.OutputAvailableAsync())
    {
        byte[] data = await source.ReceiveAsync();
        bytesProcessed += data.Length;
    }

    return bytesProcessed;
}

static async Task Main()
{
    var buffer = new BufferBlock<byte[]>();
    var consumerTask = ConsumeAsync(buffer);
    Produce(buffer);

    var bytesProcessed = await consumerTask;

    Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}

并行消费者

在.NET 6中,使用者可以通过使用await foreachReceiveAllAsync来简化:

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;

    await foreach(var data in source.ReceiveAllAsync())
    {
        bytesProcessed += data.Length;
    }

    return bytesProcessed;
}

并使用Parallel.ForEachAsync并发处理:

static async Task ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    var msgs=source.ReceiveAllAsync();

    await Parallel.ForEachAsync(msgs,
        new ParallelOptions { MaxDegreeOfParallelism = 4},
        msg=>ConsumeMsgAsync(msg));

}

默认情况下,Parallel.ForeachAsync将使用与内核数量相同的工作任务

频道

通道类似于Go的通道。它们是专门为生产者/消费者场景构建的,允许在比Dataflow库更低的级别上创建管道。如果数据流库是今天构建的,它将构建在通道之上。
通道不能直接访问,只能通过其Reader或Writer接口访问。这是有意的,并且允许方法的容易流水线化。一个非常常见的模式是生产者方法创建一个它拥有的通道,并只返回一个ChannelReader。消费方法接受该读取器作为输入。通过这种方式,制作者可以控制通道的生存期,而不必担心其他制作者是否正在向其写入。
有了通道,生产者看起来会像这样:

ChannelReader<Message> Producer(CancellationToken token)
{
    var channel=Channel.CreateBounded(5);
    var writer=channel.Writer;

    _ = Task.Run(()=>{
        while(!token.IsCancellationRequested)
        {
           ...
           await writer.SendAsync(msg);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}

不常见的.ContinueWith(t=>writer.TryComplete(t.Exception));用于向writer发送完成信号。这也将提示读者完成。这样,完成就从一个方法传播到下一个方法。也会传播任何异常
writer.TryComplete(t.Exception))不会阻塞或执行任何重要的工作,因此它在哪个线程上执行并不重要。这意味着不需要在worker任务上使用await,否则会重新抛出任何异常,从而使代码复杂化。
消费方法只需要ChannelReader作为源。

async Task ConsumerAsync(ChannelReader<Message> source)
{
    await Parallel.ForEachAsync(source.ReadAllAsync(),
        new ParallelOptions { MaxDegreeOfParallelism = 4},
        msg=>consumeMessageAsync(msg)
    );
}

一个方法可以从一个通道读取数据,并使用生产者模式将新数据发布到另一个通道:

ChannelReader<OtherMessage> ConsumerAsync(ChannelReader<Message> source)
{
    var channel=Channel.CreateBounded<OtherMessage>();
    var writer=channel.Writer;

    await Parallel.ForEachAsync(source.ReadAllAsync(),
        new ParallelOptions { MaxDegreeOfParallelism = 4},
        async msg=>{
            var newMsg=await consumeMessageAsync(msg);
            await writer.SendAsync(newMsg);
        })
       .ContinueWith(t=>writer.TryComplete(t.Exception));
}
8e2ybdfx

8e2ybdfx2#

你可以使用Dataflow library。我不确定它是否比BlockingCollection性能更好。正如其他人所说,没有人能保证你的消费速度比生产速度快,所以总是有可能落后。

相关问题