wpf 为什么迭代GetConsumingEccurrence()没有完全清空底层阻塞集合

30byixjq  于 2023-10-22  发布在  其他
关注(0)|答案(4)|浏览(130)

我有一个可量化和可重复的问题,使用任务并行库,BlockingCollection<T>ConcurrentQueue<T>GetConsumingEnumerable,同时试图创建一个简单的管道。
简而言之,从一个线程添加条目到默认的BlockingCollection<T>(在后台依赖于ConcurrentQueue<T>),并不能保证它们会从另一个调用GetConsumingEnumerable()方法的线程弹出BlockingCollection<T>
我已经创建了一个非常简单的Winforms应用程序来复制/模拟它,它只是将整数打印到屏幕上。

  • Timer1负责完成工作项目...它使用一个名为_tracker的并发字典,以便知道它已经添加到阻塞集合中的内容。
  • Timer2只是记录BlockingCollection_tracker的计数状态
  • START按钮启动一个Paralell.ForEach,它简单地迭代阻塞集合GetConsumingEnumerable(),并开始将它们打印到第二个列表框。
  • STOP按钮停止Timer1,防止更多条目添加到阻塞集合中。
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for(var i = 0; i < 3; i++,Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
            _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }

以下是事件的顺序:

  • 按下开始
  • Timer 1 ticks & ListBox 1立即更新3条消息(添加0,1,2)
  • ListBox 2随后用3条消息更新,间隔1秒
  • 正在处理% 0
  • 处理1
  • 处理2
  • Timer 1 ticks & ListBox 1立即更新3条消息(添加3、4、5)
  • ListBox 2有时会更新2条消息,间隔1秒
  • 处理3
  • 处理4
    *处理中5未打印...似乎已经“失踪”了
  • 按下STOP以防止计时器1添加更多消息
  • 等等......“处理5”还是没有出现

您可以看到并发字典仍在跟踪1个尚未处理的条目,该条目随后从_tracker中删除
如果我再次按下开始,那么计时器1开始添加更多的3个条目,并行循环回到生活打印5,6,7和8。

我完全不知道为什么会这样。再次调用start显然会调用一个newtask,它会调用一个Paralell foreach,并重新执行GetConsumingEntrance(),它会神奇地找到丢失的条目。我

为什么BlockingCollection.GetConsumingEnumerable()不能保证覆盖添加到集合中的每个项目。
为什么添加更多的条目会导致它“不卡住”并继续处理?

8qgya5xd

8qgya5xd1#

不能在Parallel.ForEach()中使用GetConsumingEnumerable()
使用TPL extras中的**GetConsumingPartitioner**
在博客文章中,您还将获得为什么不能使用GetConsumingEnumerable()的解释
Parallel.ForEach和PLINQ默认使用的分区算法使用分块来最小化同步成本:它不是在每个元素上获取一次锁,而是获取锁,抓取一组元素(一个块),然后释放锁。
也就是说,Parallel.ForEach在继续之前会等待直到它收到一组工作项。就像你的实验所显示的那样。

ymzxtsji

ymzxtsji2#

从.NET Framework 4.5开始,你可以创建一个分区程序,一次只接受一个项目:

var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(partitioner, parallelOptions, (batch, state) =>
{
    /* Do stuff */
});

EnumerablePartitionerOptions Enum
NoBuffering:创建一个分区程序,每次从源分区中取出一个项目,并且不使用中间存储,多个线程可以更有效地访问中间存储。此选项提供了对低延迟的支持(项将在源代码中可用时立即处理),并提供了对项之间依赖关系的部分支持(线程不能死锁等待线程本身负责处理的项)。

disho6za

disho6za3#

我不能复制你的行为与简单的控制台应用程序做基本相同的事情(运行在.Net 4.5测试版,这可能会有所不同)。但我认为发生这种情况的原因是Parallel.ForEach()试图通过将输入集合拆分为块来优化执行。使用可扩展对象,只有在向集合中添加更多项之后,才能创建块。有关详细信息,请参阅Custom Partitioners for PLINQ and TPL on MSDN
要解决这个问题,请不要使用Parallel.ForEach()。如果您仍然希望并行处理这些项,则可以在每次迭代中启动Task

3gtaxfhh

3gtaxfhh4#

为了清楚起见,我觉得我应该注意到,在您能够在执行Parallel.foreach之前调用BlockingCollection的.CompleteAdding()方法的情况下,您上面描述的问题将不会成为问题。我已经多次使用这两个对象一起与伟大的成果。
此外,您可以在调用CompleteAdding()后重新设置BlockingCollection,以便在需要时添加更多项(_entries = new BlockingCollection();)
如果您多次单击开始和停止按钮,按如下方式更改上面的单击事件代码将解决缺少条目的问题,并使其按预期工作:

private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
    timer1.Stop();
    timer1.Enabled = false;
>>>>_entries.CompleteAdding();
>>>>_entries = new BlockingCollection<int>();
}

相关问题