c程序处理多线程

hjzp0vay  于 2022-12-03  发布在  其他
关注(0)|答案(1)|浏览(188)

我需要帮助我的c程序,我已经做了大部分,但它有一些问题。
该程序是关于***探索进程和线程之间的同步。***
在一个程序中为您提供了三(3)个进程,它们协同工作以解决生产者消费者问题:

  • 两个过程是“生产者”,每个过程在一个连续循环中生产自己类型的产品。也就是说,一个生产者生产一种产品类型,另一个生产者生产不同的产品类型。
  • 一个流程是产品的“消费者”,由五(5)条线索组成:
  • 1个线程是“分发服务器”线程
  • 两(2)个线程使用一种类型的产品(例如,仅使用产品类型1)
  • 两(2)个线程消耗第二类型的产品(例如,仅消耗产品类型2)。

消费者进程包含两个(2)产品存储缓冲区,每个缓冲区由固定数量的槽组成。缓冲区中的槽数量不同(一个比另一个具有更多的槽)。您选择并指定每个缓冲器中的槽的数量作为您的程序解决方案中的定义。生产者进程和消费者进程之间的通信将通过单个“管道”。共享管道用于在每个生产者进程和消费者进程之间进行通信。每个生产者进程将由消费者进程消费的产品写入该管道。
最终计划交付:完成产品消费线程、输出文件的设计和编写功能;使用输出文件运行示例数据
我已经完成了大部分,但它有一些问题。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <dirent.h>
#include <stdbool.h>
#include <fcntl.h>
#include <pthread.h>

#define BUFFER_SIZE1 20
#define BUFFER_SIZE2 30

typedef struct
{
    int count;
    int productType;
} product;

int count = 0;
int fd[2];

pthread_mutex_t lock;
pthread_cond_t cond;

typedef struct
{
    product *values;
    int head;
    int tail;
    int numEntries;
    int size;
} queue;

queue q1;
queue q2;

void producer(int args);
void *consumer(void *args);
void *distributor(void *args);
void initQ(queue *q, int size);
bool QEmpty(queue *q);
bool QFull(queue *q);
bool put(queue *q, product prod);
product get(queue *q);

// https://www.youtube.com/watch?v=l6zkaJFjUbM
// https://www.geeksforgeeks.org/multithreading-c-2/
int main(int argc, char const *argv[])
{
    // Creating 5 threads 4 consumer and 1 distributor
    pthread_t th[5];
    // Creating our pipe, fd[0] is read end, fd[1] is write end
    if (pipe(fd) == -1)
    {
        perror("error creating pipe");
        exit(1);
    }

    // Initializing both buffers
    initQ(&q1, BUFFER_SIZE1);
    initQ(&q2, BUFFER_SIZE2);

    int pid1;
    int pid2;
    int consId1 = 1;
    int consId2 = 2;

    // Initializing lock
    pthread_mutex_init(&lock, NULL);

    // Initialziing condition variables
    pthread_cond_init(&cond, NULL);

    // Create first producer process using fork(), child process 1
    if (pid1 = fork() == 0)
    {
        producer(1);
    }
    // Create second prodcuer process using fork(), child process 2
    else if (pid2 = fork() == 0)
    {
        producer(2);
    }
    // Create distrubtor and consumer threads, parent process
    else
    {
        // Creating 4 threads using for loop and pthread_create
        for (int i = 0; i < 4; i++)
        {
            // 2 consumer threads for product type 1
            if (i == 1 || i == 2)
            {
                if (pthread_create(&th[i], NULL, &consumer, &consId1) != 0)
                {
                    perror("Error creating thread");
                }
            }
            // 2 consumer threads for product type 2
            else
            {
                if (pthread_create(&th[i], NULL, &consumer, &consId2) != 0)
                {
                    perror("Error creating thread");
                }
            }
        }
        // use pthread_join to wait for preivous thread to terminate
        for (int i = 0; i < 4; i++)
        {
            if (pthread_join(th[i], NULL) != 0)
            {
                perror("Error joining thread");
            }
        }
        // Distributor thread
        close(fd[1]);

        while (1)
        {
            product prod;

            // Using lock and condition variable around crit section to avoid race condition
            // pthread_mutex_lock(&lock);
            // pthread_cond_wait(&cond, &lock);
            // Read from the pipe
            read(fd[0], &prod, sizeof(prod));
            if (prod.productType == 1)
            {
                put(&q1, prod);
            }
            else
            {
                put(&q2, prod);
            }
        }
        // pthread_cond_signal(&cond);
        // pthread_mutex_unlock(&lock);
        // Close read end of the pipe
        close(fd[0]);
    }
    return 0;
}

// Creating the producers
void producer(int args)
{
    int prodCount = 0;
    product prod;
    prod.productType = args;

    // Close read end of the pipe
    close(fd[0]);

    while (1)
    {
        prodCount++;
        prod.count = prodCount;
        // Send product to the pipe so the consumer can use
        write(fd[1], &prod, sizeof(prod));
        // Sleep for 0.01 - 0.2 seconds after each loop
        int time = (rand() % (200000 - 10000 + 1)) + 10000;
        usleep(time);
    }

    // Close write end of the pipe
    close(fd[1]);
}

void *consumer(void *args)
{
    int consCount1;
    int consCount2;

    FILE *fp;
    fp = fopen("output.txt", "w");

    product prod;
    int prodType = *(int *)args;

    while (1)
    {
        if (prodType == 1)
        {
            get(&q1);
            consCount1++;
            fprintf("Thread ID: %d\n", prodType);
            fprintf(fp, "Product Type: %d\n", prod.productType);
            fprintf(fp, "Production Sequence #: %d\n", prod.count);
            fprintf(fp, "Consumption Sequence #: %d\n", consCount1);
        }
        else
        {
            get(&q2);
            consCount2++;
            fputs("Thread ID: 2\n", fp);
            fprintf(fp, "Product Type: %d\n", prod.productType);
            fprintf(fp, "Production Sequence #: %d\n", prod.count);
            fprintf(fp, "Consumption Sequence #: %d\n", consCount2);
        }
    }
    fclose(fp);
}

// https://www.youtube.com/watch?v=oyX30WVuEos&t=196s
// Circular buffer
void initQ(queue *q, int size)
{
    q->size = size;
    q->values = malloc(sizeof(product) * q->size);
    q->numEntries = 0;
    q->head = NULL;
    q->tail = NULL;
}

// Checks if the queue is empty
bool QEmpty(queue *q)
{
    return (q->numEntries == 0);
}

// Checks if the queue is full
bool QFull(queue *q)
{
    return (q->numEntries == q->size);
}

// Used for adding products to the queue
bool put(queue *q, product prod)
{
    // If the queue is full we can not add to it
    if (QFull(q))
    {
        return false;
    }
    // Add product to the end of the queue
    q->values[q->tail] = prod;
    q->numEntries++;
    // Move index of the tail
    q->tail = (q->tail + 1);
    // If index goes out of bounds set back to 0
    if (q->tail >= q->size)
    {
        q->tail = 0;
    }
    return true;
}

// Used for removing products for the queue
product get(queue *q)  
{
    product result;

    // If the queue is empty we can not dequeue anymore
    if (QEmpty(q))
    {
        perror("Error on dequeue");
    }

    // Remove from the head of the queue
    result = q->values[q->head];
    q->head = (q->head + 1) & q->size;
    q->numEntries--;

    return result;
}
mitkmikd

mitkmikd1#

我建议修改代码,使它更可读,因为现在它是相当难以理解。
在这里我指出一些导致错误的代码问题:
1.如果条件if (pidN = fork() == 0) {...}
您应该先fork()然后检查pid:

int pidN = fork();
 if (pidN == 0) {...}
  1. fprintf()函数fprintf("Thread ID: %d\n", prodType);
    编写函数fputs或fprintf:
fputs("Thread ID: \n", fp);

fprintf(fp, "Thread ID: %d\n", prod.productType);
  1. fprintf()函数的参数:注意您传递的变量,因为prodType不存在。

相关问题