为什么我的pthread_cond_signal不能立即唤醒阻塞的线程?

2q5ifsrm  于 2023-03-07  发布在  其他
关注(0)|答案(2)|浏览(362)

我正在编写一个多线程程序,其中一个辅助线程在满足某个条件(即数据结构中存在一定数量的元素)后运行。

void*
gc_thread_handler(void *arg) {
    while (!done) {
        pthread_mutex_lock(&(gc.mutex));
        while (!gc.collect && !done) {
            pthread_cond_wait(&(gc.cond), &(gc.mutex));
        }
        pthread_mutex_unlock(&(gc.mutex));
        rcgc_activate();
    }

    return NULL;
}

我在等待线程上阻塞,直到它接收到由下面的函数发送的信号。

void *
gc_alloc(size_t sz) {
    pthread_mutex_lock(&(gc.mutex));
    // ...
    gc.num_chunks++;
    if (gc.num_chunks > MAX_CHUNKS) {
        gc.collect = true;
        pthread_cond_signal(&(gc.cond));
    }
    pthread_mutex_unlock(&(gc.mutex));
    return ptr;
}

但是,由于某种原因,似乎该信号没有立即唤醒休眠线程;信号被重复地发出(因为gc_alloc被重复地调用),最终,等待线程 * 确实 * 醒来并调用gc_activate,但我真的不明白为什么它不立即醒来。
先生:
气相色谱法:

#ifndef RCGC_H
#define RCGC_H

#define RCGC_OWALLOC(ptr, sz) (rcgc_owalloc((void **) (ptr), (sz)));
#define RCGC_ASSIGN(dest, src) (rcgc_assign((void **) (dest), (void **) (src)));

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

struct rcgc_allocation {
    void                   *data;
    struct rcgc_allocation *next;
    size_t                  counter;
    bool                    is_free;
};

// Reference counted gc.
struct rcgc {
    struct rcgc_allocation *head;
    struct rcgc_allocation *tail;

    int num_chunks;
    bool collect;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
};

void rcgc_init(void);
void *rcgc_alloc(size_t sz);
void *rcgc_owalloc(void **dest, size_t sz);
void rcgc_assign(void **dest, void **src);
void rcgc_activate(void);

#endif // RCGC_H

gc.c

#include "rcgc.h"

#define MAX_CHUNKS 1000

struct rcgc gc;

static struct rcgc_allocation *rcgc_search(void *ptr);

void
rcgc_init(void) {
    pthread_mutex_init(&(gc.mutex), NULL);
    pthread_cond_init(&(gc.cond), NULL);
    gc.head = gc.tail = NULL;
    gc.num_chunks     = 0;
    gc.collect        = false;
}

void *
rcgc_alloc(size_t sz) {
    pthread_mutex_lock(&(gc.mutex));
    struct rcgc_allocation *node = calloc(1, sizeof(struct rcgc_allocation));
    // Add to linked list.
    if (gc.head == NULL) {
        gc.head = gc.tail = node;
        gc.head->next = gc.tail->next = NULL;
    } else {
        gc.tail->next = node;
        gc.tail       = node;
    }

    // Now assign the ptr data.
    void *ptr     = calloc(1, sz);
    node->data    = ptr;
    node->is_free = false;
    node->counter = 1;

    // If we allocate more than enough chunks, send the signal.
    gc.num_chunks++;
    if (gc.num_chunks > MAX_CHUNKS && !gc.collect) {
        gc.collect = true;
        pthread_cond_signal(&(gc.cond));
    }

    pthread_mutex_unlock(&(gc.mutex));
    return ptr;
}

void *
rcgc_owalloc(void **dest, size_t sz) {
    pthread_mutex_lock(&(gc.mutex));
    // Determine where it exists in the tree.
    if (dest == NULL) {
        exit(EXIT_FAILURE);
    } else {
        struct rcgc_allocation *old_node = rcgc_search(*dest);
        if (old_node != NULL) {
            old_node->counter--;
        }
        pthread_mutex_unlock(&(gc.mutex));
        return rcgc_alloc(sz);
    }
}

void
rcgc_assign(void **dest, void **src) {
    pthread_mutex_lock(&(gc.mutex));

    // If "dest" is already in the tree, find it and decrement its ref ctr.
    if (dest != NULL) {
        struct rcgc_allocation *dest_alloc = rcgc_search(*dest);
        if (dest_alloc != NULL) {
            dest_alloc->counter--;
        }
    }

    // If we cannot find the source, then it does not exist.
    struct rcgc_allocation *src_alloc = rcgc_search(*src);
    src_alloc->counter++;
    *dest = *src;
    pthread_mutex_unlock(&(gc.mutex));
}

void
rcgc_activate(void) {
    pthread_mutex_lock(&(gc.mutex));
    for (struct rcgc_allocation *curr = gc.head; curr != NULL; curr = curr->next) {
        if (curr->counter == 0 && !curr->is_free) {
            free(curr->data);
            curr->data    = NULL;
            curr->is_free = true;
            gc.num_chunks--;
        }
    }
    gc.collect = false;
    pthread_mutex_unlock(&(gc.mutex));
}

static struct rcgc_allocation *
rcgc_search(void *ptr) {
    for (struct rcgc_allocation *curr = gc.head; curr != NULL; curr = curr->next) {
        if (ptr == curr->data) {
            return curr;
        }
    }
    return NULL;
}

main.c

#include <stdio.h>
#include <stdlib.h>

#include "rcgc.h"

extern struct rcgc gc;

static bool done = false;

void*
rcgc_thread_handler(void *arg) {
    while (!done) {
        pthread_mutex_lock(&(gc.mutex));
        while (!gc.collect && !done) {
            pthread_cond_wait(&(gc.cond), &(gc.mutex));
        }
        pthread_mutex_unlock(&(gc.mutex));
        rcgc_activate();
    }
    return NULL;
}

int
main(void) {
    rcgc_init();
    pthread_t pid;
    pthread_create(&pid, NULL, rcgc_thread_handler, NULL);

    // Create a few arbitrary allocations and assignments.
    int *arr1 = rcgc_alloc(sizeof(int) * 10);
    int *arr2 = NULL;

    // Do something for a while...
    double val = 0;
    while (val < 1) {
        val += 0.0001;
        arr1 = RCGC_OWALLOC(&arr1, sizeof(int));
        RCGC_ASSIGN(&arr2, &arr1);
    }

    done = true;
    pthread_cond_broadcast(&(gc.cond));
    pthread_join(pid, NULL);
    return 0;
}
6kkfgxo0

6kkfgxo01#

您的代码存在以下几个问题:

  • main.c的静态变量done存在数据竞争。由于它不是原子的,原始线程和辅助线程都访问它,并且至少有一个访问是写操作,因此所有对它的访问必须同步。解决这个问题的最简单方法可能是将它设置为_Atomic,以消除同步的需要。
  • void *可以看作是一个泛型指针,但是void ** * 不是 * 一个泛型双指针,C语言中没有这样的东西,因此,您的RCGC_OWALLOCRCGC_ASSIGN宏为rgc_owalloc()rgc_assign()的未定义行为做好了准备。大多数非算术原因的强制转换在语义上是不正确的。C语言在大多数情况下都提供自动转换,在这些情况下转换是明智的。
  • rcgc_thread_handler()在调用rcgc_activate()之前释放了互斥锁,但是rcgc_activate()立即尝试重新获取它。在许多系统的默认调度策略下,它很可能立即成功,但是并不一定会这样做。另一个线程可能会抢先获取互斥锁。次数不限。
  • 类似地,rcgc_owalloc()在调用rgc_alloc()之前立即释放互斥锁,rgc_alloc()立即尝试重新获取互斥锁,这同样为其他线程进入并运行任意时间留下了漏洞。
  • 主线程在开始向CV发送信号之前并不确保GC线程已经到达其CV等待。向没有等待者的CV发送信号是可以的,但是这样的信号不起任何作用。这是GC线程比主线程向CV发送信号运行rcgc_activate()次更少的合理解释。
  • 主线程几乎花费所有时间来保持互斥锁锁定,特别是,每次释放互斥锁时,它都会很快尝试再次获取互斥锁。正如another answer所解释的,对于大多数系统的默认调度策略,如果线程尝试在释放互斥锁后足够快地重新获取互斥锁,它很有可能在任何其他试图获取它的线程之前获取它。这也是GC线程运行rcgc_activate()的次数比主线程向CV发送信号的次数少的合理解释。
ndasle7k

ndasle7k2#

我真正不明白的是,为什么线程A一次又一次地重新获取锁,而线程B甚至没有机会调用activate()
在大多数工作站/服务器/移动设备操作系统的默认调度策略下,这是正常的行为。这些系统的设计目标是在多个不同的应用程序争用资源时最大化平台CPU的利用率。在此假设下,调度程序最小化上下文切换的数量是有意义的。多余的上下文切换会占用应用程序本来可以占用的CPU时间。
当出现互斥锁竞争时,调度程序倾向于将互斥锁授予时间片尚未到期的线程(即已经在CPU上运行的线程),而不是过早地将该线程(或任何其他线程)踢出CPU,以便唤醒其他等待锁的线程。
你所看到的结果有一个名字,叫做"饥饿"。避免饥饿的最好方法是最小化对锁的争用。重新设计程序,这样当任何线程想要获取互斥锁时,它就很少需要真正等待。确保没有线程拥有互斥锁的时间超过它更新几个变量所需的时间。
另一方面:如果您将平台专用于单个应用程序,那么选择一个备用的调度策略或备用的"实时"操作系统可能是有意义的,这将使您能够更好地控制线程的调度方式。

相关问题