我有一个任务是用以下公式计算Pi:
第一节第一节第一节第一节第一次
(i的范围为0至N,N = 10^8)
计算应在多个线程中完成,并满足以下要求:每个线程只接收一个小的固定数量的计算来完成(在我的情况下-一次40个成员),并且应该有一个“任务池”,当线程报告完成了之前给它的一组操作时,它给线程一组新的计算。2在线程接收新的任务之前,它应该等待。3所有这些都应该用WinAPI来完成。
我的解决方案是这个类:
#include "ThreadManager.h"
#include <string>
HANDLE ThreadManager::mutex = (CreateMutexA(nullptr, true, "m"));
ThreadManager::ThreadManager(size_t threadCount)
{
threads.reserve(threadCount);
for (int i = 0; i < threadCount; i++)
{
threadInfo.push_back(new ThreadStruct(i * OP_COUNT));
HANDLE event = CreateEventA(nullptr, false, true, std::to_string(i).c_str());
if (event)
{
threadEvents.push_back(event);
DuplicateHandle(GetCurrentProcess(), event, GetCurrentProcess(),
&(threadInfo[i]->threadEvent), 0, false, DUPLICATE_SAME_ACCESS);
}
else std::cout << "Unknown error: " << GetLastError() << std::endl;
HANDLE thread = CreateThread(nullptr, 0,
reinterpret_cast<LPTHREAD_START_ROUTINE>(&ThreadManager::threadFunc),
threadInfo[i],
CREATE_SUSPENDED, nullptr);
if (thread) threads.push_back(thread);
else std::cout << "Unknown error: " << GetLastError() << std::endl;
}
}
double ThreadManager::run()
{
size_t operations_done = threads.size() * OP_COUNT;
for (HANDLE t : threads) ResumeThread(t);
DWORD index;
Sleep(10);
while (operations_done < ThreadManager::N)
{
ReleaseMutex(ThreadManager::mutex);
index = WaitForMultipleObjects(this->threadEvents.size(), this->threadEvents.data(), false, 10000);
WaitForSingleObject(ThreadManager::mutex, 1000);
threadInfo[index] -> operationIndex = operations_done + OP_COUNT;
SetEvent(threadEvents[index]);
//std::cout << "Operations completed: " << operations_done << "/1000" << std::endl;
operations_done += OP_COUNT;
}
long double res_pi = 0;
for (auto&& ts: this->threadInfo)
{
res_pi += ts->pi;
ts->operationIndex = N;
}
res_pi /= N;
WaitForMultipleObjects(this->threads.size(), this->threads.data(), true, 10000);
std::cout.precision(10);
std::cout << "Pi value for " << threads.size() << " threads: " << res_pi;
threads.clear();
return 0;
}
ThreadManager::~ThreadManager()
{
if (!threads.empty())
for (HANDLE t: threads)
{
TerminateThread(t, -1);
CloseHandle(t);
}
std::destroy(threadInfo.begin(), threadInfo.end());
}
long double ThreadManager::calc(size_t startIndex)
{
long double xi = 0;
long double pi = 0;
for (size_t i = startIndex; i < startIndex + OP_COUNT; i++)
{
const long double ld_i = i;
const long double half = 0.5f;
xi = (ld_i + half) * (1.0 / N);
pi += ((4.0 / (1.0 + xi * xi)));
}
return pi;
}
DWORD WINAPI ThreadManager::threadFunc(ThreadStruct *ts)
{
while (ts->operationIndex < N)
{
WaitForSingleObject(ts->threadEvent, 1000);
ts->pi += calc(ts->operationIndex);
WaitForSingleObject(ThreadManager::mutex, 1000);
SetEvent(ts->threadEvent);
ReleaseMutex(ThreadManager::mutex);
}
return 0;
}
ThreadStruct::ThreadStruct(size_t opIndex)
{
this -> pi = 0;
this -> operationIndex = opIndex;
}
我的想法是每个线程都有一个自动重置事件,当线程完成计算时,该事件被设置为发出信号。主线程等待其中一个线程事件发出信号,并在修改共享ThreadStruct中的一些值后(以使线程能够启动另一部分计算)它将同一事件设置为用信号通知,它被同一个线程和进程接收,但这对一个线程都不起作用:因此,我看到的值非常随机,不接近Pi(如0.0001776328265)。
虽然我的GDB调试器工作得很差(不显示一些变量,有时甚至崩溃),但我注意到发生了太多的计算(我将N缩小到1000。因此,我应该看到线程打印出“计算”1000/40 = 25次,但实际上它发生了数百次)
然后我试着添加一个互斥锁,这样线程就可以等到主线程不忙碌时才发出事件信号。这使得计算速度变慢,而且仍然不准确和随机(例如:50.26492171(如果是16个螺纹)。
可能是什么问题?或者,如果它是完全错误的,我该如何组织多线程计算呢?创建一个类是一个坏主意吗?
如果你想重现这个问题,这里是头文件的内容(我使用的是c++20,MinGW 6.0):
#ifndef MULTITHREADPI_THREADMANAGER_H
#define MULTITHREADPI_THREADMANAGER_H
#include <iostream>
#include <vector>
#include <list>
#include <windows.h>
#include <memory>
struct ThreadStruct
{
size_t operationIndex;
long double pi;
HANDLE threadEvent = nullptr;
explicit ThreadStruct(size_t opIndex);
};
class ThreadManager
{
public:
explicit ThreadManager(size_t threadCount);
double run();
~ThreadManager();
private:
std::vector<ThreadStruct*> threadInfo;
std::vector<HANDLE> threads;
std::vector<HANDLE> threadEvents;
static HANDLE mutex;
static long double calc(size_t startIndex);
static const int OP_COUNT = 40;
static const int N = 100000000;
static DWORD WINAPI threadFunc(ThreadStruct* ts);
};
#endif //MULTITHREADPI_THREADMANAGER_H
要执行代码,只需构造ThreadManager
,并将所需的线程数作为参数,然后对其调用run()。
2条答案
按热度按时间fxnxkyjh1#
即使下面所有的都改变了,它也没有给予接近PI的一致的值。一定还有更多的东西要修复。我认为这与事件有关。如果我理解正确的话,互斥锁保护的是两个不同的东西。而事件也用于两个不同的东西。所以在执行过程中它们的含义都改变了。这使得很难考虑清楚。
1.超时
WaitForMultipleObjects可能会超时。在这种情况下,它返回
WAIT_TIMEOUT
,定义为0x102或258。您可以使用该值访问threadInfo
向量,而不进行边界检查。您可以将at(n)
用于[n]
的边界检查版本。在调试或将
OP_COUNT
设置为较高的数值时,您很容易遇到10秒的超时,因此,您可能希望将其设置为INFINITE
。这会导致各种各样的不当行为:
1.线程信息(
operationIndex
)在线程可能对其工作时被更新。operations_done
被更新,尽管那些操作可能未被执行1.互斥锁可能已过度释放
2.限制线程数
线程管理器还应该检查线程数,因为不能将其设置为大于MAXIMUM_WAIT_OBJECTS的数字,否则WaitForMultipleObjects()将无法可靠地工作。
3.关闭1个错误
应该是
size_t operations_done = (threads.size()-1) * OP_COUNT;
或
threadInfo[index] -> operationIndex = operations_done; // was + OP_COUNT
否则会跳过一批
4.结束线程
结束线程依赖于超时。
当您用
INFINITE
替换所有超时时,您会注意到您的线程永远不会结束。6yjfywim2#
当我在8个内核上运行
call calc_pi(100000000)
时,我得到了3.1415926535898153