C语言 如何让孩子发送信息的计时器?

ryevplcw  于 2023-01-12  发布在  其他
关注(0)|答案(1)|浏览(140)

我需要创建一个项目,其中一个主程序创建一个给定数量的子进程,每个子启动exec,有2种类型的子进程,他们使用消息队列通信。每一秒所有的孩子都必须发送他们的状态和情况(如果你想了解更多的信息是什么信息需要让我知道)的主程序,我的问题是:所有的子程序如何将信息发送到主程序,而不会因为子程序正在等待/写入消息队列中的消息而得到“中断的系统调用”?

char *receiveMessage(int msgQId, long tipo) {
    my_msg_buf msgBuf;
    sigset_t mask;

    sigemptyset(&mask);
    sigaddset(&mask, SIGUSR1);

    sigprocmask(SIG_BLOCK, &mask, NULL );
    while(1)
    if (msgrcv(msgQId, (void *)&msgBuf, MAX_LINE_LENGTH, tipo, IPC_NOWAIT) == -1) {
        if(errno != ENOMSG && errno != EINTR){
            fprintf(stderr, "Ricevuto errore: %s\n", strerror(errno));
            return NULL;
        }
        if(errno == ENOMSG)
            return NULL;
    }else break;

    sigprocmask(SIG_UNBLOCK, &mask, NULL );

    return strdup(msgBuf.testo);
}

我尝试使用循环等待消息,如果errno等于EINTR,则继续循环,但它似乎不起作用,子进程似乎永远不会结束。我还考虑在每个子进程中设置一个内部计时器,但我不知道如何处理消息队列,以及在执行过程中在哪里检查时间。

r55awzrz

r55awzrz1#

我以前在实际的商业产品中使用过IPC消息队列。
我不需要使用信号,而是使用nanosleep来等待,这样就避免了EINTR的混乱。
下面是一些 backbone 代码,它只使用了一个消息队列和几种消息类型(一个用于主,一个用于每个港口,一个用于每个船舶)。
它允许:
1.与所有船舶和港口通信的主要过程。
1.任何一个港口都可以与其他港口和所有船只通信。
1.任何一艘船都能与其他船只和所有港口通信。
在下面的代码中,我展示了一些港口代码。船舶代码将是类似的。同样,主进程代码也将是类似的。
代码是 backbone /不完整的。但是,它显示了基本的机制。
我为港口数据创建了一个单独的结构体,它同时用于消息中的港口"当前状态"数组 * 和 *。
最好将港口当前状态和港口特定数据的类型分开,也就是说,我们可以使用struct harbor_statestruct harbor_msg来代替struct harbor
不管怎样,这是代码。它编译了,但没有测试。它被注解了:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct ship {
    long ship_mtyp;                         // msgtyp
    int ship_idx;                           // index into ships array
    pid_t ship_pid;                         // ship's process id

    // ship specific data ...
    int ship_cargo;
};

struct harbor {
    long harbor_mtyp;                       // msgtyp
    int harbor_idx;                         // index into harbors array
    pid_t harbor_pid;                       // harbor's process id

    // harbor specific data ...
    int harbor_nship;
};

struct main {
    long main_mtyp;                         // msgtyp
    int main_total;
};

enum type {
    TYPE_SHIP,
    TYPE_HARBOR,
    TYPE_MAIN,
};

struct msg {
    long msg_mtyp;
    enum type msg_type;
    union {
        struct ship ship;
        struct harbor harbor;
        struct main main;
    } msg_data;
};

#define NSHIP       20
#define NHARBOR     5

struct harbor harbors[NHARBOR];
struct ship ships[NSHIP];

long mtyp_uniq = 0;                     // unique msgtyp
long main_mtyp;
struct main main_data;

int msqid;                              // message queue id

#define MSGLEN  (sizeof(struct msg) - sizeof(long))

typedef long long tod_t;
#define TODNS       1000000000
#define INTVNS      TODNS               // wakeup every second

// todget -- get time of day in nanoseconds
tod_t
todget(void)
{
    struct timespec ts;
    tod_t tod;

    clock_gettime(CLOCK_MONOTONIC,&ts);

    tod = ts.tv_sec;
    tod *= 100000000;
    tod += ts.tv_nsec;

    return tod;
}

tod_t todold;

// waitfor -- wait for next interval
void
waitfor(void)
{
    tod_t todnow;
    tod_t todfire;
    tod_t todsleep;
    struct timespec ts;

    // get current time
    todnow = todget();

    // initialize previous firing time
    if (todold == 0)
        todold = todnow;

    // get firing time of next batch of outgoing messages
    todfire = todold + INTVNS;

    // get amount to sleep
    todsleep = todfire - todnow;

    // sleep a bit
    if (todsleep > 0) {
        ts.tv_sec = todsleep / TODNS;
        ts.tv_nsec = todsleep % TODNS;
        nanosleep(&ts,NULL);

        todnow = todget();
    }

    todold = todnow;
}

// harbor_rcv -- receive all current/pending message sent to this harbor
void
harbor_rcv(struct harbor *self)
{
    struct msg msg;
    int flags = IPC_NOWAIT;
    ssize_t len;
    struct ship *smsg;
    struct ship *ship;
    struct harbor *hmsg;
    struct harbor *harbor;
    struct main *mmsg;

    while (1) {
        len = msgrcv(msqid,&msg,MSGLEN,self->harbor_mtyp,flags);

        // nothing to process
        if (len == 0)
            break;

        // error
        if (len < 0) {
            // no message
            if (errno == EAGAIN)
                break;

            perror("msgrcv");
            exit(1);
        }

        // process the message
        switch (msg.msg_type) {
        case TYPE_SHIP:
            smsg = &msg.msg_data.ship;
            ship = &ships[smsg->ship_idx];
            *ship = *smsg;
            break;

        case TYPE_HARBOR:
            hmsg = &msg.msg_data.harbor;
            harbor = &harbors[hmsg->harbor_idx];
            *harbor = *hmsg;
            break;

        case TYPE_MAIN:
            mmsg = &msg.msg_data.main;
            main_data = *mmsg;
            break;
        }
    }
}

// harbor_process -- perform all actions for given harbor
void
harbor_process(struct harbor *self)
{
    struct msg msg;
    int flags = 0;

    while (1) {
        // wait until we should send messages
        waitfor();

        // receive messages
        harbor_rcv(self);

        // send our harbor data to all _other_ harbors
        for (int harboridx = 0;  harboridx < NHARBOR;  ++harboridx) {
            struct harbor *harbor = &harbors[harboridx];
            if (harbor == self)
                continue;

            // process incoming messages
            harbor_rcv(self);

            msg.msg_data.harbor = *self;
            msg.msg_mtyp = harbor->harbor_mtyp;
            msgsnd(msqid,&msg,MSGLEN,flags);
        }

        // send our harbor data to all ships
        for (int shipidx = 0;  shipidx < NSHIP;  ++shipidx) {
            struct ship *ship = &ships[shipidx];

            // process incoming messages
            harbor_rcv(self);

            msg.msg_data.harbor = *self;
            msg.msg_mtyp = ship->ship_mtyp;
            msgsnd(msqid,&msg,MSGLEN,flags);
        }

        // send our harbor data to main
        msg.msg_data.harbor = *self;
        msg.msg_mtyp = main_mtyp;
        msgsnd(msqid,&msg,MSGLEN,flags);

        // process incoming messages
        harbor_rcv(self);
    }

    exit(0);
}

// ship_process -- perform all actions for given ship
void
ship_process(struct ship *self)
{

    // TODO -- similar to harbor_process
}

int
main(void)
{
    pid_t pid;

    msqid = msgget(IPC_PRIVATE,IPC_CREAT);
    if (msqid < 0) {
        perror("msgget");
        exit(1);
    }

    main_mtyp = ++mtyp_uniq;

    // initialize all ships
    for (int shipidx = 0;  shipidx < NSHIP;  ++shipidx) {
        struct ship *ship = &ships[shipidx];
        ship->ship_mtyp = ++mtyp_uniq;
        ship->ship_idx = shipidx;
    }

    // initialize all harbors
    for (int harboridx = 0;  harboridx < NHARBOR;  ++harboridx) {
        struct harbor *harbor = &harbors[harboridx];
        harbor->harbor_mtyp = ++mtyp_uniq;
        harbor->harbor_idx = harboridx;
    }

    // launch all ships
    for (int shipidx = 0;  shipidx < NSHIP;  ++shipidx) {
        struct ship *ship = &ships[shipidx];
        pid = fork();
        if (pid == 0)
            ship_process(ship);
        else
            ship->ship_pid = pid;
    }

    // launch all harbors
    for (int harboridx = 0;  harboridx < NHARBOR;  ++harboridx) {
        struct harbor *harbor = &harbors[harboridx];
        pid = fork();
        if (pid == 0)
            harbor_process(harbor);
        else
            harbor->harbor_pid = pid;
    }

    // process incoming messages, etc.
    while (1) {
    }

    return 0;
}
    • 更新日期:**

但是每艘船都必须与每个港口通信,这样我就可以结束成千上万的管道,但是我怎么能在每个"船"和"港口"过程中经过一定时间后中断执行呢?-瓦塞托
不幸的是,拥有"数千个管道"意味着拥有数千个 * 进程 *。这不会 * 伸缩 *。任何数量的线程/进程超过一定数量(例如16个左右),系统将花费大部分时间在它们之间进行上下文切换。
因此,另一个不同的模型是"工作线程"模型。定义处理消息的N线程。这里N相对较小(例如16)。每个线程都从相同的msgtyp中提取消息,进行处理和循环。
使用线程比使用进程更好,或者使用一些拥有一定数量线程的进程。
所有这一切都更多地取决于船舶拥有哪些数据,港口拥有哪些数据,以及它们之间如何交互,因此,我们需要了解实际数据的细节,什么动作触发了船舶到船舶/港口的消息,以及/或者港口到船舶/港口的消息。

相关问题