在C++中在工作线程之间拆分数据集计算

bogh5gae  于 2023-03-09  发布在  其他
关注(0)|答案(2)|浏览(105)

我有一个1500行的2D矢量数据集,我想对每一行执行一个计算量很大的操作。我想利用多个线程来完成这个操作,以便尽可能快地执行。
我在互联网上找不到任何明确的解决方案来解决这个问题。我想为每行创建一个线程,但这似乎非常低效,因为1500个线程将占用大量不必要的资源。因此,最好创建6个工作线程(我的PC有6个内核)并拆分工作。

//Computationally expensive function
double Loss(vector<double> input,vector<double> expectedOutput);

//This Calculates the Loss of all the 1500 rows in the 2D Vector and returns their average
double TotalLoss(vector<vector<double>> inputs,vector<vector<double>> expectedOutputs);

//TBD: Implement TotalLoss using multipel threads
double MultiThreadedTotalLoss(vector<vector<double>> inputs,vector<vector<double>> expectedOutputs);

我试着使用std::thread来实现不同的线程,但不知道如何将工作分成6个。我想过制作6个不同的向量,并划分原始向量,但不知道这是否是最好的方法。
任何帮助都将不胜感激。

bq8i3lrv

bq8i3lrv1#

假设您已经有了一个函数来计算一行的损失,我将猜测您的TotalLoss函数看起来像这样:

using Matrix = std::vector<std::vector<double>>;

double TotalLoss(Matrix const &inputs, Matrix const &expected) {
    double ret = 0.0;

    for (int i=0; i<inputs.size(); i++) {
        ret += loss(inputs[i], expected[i]);
    }
    return ret;
}

假设这是相当准确的,多线程版本可能看起来像这样:

using Matrix = std::vector<std::vector<double>>;

double MultithreadedTotalLoss(Matrix const &inputs, Matrix const &expected) {
    double ret = 0.0;

    #pragma omp parallel for reduction(+:ret)
    for (int i=0; i<inputs.size(); i++) {
        ret += loss(inputs[i], expected[i]);
    }
    return ret;
}

多写一行#pragma,然后就可以开始了。就此而言,即使是其中的reduction(+:ret)部分也是可选的,尽管它可以在一定程度上提高效率(它基本上告诉编译器/库保留一个每个线程的累加器,然后在最后将它们相加,而不是让线程在运行时争夺对单个变量的访问)。如果loss()确实很昂贵,这可能不会有太大的不同。
与显式地进行线程化相比,这有一些优点。最明显的一个优点是,与显式地编写所有线程化代码相比,这相当简单。不太明显但通常几乎同样重要的是,它可以/将自动找到(并使用)可用的内核数量,因此它将使用您拥有的6个内核,但如果您在一台拥有128个内核的计算机上运行它,从长远来看,它还有一个优势,那就是继续保持所涉及的基本算法易于查找、易于阅读等。显式多线程可以很快地被线程管理“东西”所主导,所以几乎很难找到真实的工作的代码。
最大的缺点是缺乏灵活性。对于这样的代码,OpenMP可以工作得很好--但是对于其他一些情况,它就很难应用了。

hwamh0ep

hwamh0ep2#

你可以使用下面的ThreadPool例子。它并不完美,但应该给予你一个想法:-)

#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>

using namespace std::chrono_literals;

class ThreadPool
{
public:
    ThreadPool(int no_of_threads) : m_pool(no_of_threads)
    {
        for (int i = 0; i < no_of_threads; i++)
        {
            m_pool[i] = std::thread(&ThreadPool::thread_func, this, i);
            m_pool[i].detach();
        }
    }

    void add_task(std::function<void(int)> task_fn)
    {
        std::unique_lock<std::mutex> lck(m_mutex);
        m_task_queue.push(task_fn);
        m_cv.notify_all();
    }

    void stop_processing()
    {
        m_stop_all_threads = true;
    }
private:
    void thread_func(int thread_id)
    {
        while (!m_stop_all_threads)
        {
            std::unique_lock<std::mutex> lck(m_mutex);
            // wait for some task to be added in queue
            if (!m_cv.wait_for(lck, 100us, [this]() { return !m_task_queue.empty(); }))
                continue;
            // pick up task, update queue
            auto fn = m_task_queue.front();
            m_task_queue.pop();
            lck.unlock();
            // execute task
            fn(thread_id);
            // std::this_thread::sleep_for(1us);
        }
    }

    std::vector<std::thread> m_pool;
    std::atomic<bool> m_stop_all_threads{false};
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue< std::function<void(int)> > m_task_queue;
};

int main()
{    
    ThreadPool pool(5);

    int i = 0;
    while (i < 100)
    {
        pool.add_task([x = i](int id) { std::cout << "This is task " << x << " in thread " << id << '\n'; });
        i++;
    }
    std::this_thread::sleep_for(5ms);
    pool.stop_processing();
}

当前,m_task_queue需要std::function<void(int)>。您可以将其更改为所需的任何签名。

相关问题