.net 如何批量使用BlockingCollection< T>

z4bn682m  于 2023-11-20  发布在  .NET
关注(0)|答案(3)|浏览(254)

我已经提出了一些代码来消耗队列中的所有等待项,而不是逐个处理这些项,将所有等待项作为一个集合来处理是有意义的。
我已经像这样声明了我的队列。

  1. private BlockingCollection<Item> items =
  2. new BlockingCollection<Item>(new ConcurrentQueue<Item>);

字符串
然后,在一个消费者线程上,我计划像这样批量读取这些项目,

  1. Item nextItem;
  2. while (this.items.TryTake(out nextItem, -1))
  3. {
  4. var workToDo = new List<Item>();
  5. workToDo.Add(nextItem);
  6. while(this.items.TryTake(out nextItem))
  7. {
  8. workToDo.Add(nextItem);
  9. }
  10. // process workToDo, then go back to the queue.
  11. }


这种方法缺乏GetConsumingEnumerable的实用性,我不禁想知道我是否错过了更好的方法,或者我的方法是否有缺陷。
有没有更好的方法来批量消耗BlockingCollection<T>

yhuiod9q

yhuiod9q1#

一个解决方案是使用System.Threading.Tasks.Dataflow中的BufferBlock<T>(包含在.net core 3+中)。它不使用GetConsumingEnumerable(),但它仍然允许您使用相同的实用程序,主要是:

  • 允许多个(对称和/或不对称)消费者和生产者并行处理
  • 线程安全(考虑到上述情况)-无需担心竞态条件
  • 可以通过取消令牌和/或收集完成来取消
  • 消费者阻塞,直到数据可用,避免在轮询上浪费CPU周期

还有一个BatchBlock<T>,但这限制了你固定大小的批次。

  1. var buffer = new BufferBlock<Item>();
  2. while (await buffer.OutputAvailableAsync())
  3. {
  4. if (buffer.TryReceiveAll(out var items))
  5. //process items
  6. }

字符串
下面是一个工作示例,演示了以下内容:

  • 并行处理可变长度批次的多个对称消费者
  • 多个对称的生产者(在这个例子中不是真正的并行操作)
  • 当生产者完成收集时,
  • 为了使示例简短,我没有演示CancellationToken的使用
  • 能够等待生产者和/或消费者完成
  • 能够从不允许JavaScript的区域(如构造函数)进行调用
  • Thread.Sleep()调用不是必需的,但有助于模拟在更繁重的场景中可能发生的一些处理时间
  • Task.WaitAll()Thread.Sleep()都可以选择性地转换为它们的等价物
  • 无需使用任何外部库
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using System.Threading.Tasks.Dataflow;
  7. static class Program
  8. {
  9. static void Main()
  10. {
  11. var buffer = new BufferBlock<string>();
  12. // Kick off consumer task(s)
  13. List<Task> consumers = new List<Task>();
  14. for (int i = 0; i < 3; i++)
  15. {
  16. consumers.Add(Task.Factory.StartNew(async () =>
  17. {
  18. // need to copy this due to lambda variable capture
  19. var num = i;
  20. while (await buffer.OutputAvailableAsync())
  21. {
  22. if (buffer.TryReceiveAll(out var items))
  23. Console.WriteLine($"Consumer {num}: " +
  24. items.Aggregate((a, b) => a + ", " + b));
  25. // real life processing would take some time
  26. await Task.Delay(500);
  27. }
  28. Console.WriteLine($"Consumer {num} complete");
  29. }));
  30. // give consumer tasks time to activate for a better demo
  31. Thread.Sleep(100);
  32. }
  33. // Kick off producer task(s)
  34. List<Task> producers = new List<Task>();
  35. for (int i = 0; i < 3; i++)
  36. {
  37. producers.Add(Task.Factory.StartNew(() =>
  38. {
  39. for (int j = 0 + (1000 * i); j < 500 + (1000 * i); j++)
  40. buffer.Post(j.ToString());
  41. }));
  42. // space out the producers for a better demo
  43. Thread.Sleep(10);
  44. }
  45. // may also use the async equivalent
  46. Task.WaitAll(producers.ToArray());
  47. Console.WriteLine("Finished waiting on producers");
  48. // demo being able to complete the collection
  49. buffer.Complete();
  50. // may also use the async equivalent
  51. Task.WaitAll(consumers.ToArray());
  52. Console.WriteLine("Finished waiting on consumers");
  53. Console.ReadLine();
  54. }
  55. }


这里是一个现代化的和简化的版本的代码。

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5. using System.Threading.Tasks.Dataflow;
  6. class Program
  7. {
  8. private static async Task Main()
  9. {
  10. var buffer = new BufferBlock<string>();
  11. // Kick off consumer task(s)
  12. var consumers = new List<Task>();
  13. for (var i = 0; i < 3; i++)
  14. {
  15. var id = i;
  16. consumers.Add(Task.Run(() => StartConsumer(id, buffer)));
  17. // give consumer tasks time to activate for a better demo
  18. await Task.Delay(100);
  19. }
  20. // Kick off producer task(s)
  21. var producers = new List<Task>();
  22. for (var i = 0; i < 3; i++)
  23. {
  24. var pid = i;
  25. producers.Add(Task.Run(() => StartProducer(pid, buffer)));
  26. // space out the producers for a better demo
  27. await Task.Delay(10);
  28. }
  29. // may also use the async equivalent
  30. await Task.WhenAll(producers);
  31. Console.WriteLine("Finished waiting on producers");
  32. // demo being able to complete the collection
  33. buffer.Complete();
  34. // may also use the async equivalent
  35. await Task.WhenAll(consumers);
  36. Console.WriteLine("Finished waiting on consumers");
  37. Console.ReadLine();
  38. }
  39. private static async Task StartConsumer(
  40. int id,
  41. IReceivableSourceBlock<string> buffer)
  42. {
  43. while (await buffer.OutputAvailableAsync())
  44. {
  45. if (buffer.TryReceiveAll(out var items))
  46. {
  47. Console.WriteLine($"Consumer {id}: " +
  48. items.Aggregate((a, b) => a + ", " + b));
  49. }
  50. // real life processing would take some time
  51. await Task.Delay(500);
  52. }
  53. Console.WriteLine($"Consumer {id} complete");
  54. }
  55. private static Task StartProducer(int pid, ITargetBlock<string> buffer)
  56. {
  57. for (var j = 0 + (1000 * pid); j < 500 + (1000 * pid); j++)
  58. {
  59. buffer.Post(j.ToString());
  60. }
  61. return Task.CompletedTask;
  62. }
  63. }

展开查看全部
tyu7yeag

tyu7yeag2#

虽然在某些方面不如ConcurrentQueue<T>好,但我自己的LLQueue<T>允许使用AtomicDequeueAll方法进行批处理出队,在该方法中,(原子和线程安全)操作,然后在一个非线程安全的集合中由单个线程使用。
虽然这不是阻塞,但它可以很容易地用于创建一个阻塞集合:

  1. public BlockingBatchedQueue<T>
  2. {
  3. private readonly AutoResetEvent _are = new AutoResetEvent(false);
  4. private readonly LLQueue<T> _store;
  5. public void Add(T item)
  6. {
  7. _store.Enqueue(item);
  8. _are.Set();
  9. }
  10. public IEnumerable<T> Take()
  11. {
  12. _are.WaitOne();
  13. return _store.AtomicDequeueAll();
  14. }
  15. public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  16. {
  17. if(_are.WaitOne(millisecTimeout))
  18. {
  19. items = _store.AtomicDequeueAll();
  20. return true;
  21. }
  22. items = null;
  23. return false;
  24. }
  25. }

字符串
这是一个不做以下事情的起点:
1.处理一个待处理的等待读卡器。
1.担心多个读取器之间的潜在竞争,这两个读取器都是由一个阅读时发生的写入触发的(它只是认为偶尔的空结果是可以接受的)。
1.将任何上限放在写作上。
所有这些都可以添加,但我想保持最低限度的一些实际用途,希望在上面定义的限制内没有bug。

展开查看全部
c86crjj0

c86crjj03#

不,没有更好的办法了,你的方法基本上是正确的。
为了方便使用,可以将“consume-in-batches”功能 Package 在扩展方法中。下面的实现在整个枚举过程中使用相同的List<T>作为缓冲区,目的是防止在每次迭代时分配新的缓冲区。它还包括一个可选的maxSize参数,用于限制发出的批处理的大小:

  1. /// <summary>
  2. /// Consumes the items in the collection in batches. Each batch contains all
  3. /// the items that are immediately available, up to a specified maximum number.
  4. /// </summary>
  5. public static IEnumerable<T[]> GetConsumingEnumerableBatch<T>(
  6. this BlockingCollection<T> source, int maxSize = -1,
  7. CancellationToken cancellationToken = default)
  8. {
  9. ArgumentNullException.ThrowIfNull(source);
  10. if (maxSize == -1) maxSize = Array.MaxLength;
  11. if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
  12. if (source.IsCompleted) yield break;
  13. List<T> buffer = new();
  14. while (source.TryTake(out T item, Timeout.Infinite, cancellationToken))
  15. {
  16. Debug.Assert(buffer.Count == 0);
  17. buffer.Add(item);
  18. while (buffer.Count < maxSize && source.TryTake(out item))
  19. buffer.Add(item);
  20. T[] batch = buffer.ToArray();
  21. int batchSize = batch.Length;
  22. buffer.Clear();
  23. yield return batch;
  24. if (batchSize < buffer.Capacity / 4)
  25. buffer.Capacity = buffer.Capacity / 2; // Shrink oversized buffer
  26. }
  27. }

字符串
使用示例:

  1. foreach (Item[] batch in this.items.GetConsumingEnumerableBatch())
  2. {
  3. // Process the batch
  4. }


每当发出的批小于缓冲区容量的四分之一时,缓冲区就会缩小一半。这将使缓冲区处于控制之中,以防它在枚举期间的某个时候变得过大。
if (source.IsCompleted) yield break行的目的是复制内置GetConsumingEnumerable方法的行为,当它提供了一个已经取消的令牌,并且集合为空并完成时。
在取消的情况下,没有缓冲的消息有丢失的危险。只有当buffer为空时,cancellationToken才会被检查。
在这个答案的first revision中可以找到一个没有内存管理功能的更简单的实现。

展开查看全部

相关问题