c++ 如何正确消除代码中的竞态条件?

bnlyeluc  于 2023-06-25  发布在  其他
关注(0)|答案(1)|浏览(132)

有一段代码。

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <random>
#include <fstream>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include "unistd.h"

//#define PREPARE_FILE_TO_READ

using std::cout;
using std::endl;
using std::mutex;
using std::string;
using std::ifstream;
using std::ofstream;
using std::vector;
using std::shared_ptr;
using std::make_shared;
using std::mutex;
using std::atomic;
using std::lock_guard;
using std::unique_lock;
using std::thread;
using std::queue;
using std::condition_variable;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;

constexpr int NUM_THREADS = 4;
const int N = 100;
int NUMBER_OF_BYTES_IN_FILE = 0;
atomic<int> NUMBER_OF_BYTES_READ(0);
atomic<int> NUMBER_OF_BYTES_WRITTEN(0);
mutex mut;

template <typename T>
class TSQueue{
  private:
    queue<T> m_queue;
    mutable mutex m_mutex;
    condition_variable m_cond;
  public:
    void push(T item){
      unique_lock<mutex> lock(m_mutex);
      m_queue.push(item);
      m_cond.notify_one();
      cout << "TSQueue::push()" << endl;
    }
    T pop(){
      unique_lock<mutex> lock(m_mutex);
      cout << "TSQueue::pop() A size=" << m_queue.size() << endl;
      m_cond.wait(lock,[this](){return !m_queue.empty();});
      T item = m_queue.front();
      m_queue.pop();
      cout << "TSQueue::pop() B" << " item=" << item << endl;
      return item;
    }
    int size() const {
      unique_lock<mutex> lock(m_mutex);
      int size = m_queue.size();
      return size;
    }
};

shared_ptr<TSQueue<char>> squeue = make_shared<TSQueue<char>>();

class SyncFile{
  public:
    SyncFile(const char *  _path):path(_path){
      out.open(path, std::ios::out);
    }
    void write(char ch){
      lock_guard<mutex> lock(m);
      out.put(ch);
      out.put('\n');
    }
    ~SyncFile(){
      out.close();
    }
  private:
    const string path;
    ofstream out;
    mutable mutex m;
};

shared_ptr<SyncFile> ofA = make_shared<SyncFile>("ofA.dat");
shared_ptr<SyncFile> ofB = make_shared<SyncFile>("ofB.dat");

bool is_last_char(){
  return (NUMBER_OF_BYTES_WRITTEN == NUMBER_OF_BYTES_IN_FILE);
}

void get_data(){
  cout << "get_data THREAD ID=" << std::this_thread::get_id() << endl;
  ifstream in1("v1.dat");
  cout << "file size=" << NUMBER_OF_BYTES_IN_FILE << " B" << endl;
  NUMBER_OF_BYTES_READ = 0;
  int count = 0;
  while(NUMBER_OF_BYTES_READ < NUMBER_OF_BYTES_IN_FILE){
    char ch = in1.get();
    squeue->push(ch);
    NUMBER_OF_BYTES_READ++;
    count++;
    if(NUMBER_OF_BYTES_READ % 100 == 0)
      cout << "count=" << count << " NREAD=" << NUMBER_OF_BYTES_READ
       << " FILE_SIZE=" << NUMBER_OF_BYTES_IN_FILE
       << " squeue.size()=" << squeue->size()
       << endl;
  }
} 

void process_data(){
  this_thread::sleep_for(milliseconds(100));
  while(true){
    ///lock_guard<mutex> lock(mut);
    if(squeue->size() > 0){
      char ch = squeue->pop();
      if(ch == 'A') ofA->write(ch);
      if(ch == 'B') ofB->write(ch);
      NUMBER_OF_BYTES_WRITTEN++;
    }
    if(is_last_char()) break;
  }
}

int main()
{
#ifdef PREPARE_FILE_TO_READ
  vector<char> v1(N, 'a');
  for(int i = 0; i < v1.size(); ++i) v1[i] = 'A'+rand()%2;
  ofstream out1("v1.dat");
  out1.write((const char*) v1.data(), sizeof(char)*N);
  out1.close();
  exit(0);
#endif
  ifstream in1("v1.dat");
  in1.seekg(0, in1.end);
  NUMBER_OF_BYTES_IN_FILE = in1.tellg();
  cout << "NUMBER_OF_BYTES_IN_FILE="<<NUMBER_OF_BYTES_IN_FILE << endl;
  in1.seekg(0, in1.beg);
  vector<char> vin1(N, '0');
  in1.read((char*) vin1.data(), sizeof(char)*N);
  int countA=0, countB=0;
  for(int i = 0; i < vin1.size(); ++i){
    if(vin1[i] == 'A')      countA++;
    else if(vin1[i] == 'B') countB++;
  }
  in1.close();
  vector<thread> tasks;
  for(int i = 0; i < NUM_THREADS+1; ++i){
    if(i == 0) tasks.push_back(thread(get_data));
    else       tasks.push_back(thread(process_data));
  }
  for(int i = 0; i < tasks.size(); ++i) tasks[i].join();
  cout << "countA=" << countA << " countB=" << countB << endl;
  return 0;
}

首先,打开(取消注解)PREPARE_FILE_TO_READ标志,编译并运行迷你应用程序,并创建输出文件 v1.dat,该文件由'A'和'B' N char符号组成。然后这个标志被关闭(注解),当应用程序运行时,它挂起,最常见的是输出的最后一行:
TSQueue::pop()大小=0
我假设在 process_data() 函数中存在数据竞争:当线程A和线程B看到squeue.size()>0时

f(squeue->size() > 0)

然后进入if语句代码块,紧接着,比如说,线程A弹出最后一个元素,线程B稍后尝试弹出,但是队列现在是空的。这就是为什么线程B会因为条件变量 m_cond 而等待queue->size()> 0的时刻(但它永远不会发生,这就是为什么程序会挂起)。这是我对程序冻结原因的理解(如果我错了,请纠正我)。如果取消评论

lock_guard<mutex> lock(mut);

process_data() 中,一切似乎都正常工作,但没有多线程(据我所知)。
您能告诉我如何正确地消除这种竞争条件并解决这个问题吗?

uujelgoq

uujelgoq1#

我不确定这是唯一的问题,但现在您的TSQleue并不是真正的线程安全的。或者至少实际上没有办法使用它,这是免费的种族条件无论如何。
你的代码反映了这一点。现在,您的process_data具有:

if(squeue->size() > 0){
  char ch = squeue->pop();

考虑一下当您到达队列中的最后一个项目时会发生什么,其中有两个线程处理数据。两个线程都执行if语句,并且都看到队列中至少还有一个条目。然后他们两个都试图弹出那个项目。一个拿到东西,另一个永远等待。
事实上,线程安全队列通常不应该有size()成员函数(至少不是公共成员函数)。它的任何使用几乎是一个保证比赛条件。不,将其更改为is_emptyis_not_empty根本没有帮助。
相反,线程安全队列的接口通常应该只包括pushpop。但是pop实际上更像是try_pop-它可能会失败。
关于它将失败的确切情况,有几个选择。一个是如果队列为空,它会立即失败。另一个原因是它可以等待最多的某个时间段,以便队列中有一个项目,然后如果在这段时间内没有任何项目出现,它就会失败。
至少就我个人而言,我通常认为后者更有用。它可以通过传递超时值0来模拟前者。
condition_variable有一个wait_for成员函数,可以等待条件变为真,或者在指定的时间段后超时,这使得实现该行为变得容易。有了它,我们的pop可以看起来像这样:

bool pop(T& t, std::chrono::milliseconds const& dur = 200ms) {

    std::unique_lock<std::mutex> lock(locker_);

    if (!data_ready.wait_for(lock, dur, [data_&] { return !data_.empty(); })) {
        return false;
    }

    t = data_.front();
    data_.pop_front();
    return true;
}

使用这个,process_data将变得更像这样:

char ch;
if (!squeue.pop(ch))
    break;
    
// process ch

// I've left this here, but it's *probably* no longer needed.
if (is_last_char())
    break;

效率

尽管它与竞争条件无关,但使用多线程一次处理单个字符的数据(并对每个字符进行最小化处理)几乎保证最终会导致净损失。线程之间通信的开销实际上保证比多线程所能获得的任何好处都要大。

摘要

线程安全队列 * 通常 * 不应该有任何方法让外部代码甚至试图检索其大小或询问其是否为空。任何if (!queue.empty())if (queue.size() > 0)或任何类似形式的代码几乎保证会导致竞争条件。我所看到的唯一有用的用途是记录队列大小,以便在(例如)处理数据的速度不如产生的速度时,给予您了解队列在不断增长的情况下。但是如果你在其中建立了任何条件,那么当你试图对其采取行动时,这个条件可能已经改变了(这几乎就是竞争条件的定义)。

相关问题