我是线程的新手,所以我使用这篇文章来学习如何创建线程池。https://nachtimwald.com/2019/04/12/thread-pool-in-c/
问题是,当我使用大量任务时,一切都运行良好,但如果任务数量较少,那么任务处理函数就不会被调用。void worker(void *arg)
例如,如果线程的数量是4,而我创建了24个任务,那么它们几乎永远不会被执行。(见图片)。任务的数量越少,函数工作的概率就越小。void worker(void *arg)
我不确定,但在我看来,主要问题是没有时间创建线程,因为任务已经在Queue中。使用pthread_create()函数创建线程
tpool.h
#ifndef __TPOOL_H__
#define __TPOOL_H__
#include <stdbool.h>
#include <stddef.h>
struct tpool;
typedef struct tpool tpool_t;
typedef void (*thread_func_t)(void *arg);
tpool_t *tpool_create(size_t num);
void tpool_destroy(tpool_t *tm);
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg);
void tpool_wait(tpool_t *tm);
#endif /* __TPOOL_H__ */
tpool.c
#include <pthread.h>
#include "tpool.h"
struct tpool_work {
thread_func_t func;
void *arg;
struct tpool_work *next;
};
typedef struct tpool_work tpool_work_t;
struct tpool {
tpool_work_t *work_first;
tpool_work_t *work_last;
pthread_mutex_t work_mutex;
pthread_cond_t work_cond;
pthread_cond_t working_cond;
size_t working_cnt;
size_t thread_cnt;
bool stop;
};
static tpool_work_t *tpool_work_create(thread_func_t func, void *arg)
{
tpool_work_t *work;
if (func == NULL)
return NULL;
work = malloc(sizeof(*work));
work->func = func;
work->arg = arg;
work->next = NULL;
return work;
}
static void tpool_work_destroy(tpool_work_t *work)
{
if (work == NULL)
return;
free(work);
}
static tpool_work_t *tpool_work_get(tpool_t *tm)
{
tpool_work_t *work;
if (tm == NULL)
return NULL;
work = tm->work_first;
if (work == NULL)
return NULL;
if (work->next == NULL) {
tm->work_first = NULL;
tm->work_last = NULL;
} else {
tm->work_first = work->next;
}
return work;
}
static void *tpool_worker(void *arg)
{
tpool_t *tm = arg;
tpool_work_t *work;
while (1) {
pthread_mutex_lock(&(tm->work_mutex));
while (tm->work_first == NULL && !tm->stop)
pthread_cond_wait(&(tm->work_cond), &(tm->work_mutex));
if (tm->stop)
break;
work = tpool_work_get(tm);
tm->working_cnt++;
pthread_mutex_unlock(&(tm->work_mutex));
if (work != NULL) {
work->func(work->arg);
tpool_work_destroy(work);
}
pthread_mutex_lock(&(tm->work_mutex));
tm->working_cnt--;
if (!tm->stop && tm->working_cnt == 0 && tm->work_first == NULL)
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
}
tm->thread_cnt--;
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return NULL;
}
tpool_t *tpool_create(size_t num)
{
tpool_t *tm;
pthread_t thread;
size_t i;
if (num == 0)
num = 2;
tm = calloc(1, sizeof(*tm));
tm->thread_cnt = num;
pthread_mutex_init(&(tm->work_mutex), NULL);
pthread_cond_init(&(tm->work_cond), NULL);
pthread_cond_init(&(tm->working_cond), NULL);
tm->work_first = NULL;
tm->work_last = NULL;
for (i=0; i<num; i++) {
pthread_create(&thread, NULL, tpool_worker, tm);
pthread_detach(thread);
}
return tm;
}
void tpool_destroy(tpool_t *tm)
{
tpool_work_t *work;
tpool_work_t *work2;
if (tm == NULL)
return;
pthread_mutex_lock(&(tm->work_mutex));
work = tm->work_first;
while (work != NULL) {
work2 = work->next;
tpool_work_destroy(work);
work = work2;
}
tm->stop = true;
pthread_cond_broadcast(&(tm->work_cond));
pthread_mutex_unlock(&(tm->work_mutex));
tpool_wait(tm);
pthread_mutex_destroy(&(tm->work_mutex));
pthread_cond_destroy(&(tm->work_cond));
pthread_cond_destroy(&(tm->working_cond));
free(tm);
}
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg)
{
tpool_work_t *work;
if (tm == NULL)
return false;
work = tpool_work_create(func, arg);
if (work == NULL)
return false;
pthread_mutex_lock(&(tm->work_mutex));
if (tm->work_first == NULL) {
tm->work_first = work;
tm->work_last = tm->work_first;
} else {
tm->work_last->next = work;
tm->work_last = work;
}
pthread_cond_broadcast(&(tm->work_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return true;
}
void tpool_wait(tpool_t *tm)
{
if (tm == NULL)
return;
pthread_mutex_lock(&(tm->work_mutex));
while (1) {
if ((!tm->stop && tm->working_cnt != 0) || (tm->stop && tm->thread_cnt != 0)) {
pthread_cond_wait(&(tm->working_cond), &(tm->work_mutex));
} else {
break;
}
}
pthread_mutex_unlock(&(tm->work_mutex));
}
main.c
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "tpool.h"
static const size_t num_threads = 4;
static const size_t num_items = 24;
void worker(void *arg)
{
int *val = arg;
int old = *val;
*val += 1000;
printf("tid=%p, old=%d, val=%d\n", pthread_self(), old, *val);
if (*val%2)
usleep(100000);
}
int main(int argc, char **argv)
{
tpool_t *tm;
int *vals;
size_t i;
tm = tpool_create(num_threads);
vals = calloc(num_items, sizeof(*vals));
for (i=0; i<num_items; i++) {
vals[i] = i;
tpool_add_work(tm, worker, vals+i);
}
tpool_wait(tm);
/*for (i=0; i<num_items; i++) {
printf("%d\n", vals[i]);
}*/
free(vals);
tpool_destroy(tm);
return 0;
}
1条答案
按热度按时间ymdaylpp1#
tpool_worker()
中的tm->working_cnt++;
和tpool_wait()
中的测试tm->working_cnt!=0
之间存在线程调度竞争。要解决这个问题,working_cnt
应该在工作被添加到tpool_add_work()
中的池时递增,而不是在工作线程将工作从tpool_worker()
中的队列中移出时递增。通过将tm->working_cnt++;
线移动到tpool_add_work()
,可以在tm->work_mutex
的锁定和解锁之间的某个位置修复该竞赛。顺便说一下,tpool.c需要
#include <stdlib.h>
行来声明malloc()
、calloc()
和free()
。