WinAPI c++多线程计算

68bkxrlz  于 2022-12-15  发布在  其他
关注(0)|答案(2)|浏览(148)

我有一个任务是用以下公式计算Pi:
$$\pi \approx \frac{1}{N}\sum_{i=0}^{N-1}\frac{4}{1+x_i^{2}},\qquad x_i=\frac{i+0.5}{N}$$
(i的范围为0至N,N = 10^8)
计算应在多个线程中完成,并满足以下要求:1. 每个线程每次只接收一小批固定数量的计算(在我的情况下,一次 40 项),并且应该有一个“任务池”:当线程报告已完成之前分配给它的那一组计算时,任务池再给该线程分配一组新的计算。2. 线程在接收到新的任务之前应当等待。3. 所有这些都应该用 WinAPI 来完成。
我的解决方案是这个类:

#include "ThreadManager.h"
#include <string>

// Process-wide mutex used by run() and the workers around the event hand-off.
// It is created initially owned by the thread that performs static
// initialisation; run() relies on that when it calls ReleaseMutex() before its
// first wait. The mutex is deliberately unnamed: with a name, CreateMutexA()
// opens any already existing mutex of that name (for example one created by a
// second running instance of the program) and ignores the initial-ownership
// request.
HANDLE ThreadManager::mutex = CreateMutexA(nullptr, true, nullptr);

/// Builds the worker pool.
///
/// For every worker an auto-reset event (created signalled), a ThreadStruct
/// holding a duplicate of that event handle, and a suspended thread running
/// threadFunc are created. A worker is registered in threadInfo / threadEvents /
/// threads only when all three were created, so index k always refers to the
/// same worker in all three vectors.
///
/// @param threadCount requested number of workers; values above
///        MAXIMUM_WAIT_OBJECTS are clamped.
ThreadManager::ThreadManager(size_t threadCount)
{
    // run() waits on all worker events with a single WaitForMultipleObjects()
    // call, which accepts at most MAXIMUM_WAIT_OBJECTS handles.
    if (threadCount > MAXIMUM_WAIT_OBJECTS)
    {
        std::cout << "Thread count limited to " << MAXIMUM_WAIT_OBJECTS << std::endl;
        threadCount = MAXIMUM_WAIT_OBJECTS;
    }

    threadInfo.reserve(threadCount);
    threadEvents.reserve(threadCount);
    threads.reserve(threadCount);

    for (size_t i = 0; i < threadCount; ++i)
    {
        // Unnamed event: a named one would be shared with (or collide with) any
        // other process that happens to use the same name.
        HANDLE event = CreateEventA(nullptr, false, true, nullptr);
        if (!event)
        {
            std::cout << "Unknown error: " << GetLastError() << std::endl;
            continue;
        }

        // The first batch of a worker follows the batches of the workers that
        // were actually started, matching run()'s threads.size() * OP_COUNT.
        auto info = std::make_unique<ThreadStruct>(threads.size() * OP_COUNT);

        if (!DuplicateHandle(GetCurrentProcess(), event, GetCurrentProcess(),
                             &info->threadEvent, 0, false, DUPLICATE_SAME_ACCESS))
        {
            std::cout << "Unknown error: " << GetLastError() << std::endl;
            CloseHandle(event);
            continue;
        }

        HANDLE thread = CreateThread(nullptr, 0,
                                     reinterpret_cast<LPTHREAD_START_ROUTINE>(&ThreadManager::threadFunc),
                                     info.get(),
                                     CREATE_SUSPENDED, nullptr);
        if (!thread)
        {
            std::cout << "Unknown error: " << GetLastError() << std::endl;
            CloseHandle(info->threadEvent);
            CloseHandle(event);
            continue;
        }

        // Capacity was reserved above, so these push_backs cannot reallocate.
        threadInfo.push_back(info.release());
        threadEvents.push_back(event);
        threads.push_back(thread);
    }
}

double ThreadManager::run()
{
    size_t operations_done = threads.size() * OP_COUNT;
    for (HANDLE t : threads) ResumeThread(t);
    DWORD index;
    Sleep(10);
    while (operations_done < ThreadManager::N)
    {
        ReleaseMutex(ThreadManager::mutex);
        index = WaitForMultipleObjects(this->threadEvents.size(), this->threadEvents.data(), false, 10000);
        WaitForSingleObject(ThreadManager::mutex, 1000);
        threadInfo[index] -> operationIndex = operations_done + OP_COUNT;
        SetEvent(threadEvents[index]);
        //std::cout << "Operations completed: " << operations_done << "/1000" << std::endl;
        operations_done += OP_COUNT;
    }
    long double res_pi = 0;
    for (auto&& ts: this->threadInfo)
    {
        res_pi += ts->pi;
        ts->operationIndex = N;
    }
    res_pi /= N;
    WaitForMultipleObjects(this->threads.size(), this->threads.data(), true, 10000);
    std::cout.precision(10);
    std::cout << "Pi value for " << threads.size() << " threads: " << res_pi;
    threads.clear();
    return 0;
}

/// Stops any worker that is still registered and releases every handle and
/// ThreadStruct owned by the manager.
ThreadManager::~ThreadManager()
{
    // Workers still listed here were never reaped by run(); stop them before
    // the ThreadStruct objects they use are freed below.
    for (HANDLE t : threads)
    {
        TerminateThread(t, static_cast<DWORD>(-1));
        CloseHandle(t);
    }
    threads.clear();

    for (HANDLE e : threadEvents) CloseHandle(e);
    threadEvents.clear();

    // threadInfo holds raw owning pointers. std::destroy() on such a range only
    // "destroys" the pointers themselves and frees nothing, so delete explicitly.
    for (ThreadStruct* ts : threadInfo)
    {
        if (ts->threadEvent) CloseHandle(ts->threadEvent); // the duplicated handle
        delete ts;
    }
    threadInfo.clear();
}

/// Sums 4 / (1 + x_i^2) with x_i = (i + 0.5) / N over one batch of at most
/// OP_COUNT consecutive indices.
///
/// @param startIndex index of the first term of the batch.
/// @return the partial sum; 0 when startIndex is not below N. The batch is cut
///         off at N so that terms outside [0, N) are never added.
long double ThreadManager::calc(size_t startIndex)
{
    const size_t total = static_cast<size_t>(N);
    if (startIndex >= total) return 0;

    const size_t remaining = total - startIndex;
    const size_t batch = static_cast<size_t>(OP_COUNT);
    const size_t count = remaining < batch ? remaining : batch;

    long double sum = 0;
    for (size_t i = startIndex; i < startIndex + count; i++)
    {
        const long double ld_i = i;
        const long double half = 0.5f;
        const long double xi = (ld_i + half) * (1.0 / N);
        sum += 4.0 / (1.0 + xi * xi);
    }
    return sum;
}

/// Worker thread body: waits for its event, adds the batch starting at
/// ts->operationIndex to ts->pi and signals the event again, until
/// operationIndex is no longer below N.
///
/// @param ts per-worker state shared with the manager.
/// @return 0 on normal exit, 1 when waiting on the worker's event failed.
DWORD WINAPI ThreadManager::threadFunc(ThreadStruct *ts)
{
    while (ts->operationIndex < N)
    {
        const DWORD wake = WaitForSingleObject(ts->threadEvent, 1000);
        if (wake == WAIT_FAILED) return 1; // unusable event handle: do not spin
        if (wake != WAIT_OBJECT_0) continue; // timed out: no new work was handed
                                             // out, so do not re-sum the old batch

        // The manager may have set the stop index while this thread was waiting.
        const size_t start = ts->operationIndex;
        if (start >= static_cast<size_t>(N)) break;

        ts->pi += calc(start);

        // NOTE(review): this signals the same auto-reset event the loop waits on
        // above, so the worker may consume its own "done" signal on the next
        // iteration; a separate event per direction would be needed to rule
        // that out.
        const DWORD lock = WaitForSingleObject(ThreadManager::mutex, 1000);
        SetEvent(ts->threadEvent);
        // Release only a mutex that was actually acquired.
        if (lock == WAIT_OBJECT_0 || lock == WAIT_ABANDONED)
            ReleaseMutex(ThreadManager::mutex);
    }
    return 0;
}

/// Starts a worker record at batch index `opIndex` with an empty partial sum.
ThreadStruct::ThreadStruct(size_t opIndex)
    : operationIndex(opIndex),
      pi(0)
{
}

我的想法是:每个线程都有一个自动重置事件,线程完成计算后将该事件置为有信号状态。主线程等待任意一个线程事件变为有信号状态,在修改共享 ThreadStruct 中的一些值(使该线程能够开始下一部分计算)之后,再把同一个事件置为有信号状态,由对应的线程接收。但这甚至在只有一个线程时也不起作用:我得到的值非常随机,完全不接近 Pi(例如 0.0001776328265)。
虽然我的GDB调试器工作得很差(不显示一些变量,有时甚至崩溃),但我注意到发生了太多的计算(我将N缩小到1000。因此,我应该看到线程打印出“计算”1000/40 = 25次,但实际上它发生了数百次)
然后我试着添加一个互斥锁,这样线程就会等到主线程不忙时才发出事件信号。这使得计算变慢了,而且结果仍然不准确、很随机(例如:16 个线程时得到 50.26492171)。
可能是什么问题?或者,如果它是完全错误的,我该如何组织多线程计算呢?创建一个类是一个坏主意吗?
如果你想重现这个问题,这里是头文件的内容(我使用的是c++20,MinGW 6.0):

#ifndef MULTITHREADPI_THREADMANAGER_H
#define MULTITHREADPI_THREADMANAGER_H
#include <iostream>
#include <vector>
#include <list>
#include <windows.h>
#include <memory>

// Per-worker state shared between ThreadManager and one worker thread.
struct ThreadStruct
{
    // Index of the first term of the batch the worker sums next; the worker
    // loop in threadFunc runs while this is below ThreadManager::N.
    size_t operationIndex;
    // Partial sum accumulated by the worker (threadFunc adds calc() results).
    long double pi;
    // Event handle the worker waits on and signals; filled in by the
    // ThreadManager constructor via DuplicateHandle.
    HANDLE threadEvent = nullptr;
    // Sets operationIndex to opIndex and pi to 0.
    explicit ThreadStruct(size_t opIndex);
};

/// Owns a pool of WinAPI worker threads that sum the series for Pi in batches
/// of OP_COUNT terms.
class ThreadManager
{
public:
    /// Creates `threadCount` suspended worker threads with their events.
    explicit ThreadManager(size_t threadCount);
    /// Resumes the workers, distributes the batches and prints the result.
    double run();
    ~ThreadManager();

    // The manager owns raw thread/event handles and heap-allocated ThreadStruct
    // objects; an implicit copy would terminate, close and release them twice.
    ThreadManager(const ThreadManager&) = delete;
    ThreadManager& operator=(const ThreadManager&) = delete;

private:
    std::vector<ThreadStruct*> threadInfo;  // per-worker shared state (owning)
    std::vector<HANDLE> threads;            // worker thread handles
    std::vector<HANDLE> threadEvents;       // one event per worker
    static HANDLE mutex;                    // shared by run() and all workers
    /// Sums one batch of terms starting at `startIndex`.
    static long double calc(size_t startIndex);
    static const int OP_COUNT = 40;         // terms per batch
    static const int N = 100000000;         // total number of terms
    /// Thread entry point; `ts` is the worker's own ThreadStruct.
    static DWORD WINAPI threadFunc(ThreadStruct* ts);
};

#endif //MULTITHREADPI_THREADMANAGER_H

要执行代码,只需构造ThreadManager,并将所需的线程数作为参数,然后对其调用run()。

fxnxkyjh

fxnxkyjh1#

即使把下面列出的问题全部改掉,程序仍然不能稳定地给出接近 PI 的值,所以一定还有别的地方需要修复。我认为这与事件有关。如果我理解正确的话,互斥锁保护的是两个不同的东西,而事件也被用于两件不同的事情,所以它们的含义在执行过程中都会改变。这使得程序很难推理清楚。

1.超时

WaitForMultipleObjects 可能会超时。在这种情况下,它返回 WAIT_TIMEOUT(定义为 0x102,即 258)。而您直接用这个返回值去索引 threadInfo 向量,没有进行边界检查。可以用带边界检查的 at(n) 代替 [n]。
在调试或将OP_COUNT设置为较高的数值时,您很容易遇到10秒的超时,因此,您可能希望将其设置为INFINITE
这会导致各种各样的不当行为:
1. 线程信息(operationIndex)可能在线程仍在使用它时被更新。
2. operations_done 被更新,尽管那些操作可能尚未执行。
3. 互斥锁可能被过度释放。

2.限制线程数

线程管理器还应该检查线程数,因为不能将其设置为大于MAXIMUM_WAIT_OBJECTS的数字,否则WaitForMultipleObjects()将无法可靠地工作。

3.差一错误(off-by-one)

应该是
size_t operations_done = (threads.size()-1) * OP_COUNT;

threadInfo[index] -> operationIndex = operations_done; // was + OP_COUNT
否则会跳过一批

4.结束线程

结束线程依赖于超时。
当您用INFINITE替换所有超时时,您会注意到您的线程永远不会结束。

6yjfywim

6yjfywim2#

struct CommonData;

// Work item handed to one worker thread: which terms to sum and where to
// publish the result.
struct ThreadData 
{
    CommonData* pData;  // shared state (N, running sum, thread counter)
    ULONG i, k;         // i: index of the first term, k: number of terms

    ThreadData(CommonData* pData, ULONG i, ULONG k) : pData(pData), i(i), k(k) {}

    // Thread entry point; p is the ThreadData allocated by CommonData::AddThread.
    static ULONG CALLBACK Work(void* p);
};

// State shared by all workers: the number of terms, the running sum and a
// counter of outstanding references that signals hEvent when it reaches zero.
struct CommonData
{
    HANDLE hEvent = 0;              // created by Init(); set when the counter hits 0
    LONG dwActiveThreadCount = 1;   // starts at 1: the reference dropped in Wait()
    ULONG N;                        // total number of terms
    // The running sum. res and i64 share storage so that the double can be
    // updated through InterlockedCompareExchange64 on i64 (see ThreadData::Work).
    union {
        double res = 0;
        __int64 i64;
    };

    CommonData(ULONG N) : N(N) {}

    // Closes the event if Init() created one.
    ~CommonData()
    {
        if (HANDLE h = hEvent)
        {
            CloseHandle(h);
        }
    }

    // Drops one reference; the caller that brings the counter to zero signals
    // the completion event.
    void DecThread()
    {
        if (!InterlockedDecrement(&dwActiveThreadCount))
        {
            if (!SetEvent(hEvent)) __debugbreak();
        }
    }

    // Starts one worker summing k terms beginning at index i.
    // Returns TRUE when the thread was created, FALSE otherwise.
    BOOL AddThread(ULONG i, ULONG k)
    {
        // Count the worker before it exists so that it cannot bring the counter
        // to zero prematurely.
        InterlockedIncrementNoFence(&dwActiveThreadCount);

        if (ThreadData* ptd = new ThreadData(this, i, k))
        {
            if (HANDLE hThread = CreateThread(0, 0, ThreadData::Work, ptd, 0, 0))
            {
                // The handle is not needed: completion is tracked by the counter.
                CloseHandle(hThread);

                return TRUE;
            }

            // Thread creation failed, so ownership of ptd stays here.
            delete ptd;
        }

        // Undo the increment made above for the worker that never started.
        DecThread();

        return FALSE;
    }

    // Creates the completion event; returns FALSE when CreateEvent fails.
    BOOL Init()
    {
        return 0 != (hEvent = CreateEvent(0, 0, 0, 0));
    }

    // Drops the initial reference and blocks until the event is signalled.
    void Wait()
    {
        DecThread();
        if (WaitForSingleObject(hEvent, INFINITE) != WAIT_OBJECT_0) __debugbreak();
    }
};

// Worker thread body: sums up to k terms 4 / (1 + x^2), x = (i + 0.5) / N,
// starting at index i, atomically adds the partial sum to the shared result
// and drops this worker's reference on the shared state.
//
// p is the ThreadData allocated by CommonData::AddThread; ownership passes to
// this function, which frees it. Always returns 0.
ULONG CALLBACK ThreadData::Work(void* p)
{
    // Free the work item through its real type: applying delete to a void*
    // is undefined behaviour.
    ThreadData* const ptd = static_cast<ThreadData*>(p);
    CommonData* pData = ptd->pData;
    ULONG i = ptd->i;
    ULONG k = ptd->k;
    delete ptd;

    const ULONG N = pData->N;
    double pi = 0;
    // Testing k before decrementing keeps k == 0 from wrapping around, and
    // i < N keeps the last chunk from summing terms outside [0, N) when N is
    // not a multiple of the chunk size.
    for (; k != 0 && i < N; --k, ++i)
    {
        const double xi = (i + 0.5) / N;
        pi += 4 / (1 + xi * xi);
    }

    // d and i64 share storage: the shared sum is a double that is read and
    // replaced as a 64-bit integer by the interlocked compare-exchange.
    union {
        double d;
        __int64 i64;
    };

    i64 = pData->i64;

    for (;;)
    {
        union {
            double d_compare;
            __int64 i64_compare;
        };

        i64_compare = i64; // the value this attempt is based on

        d += pi; // desired new total

        // Publish only if nobody changed the total in the meantime; otherwise
        // i64 receives the current total and the addition is retried.
        if (i64_compare == (i64 = InterlockedCompareExchange64(
                           &pData->i64, i64, i64_compare)))
        {
            break;
        }
    }

    pData->DecThread();

    return 0;
}

// Approximates Pi as (1/N) * sum_{i=0}^{N-1} 4 / (1 + x_i^2), x_i = (i + 0.5) / N,
// splitting the N terms into contiguous chunks, one worker thread per chunk and
// at most one thread per processor.
//
// Returns the approximation, or 0 when N is 0, the processor count is 0, the
// completion event cannot be created or a worker thread cannot be started.
double calc_pi(ULONG N)
{
    if (!N)
    {
        return 0; // nothing to sum; also avoids 0 / 0 below
    }

    SYSTEM_INFO si;
    GetSystemInfo(&si);
    const ULONG threadCount = si.dwNumberOfProcessors;
    if (!threadCount)
    {
        return 0;
    }

    CommonData cd(N);
    if (!cd.Init())
    {
        return 0;
    }

    // ceil(N / threadCount) without the overflow of N + threadCount - 1.
    const ULONG chunk = N / threadCount + (N % threadCount ? 1 : 0);

    bool allStarted = true;
    for (ULONG first = 0; first < N; )
    {
        // The last chunk is shortened so that exactly N terms are handed out.
        const ULONG count = (N - first < chunk) ? N - first : chunk;
        if (!cd.AddThread(first, count))
        {
            allStarted = false;
            break;
        }
        first += count;
    }

    // Always wait: workers that did start still reference cd.
    cd.Wait();

    return allStarted ? cd.res / N : 0;
}

当我在 8 核的机器上调用 calc_pi(100000000) 时,得到的结果是 3.1415926535898153

相关问题