C++中的线程安全队列

gstyhher  于 2023-05-20  发布在  其他
关注(0)|答案(9)|浏览(225)

C中是否有线程安全、非阻塞的队列类?
可能是一个基本的问题,但我没有做C
很长一段时间。

**编辑:**删除STL要求。

2eafrhcq

2eafrhcq1#

假设你的CPU有一个双指针宽的比较和交换(486或更高版本上的compxchg8b,大多数amd64机器上的compxchg16b [英特尔的一些早期型号上不存在])...有一个算法here
更新:如果你不害怕做一些工作,把它翻译成C++并不难。:P
这个算法假设一个“带标签的指针”结构,看起来像这样:

// Be aware that copying this structure has to be done atomically...

template <class T>
struct pointer
{
   T *ptr;
   uintptr_t tag;
};

然后你想 Package 指令lock cmpxchg{8|16}b与一些inline asm...
也许你可以这样写队列节点:

template <class T>
struct queue_node
{
    T value;
    pointer<queue_node<T> > next;
};

剩下的或多或少是我连接的算法的转录...

alen0pnh

alen0pnh2#

由于当前的C++标准甚至不承认线程的存在,因此在STL或标准库的任何其他部分中几乎都没有线程安全的东西。

3qpi33ja

3qpi33ja4#

你需要自己实现它,或者使用一个实现它的库。要自己做,你可能想看看这个:
Implementing a Thread-Safe Queue using Condition Variables

eblbsuwk

eblbsuwk5#

现在可能太迟了。作为将来的参考,这是无锁队列的一个很好的实现(内置线程安全性,但有一些警告)。
多生产者-多消费者
http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
https://github.com/cameron314/concurrentqueue
单一生产者-单一消费者
http://moodycamel.com/blog/2013/a-fast-lock-free-queue-for-c++
https://github.com/cameron314/readerwriterqueue

dffbzjpn

dffbzjpn6#

简短的回答-没有。STL本身并不关心并发性(至少在规范级别上是这样)。当前的C++标准对线程没有任何规定。
你可以很容易地在STL和Boost之上构建这样一个队列--只需要在你的自定义类中 Package std::queueboost::mutex

pbossiut

pbossiut7#

STL容器不是线程安全的,你应该实现并发访问的处理。
有一个项目(C++)旨在提供并发访问:CPH STL
paper about

beq87vna

beq87vna8#

目前非官方的Boost.Lockfree是值得考虑的。我同时使用FIFO和原子类型。

v1l68za4

v1l68za49#

非阻塞意味着不需要任何互斥体、信号量、临界区等。
对于一个生产者和一个消费者,也就是一个线程推送而另一个弹出元素,在我的“强烈”意见中,下面提供的循环队列就足够了。
下面是代码以及关于其他可能性的注解

// --------- class: TNQueue
  // 
  // Implements a fixed size circular queue which is non-blocking and thread safe provided that
  //
  //    - there is only one producer, that is one thread that pushes into the queue
  //    - and only one consumer, a thread that pops the queue
  //    - producer has to handle fullness of the queue (queue will not block)
  //    - consumer has to handle emptiness of the queue (queue will not block)
  //
  // Safety: The queue is thread safe if the given conditions are met because the two separate threads changes 
  //         different pointers (push writeIndx and pop readIndx). Although they read both indexes in functions isFull()
  //         and isEmpty(), the evaluation is "conservative", that is, in worst case works well.
  // 
  // Counting: Counting elements is not possible in a not blocking and still thread safe way. For example
  //           a counter as member variable would have to be changed by both, push and pop. And a function
  //           computing the count using writeIndx and readIndx is not achievable, mainly because of the circular 
  //           nature of the queue: the function would need to use module (%) operation which may provide a nonsense 
  //           value if some of the counters change during the computation.
  //
  // Blocking with a counting semaphore: If blocking is required, that is the producer gets blocked when the queue is full
  //          and the consumer when the queue is empty. Then it can be implemented efficiently using a counting semaphore
  //          that reflects the number of elements in the queue. Then push will increase the semaphore (release) and the pop
  //          decrement it (wait). This solution provides the counter of elements (from the semaphore) and allow using 
  //          the N element of the storage instead of only N-1 (checking pointers is unnecessary)
  // 
  // multiple producers or consumers or both: if is needed to have multiple producers or multiple consumers or both, then
  //          a mutex for write and a mutex for read would allow that. Having a unique mutex for both operation is quite 
  //          inefficient, it would block in many unnecessary situations, for example the queue is almost empty but a producer
  //          pushing one elemnt blocks the queue for any reader.
  //
  // (*) disclaimer: it was difficult for me to find good information about the subject in internet, as an example I had to change
  //          the code of https://en.wikipedia.org/wiki/Circular_buffer#Circular_buffer_mechanics (so modification
  //          at 10:30 on 13 May 2023 Elxala, comes from me, that is same source as this code, be aware!)
  //          Also, as curiosity, a discussion with chatGPT about this queue provide wrong answers from chatGPT (not the version ChatGPT4).
  //          So, with that said, do not take the code and the notes as the truth but just as my strong opinion.
  //          If any of these notes requires correction, feedback would be appreciated.
  //
  template<typename T, int const N>
  class TNQueue
  {
  protected:
      
      int storage [N];    // Note: N-1 is the actual capacity, see isFull method
      int writeIndx;
      int readIndx;

  public:
     
     TNQueue ():
        writeIndx (0),
        readIndx (0)
     {
     }
     
     bool isFull ()
     {
        return (writeIndx + 1) % N == readIndx;
     }

     bool isEmpty ()
     {
        return readIndx == writeIndx;
     }

     bool push (T item) 
     {
        if (isFull ()) return false;
        
        storage[writeIndx] = item;
        writeIndx = (writeIndx + 1) % N;
        return true;
     }

     bool pop (T & item)
     {
        if (isEmpty ()) return false;
        item = storage[readIndx];
        readIndx = (readIndx + 1) % N;
        return true;
     }
  };

  int main ()
  {
    TNQueue<int, 15> cua;
    
    // basic test
    //
    int value = 1001;
    while (cua.push (value ++));
    while (cua.pop (value))
       printf ("read %d\n", value);
    return 0;
  }

相关问题