C中的消息队列:实现双向通信

yzuktlbb  于 2023-04-05  发布在  其他
关注(0)|答案(2)|浏览(133)

我是一名学生,也是C语言的初学者。我想在Linux下用C语言通过消息队列实现双向通信。为此我需要两个队列,还是一个队列就够了?
我还想知道,能否把下面代码中的数据结构直接发送给另一个进程,还是必须把数据声明为字符数组。

/*
 * Payload descriptor from the question: a length plus a pointer to the bytes.
 * NOTE(review): `data` holds an address, not the bytes themselves; if this
 * struct is copied somewhere, only the address value is copied with it.
 */
typedef struct msg1
{
    int mlen;       /* presumably the number of bytes `data` points at */
    char *data;     /* pointer to the payload; the payload is stored elsewhere */
}M1;

/*
 * Message wrapper from the question: a `long` type tag followed by a pointer
 * to an M1 descriptor.
 * NOTE(review): the member after `mtype` is a pointer, so the message body
 * would be an address rather than inline data — confirm against msgsnd(2),
 * which transmits the bytes that follow `mtype`.
 */
typedef struct msgbuf
{
    long    mtype;  /* message type tag */
    M1      *m;     /* pointer to the descriptor, not an embedded copy */
} message_buf;

提前感谢:)

a1o7rhls

a1o7rhls1#

我还想知道我可以发送数据(显示在代码)到另一个进程,或者我需要声明它作为一个字符数组
是的,你可以把数据发送给另一个进程,
例如:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#define MAXSIZE     128

/*
 * Report the most recent library/system-call failure and terminate.
 *
 * s: prefix handed to perror(); it is only read, so it is const-qualified
 *    and string literals can be passed without discarding qualifiers.
 * Does not return: the process exits with EXIT_FAILURE.
 */
void die(const char *s)
{
  perror(s);
  exit(EXIT_FAILURE);
}

/*
 * Message layout this program passes to msgsnd(): a `long` type tag followed
 * by an inline text buffer of MAXSIZE bytes.
 */
struct msgbuf
{
    long    mtype;              /* message type; main() sets it to 1 */
    char    mtext[MAXSIZE];     /* message text, stored inline */
};

/*
 * Sender: reads one line from stdin and places it on the System V message
 * queue identified by key 1234 (the queue is created if it does not exist).
 *
 * Returns 0 on success; exits with status 1 when the queue cannot be opened,
 * no input is available, or the message cannot be queued.
 */
int main(void)
{
    int msqid;
    int msgflg = IPC_CREAT | 0666;
    key_t key;
    struct msgbuf sbuf;
    size_t buflen;

    key = 1234;

    if ((msqid = msgget(key, msgflg )) < 0)   //Get the message queue ID for the given key
      die("msgget");

    //Message Type
    sbuf.mtype = 1;

    printf("Enter a message to add to message queue : ");
    fflush(stdout);

    /*
     * fgets() never writes past the buffer and always terminates it, unlike
     * an unbounded "%[^\n]" conversion, which also stores nothing at all when
     * the line is empty.
     */
    if (fgets(sbuf.mtext, sizeof sbuf.mtext, stdin) == NULL)
    {
        fprintf(stderr, "No input available\n");
        exit(1);
    }
    sbuf.mtext[strcspn(sbuf.mtext, "\n")] = '\0';   /* drop the trailing newline */

    /* The terminating NUL is sent too, so the receiver gets a C string. */
    buflen = strlen(sbuf.mtext) + 1 ;

    if (msgsnd(msqid, &sbuf, buflen, IPC_NOWAIT) < 0)
    {
        /* Report errno first: later library calls may overwrite it. */
        perror("msgsnd");
        fprintf(stderr, "%d, %ld, %s, %zu\n", msqid, sbuf.mtype, sbuf.mtext, buflen);
        exit(1);
    }

    printf("Message Sent\n");

    return 0;
}

//IPC_msgq_rcv.c

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#define MAXSIZE     128

/*
 * Report the most recent library/system-call failure and terminate.
 *
 * s: prefix handed to perror(); it is only read, so it is const-qualified
 *    and string literals can be passed without discarding qualifiers.
 * Does not return: the process exits with EXIT_FAILURE.
 */
void die(const char *s)
{
  perror(s);
  exit(EXIT_FAILURE);
}

/*
 * Message layout this program passes to msgrcv(): a `long` type tag followed
 * by an inline text buffer of MAXSIZE bytes.
 *
 * Declared as a plain struct: a `typedef` that names no alias declares
 * nothing extra and only draws a compiler warning.
 */
struct msgbuf
{
    long    mtype;              /* message type tag */
    char    mtext[MAXSIZE];     /* message text, stored inline */
};

/*
 * Receiver: opens the existing System V message queue with key 1234, blocks
 * until a message of type 1 arrives and prints its text.
 *
 * Returns 0 on success; exits through die() when the queue cannot be opened
 * or the receive fails.
 */
int main(void)
{
    int msqid;
    key_t key;
    struct msgbuf rcvbuffer;
    ssize_t received;

    key = 1234;

    if ((msqid = msgget(key, 0666)) < 0)
      die("msgget()");

     //Receive an answer of message type 1.
    received = msgrcv(msqid, &rcvbuffer, MAXSIZE, 1, 0);
    if (received < 0)
      die("msgrcv");

    /*
     * Bound the print by the number of bytes actually received, so a sender
     * that did not include a terminating NUL cannot make printf read past
     * the data.
     */
    printf("%.*s\n", (int)received, rcvbuffer.mtext);
    return 0;
}

消息队列用于进程间通信。
要在多个进程之间实现双向通信,还需要使用多个消息队列。

qybjjes1

qybjjes12#

这是我写的一个消息队列库,可以包含在任何多线程程序中使用。在进程之间,你可以使用 sockets 或 mqueue.h,但我设计的这个轻量级消息队列库效果出奇地好。
下面是“messaging.h”头文件。

//  messaging.h
//
//  Created by Justin Jack on 11/2/17.
//  Copyright © 2017 Justin Jack. All rights reserved.

// This code may be freely distributed for personal and business
// use.  Additional credits may be added, but the original author
// information must stay part of this code.
//

#ifndef messaging_h
#define messaging_h

#include <stdio.h>
#include <pthread.h>
#include <memory.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>

/* Upper bound on the number of messages pushmessage() lets one queue hold. */
#define MAX_JMESSAGES           (2000)
/* Number of message slots allocated when a queue is created. */
#define JMSG_INIT_QUEUE_SIZE    (100)
/* Value of a queue's lastfulljmessage index while it holds no messages. */
#define JMSG_EMPTY_QUEUE        (-1)

//#define QUEUE_DEBUG

/*
 * Lock serialising access to the queue list; taken and released through
 * lockit()/unlockit().  createmessagequeue() re-initialises it as a recursive
 * mutex when the first queue is created.
 * NOTE(review): `static` in a header gives every translation unit that
 * includes this file its own copy — confirm only messaging.c relies on it.
 */
static pthread_mutex_t messagelock = PTHREAD_MUTEX_INITIALIZER;



/*
 * Message identifiers carried in JMESSAGE.message.  Values start at 1 and
 * increase by one.  The JMSG_CUSTOMn entries are the slots the usage example
 * maps its own message names onto.
 */
typedef enum _jmessages {
    JMSG_QUIT = 1,  // 1
    JMSG_CHANGE,    // 2
    JMSG_FAILURE,   // 3
    JMSG_CONTINUE,  // 4
    JMSG_DRAW,      // 5
    JMSG_CUSTOM1,   // 6
    JMSG_CUSTOM2,   // 7
    JMSG_CUSTOM3,   // 8
    JMSG_CUSTOM4,   // 9
    MSG_CUSTOM4 = JMSG_CUSTOM4, // 9 (earlier misspelling, kept as an alias)
    JMSG_CUSTOM5,   // 10
    JMSG_CUSTOM6,   // 11
    JMSG_CUSTOM7,   // 12
    JMSG_CUSTOM8,   // 13
    JMSG_CUSTOM9,   // 14
    JMSG_CUSTOM10   // 15
} JMESSAGES;


/* Status codes reported by popmessage(); the numeric values are spelled out. */
typedef enum jmsg_returns {
    JMSG_NOMESSAGE    = 0,
    JMSG_MOREMESSAGES = 1,
    JMSG_LASTMESSAGE  = 2,
    JMSG_TIMEOUT      = 3
} JMSG_RETURN;

/*
 * Result codes of the queue-management calls.  Success (JMSG_OKAY) is 0, so
 * callers must compare against JMSG_OKAY rather than test for "non-zero".
 */
typedef enum jmsg_errors {
    JMSG_QUEUENOTFOUND = -1,
    JMSG_OKAY          = 0,
    JMSG_QUEUEFULL     = 1,
    JMSG_OUTOFMEMORY   = 2
} JMSG_ERROR;

/*
 * One message as stored in a queue.  pushmessage() copies the caller's
 * struct byte-for-byte and then overwrites the two `from...` fields.
 */
typedef struct jmessage {
   /* Message id; the example code stores JMESSAGES values here. */
   unsigned long long message;
    /* value1/value2: not interpreted by the visible library code —
       presumably free-form payload for the application. */
    unsigned long long value1;
    unsigned long long value2;    
    /* Pointer payload, viewable through whichever pointer type fits. */
    union _mptr {
        void *voidptr;
        char *charptr;
        unsigned int *uintptr;
        int *intptr;
        double *doubleptr;
        float *floatptr;
        unsigned long long *lluptr;
        long long *llptr;
        short *shortptr;
        unsigned short *ushortptr;
    } ptr;
    /* Not read by the visible code — presumably the size of what `ptr`
       points at; confirm with callers. */
    int pointerlength;
    /* Non-zero: destroymessagequeue() free()s `ptr` for messages still
       queued when the queue is destroyed. */
    int needtofreepointer;
    /* Sender's thread id; set by pushmessage(). */
    pthread_t fromthread;
    /* Sender's queue name; set by pushmessage(), replaced with
       "(dead queue)" when the sender destroys its queue. */
    const char *fromthreadname;
} JMESSAGE;

/*
 * Per-thread message queue; all queues are chained in a doubly linked list
 * whose head is the file-scope `jqueue` pointer.
 */
typedef struct jqueue {
    pthread_t owningthreadid;   /* thread that called createmessagequeue() */
    char semname[20];           /* name given to sem_open(): "SEM" + thread id as int */
    char threadname[100];       /* copy of the description, at most 99 characters */
    sem_t *messagesignal;       /* posted by pushmessage() for each stored message */
    int queuesize;              /* set to JMSG_INIT_QUEUE_SIZE at creation */
    JMESSAGE *jmessagelist;     /* heap array of message slots */
    int lastfulljmessage;       /* index of the newest message, JMSG_EMPTY_QUEUE when empty */
    struct jqueue *next;        /* following queue in the list, 0 at the tail */
    struct jqueue *previous;    /* preceding queue in the list, 0 at the head */
} JQUEUE;

/*
 * Head of the list of message queues; 0 until createmessagequeue() has
 * created the first one.
 * NOTE(review): `static` in a header means each including translation unit
 * gets a separate pointer — confirm only messaging.c reads or writes it.
 */
static JQUEUE *jqueue = 0;

/* Creates a message queue for the current thread.
   description: optional name stored with the queue (may be 0).
   Returns JMSG_OKAY (0) when the thread has a queue afterwards, another
   JMSG_ERROR value otherwise. */
int createmessagequeue( char *description );

/* Destroys the message queue of the current thread.
   Returns JMSG_OKAY, or JMSG_QUEUENOTFOUND when this thread has no queue. */
int destroymessagequeue(void);

/* Copies the message in JMESSAGE *jmsg onto the queue of another thread.
   Returns JMSG_OKAY, JMSG_QUEUENOTFOUND, JMSG_QUEUEFULL or JMSG_OUTOFMEMORY. */
int pushmessage( JMESSAGE *jmsg, pthread_t to_thread_id );

/* Checks for a message waiting on this thread's queue.  If found,
   it populates the message in the JMESSAGE * parameter passed.
   The most recently pushed message is returned first.

   Returns:
   --------
   JMSG_QUEUENOTFOUND (No call to createmessagequeue() on this thread)
   JMSG_NOMESSAGE     (The message queue was empty)
   JMSG_MOREMESSAGES  (You should loop while you get this)
   JMSG_LASTMESSAGE   (The last message on the stack)
 */
int popmessage( JMESSAGE *jmsg );

/* A blocking version of popmessage(): waits on the queue's semaphore.
   Returns JMSG_OKAY once woken, or JMSG_QUEUENOTFOUND when this thread
   has no queue. */
int waitmessage( JMESSAGE *jmsg );

/* Internal Functions, public use not needed.
   NOTE(review): static declarations in a header are repeated in every file
   that includes it; consider moving them into messaging.c. */
static int lockit(void);
static int unlockit(void);
static int popmessagenosem( JMESSAGE *jmsg );

#endif /* messaging_h */

下面是“messaging.c”源文件:

//
//  messaging.c
//  DeedleCore
//
//  Created by Justin Jack on 11/2/17.
//  Copyright © 2017 Justin Jack. All rights reserved.
//

#include "messaging.h"

/*
 * Acquires the library-wide lock (messagelock).
 *
 * Returns the result of pthread_mutex_lock(): 0 on success, an errno-style
 * code otherwise.  With QUEUE_DEBUG defined a failure is also described on
 * stdout.
 */
static int lockit() {
    int x = pthread_mutex_lock(&messagelock);
#ifdef QUEUE_DEBUG
    if (x != 0 ) {
        /* pthread_t has no printf conversion of its own; the cast assumes it
           is an integer or pointer type, as the rest of this file does. */
        printf("lockit(%llu) failed to lock mutex.\n", (unsigned long long)pthread_self());
        switch (x) {
            case EINVAL:
                printf("\t\tEINVAL:The mutex was created with the protocol attribute having the value PTHREAD_PRIO_PROTECT and the calling thread's priority is higher than the mutex's current priority ceiling.\n");
                break;
            case EBUSY:
                printf("\t\tEBUSY:The mutex could not be acquired because it was already locked.\n");
                break;
            case EAGAIN:
                printf("\t\tEAGAIN:The mutex could not be acquired because the maximum number of recursive locks for mutex has been exceeded.\n");
                break;
            case EDEADLK:
                printf("\t\tEDEADLK:The current thread already owns the mutex.\n");
                break;
            case EPERM:
                printf("\t\tEPERM:The current thread does not own the mutex.\n");
                break;
            default:
                break;
        }
    }
#endif
    return x;
}

/*
 * Releases the library-wide lock (messagelock).
 *
 * Returns the result of pthread_mutex_unlock(): 0 on success, an errno-style
 * code otherwise.  With QUEUE_DEBUG defined a failure is reported on stdout.
 */
static int unlockit() {
    int x = pthread_mutex_unlock(&messagelock);
#ifdef QUEUE_DEBUG
    if (x) {
        /* The conversion was misspelled "&lld", which printed literally and
           left the thread id argument unused. */
        printf("\tunlockit(%llu) failed to unlock mutex.\n", (unsigned long long)pthread_self());
    }
#endif
    return x;
}

/*
 * Looks up the name stored with the queue owned by thread `idnum`.
 *
 * Returns a pointer to that queue's threadname buffer, or to an empty
 * string when no queue exists at all or none belongs to `idnum`.  The
 * returned pointer refers to storage inside the queue node itself.
 */
static char *getthreadname( pthread_t idnum ) {
    static char noname[] = "";
    JQUEUE *node = jqueue;

    if (node == 0) {
        return noname;
    }

    lockit();
    while (node != 0 && !(node->owningthreadid == idnum)) {
        node = node->next;
    }
    unlockit();

    return (node != 0) ? node->threadname : noname;
}

/* One-time setup of messagelock as a recursive mutex (run through
 * pthread_once).  Recursion is required because pushmessage() calls
 * getthreadname(), which locks again while the lock is already held. */
static void jmsg_initlock( void ) {
    pthread_mutexattr_t attr;
#ifdef QUEUE_DEBUG
    printf("messaging.c > Initializing Messaging System ********************** \n");
#endif
    pthread_mutexattr_init(&attr);
#ifdef PTHREAD_MUTEX_RECURSIVE_NP
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE_NP);
#else
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
#endif
    pthread_mutex_init(&messagelock, &attr);
    pthread_mutexattr_destroy(&attr);
}

/* Allocates and fills an unlinked queue node for `owner`.  Returns the node,
 * or 0 with *error set when memory or the semaphore cannot be obtained. */
static JQUEUE *jmsg_newqueue( const char *description, pthread_t owner, int *error ) {
    JQUEUE *node = calloc(1, sizeof *node);
    if (!node) {
        *error = JMSG_OUTOFMEMORY;
        return 0;
    }
    node->jmessagelist = calloc(JMSG_INIT_QUEUE_SIZE, sizeof *node->jmessagelist);
    if (!node->jmessagelist) {
        free(node);
        *error = JMSG_OUTOFMEMORY;
        return 0;
    }
    node->queuesize = JMSG_INIT_QUEUE_SIZE;
    node->owningthreadid = owner;
    node->lastfulljmessage = JMSG_EMPTY_QUEUE;
    if (description) {
        /* snprintf truncates to the buffer and always terminates it. */
        snprintf(node->threadname, sizeof node->threadname, "%s", description);
    }
    snprintf(node->semname, sizeof node->semname, "SEM%d", (int)owner);

    /*
     * A semaphore of this name may survive from an earlier run that did not
     * clean up; sem_open() would then reuse it together with its old count.
     * Remove the name first so the semaphore really starts at 0.
     */
    sem_unlink(node->semname);
    node->messagesignal = sem_open(node->semname, O_CREAT, 0644, 0);
    if (node->messagesignal == SEM_FAILED) {
        free(node->jmessagelist);
        free(node);
        /* No dedicated error code exists; report that no queue is available. */
        *error = JMSG_QUEUENOTFOUND;
        return 0;
    }
    return node;
}

/*
 * Creates a message queue for the calling thread and appends it to the
 * global queue list.
 *
 * description: optional name for the queue (may be 0); at most 99 characters
 *              are kept.
 *
 * Returns JMSG_OKAY when the thread has a queue afterwards (including when
 * it already had one), JMSG_OUTOFMEMORY when allocation fails, or
 * JMSG_QUEUENOTFOUND when the queue's semaphore cannot be created.
 */
int createmessagequeue( char *description ) {
    static pthread_once_t lockonce = PTHREAD_ONCE_INIT;
    int retval = JMSG_OKAY;
    JQUEUE *tail = 0, *node;
    pthread_t callingthreadid = pthread_self();

    /* Configure the lock exactly once, before anything takes it here. */
    pthread_once(&lockonce, jmsg_initlock);

    lockit();
    for (node = jqueue; node; node = node->next) {
        if (node->owningthreadid == callingthreadid) {
            /* This thread already owns a queue: nothing to do. */
            unlockit();
            return JMSG_OKAY;
        }
        tail = node;
    }

    node = jmsg_newqueue(description, callingthreadid, &retval);
    if (node) {
        /* Link the finished node in; a failed allocation never touches the list. */
        node->previous = tail;
        if (tail) {
            tail->next = node;
        } else {
            jqueue = node;
        }
#ifdef QUEUE_DEBUG
        printf("messaging.c > Message queue set up for \"%s\" (%llu).\n ", node->threadname, (unsigned long long)callingthreadid);
#endif
    }
    unlockit();
    return retval;
}

/*
 * Destroys the calling thread's message queue.
 *
 * Messages this thread has sent that are still waiting in other queues get
 * their fromthreadname redirected to a constant string, because the name
 * they point at lives inside the queue being freed.  Undelivered messages in
 * this queue whose needtofreepointer flag is set have their pointer freed.
 *
 * Returns JMSG_OKAY, or JMSG_QUEUENOTFOUND when this thread has no queue.
 */
int destroymessagequeue(void) {
    int i = 0, retval = JMSG_QUEUENOTFOUND;
    JQUEUE *jq;
    pthread_t callingthreadid = pthread_self();

    lockit();

    /* Pass 1: detach our name from messages queued elsewhere. */
    for (jq = jqueue; jq; jq = jq->next) {
        if (jq->owningthreadid == callingthreadid) {
            continue;
        }
        for (i = 0; i <= jq->lastfulljmessage; i++) {
            if (jq->jmessagelist[i].fromthread == callingthreadid) {
                jq->jmessagelist[i].fromthreadname = "(dead queue)";
            }
        }
    }

    /* Pass 2: find this thread's queue and tear it down. */
    for (jq = jqueue; jq; jq = jq->next) {
        if (jq->owningthreadid != callingthreadid) {
            continue;
        }
        retval = JMSG_OKAY;
#ifdef QUEUE_DEBUG
        printf("messaging.c > Destroying message queue for \"%s\" (%llu)\n", jq->threadname, (unsigned long long)jq->owningthreadid);
#endif
        /* Free payloads that were to be freed after retrieval. */
        for (i = 0; i <= jq->lastfulljmessage; i++) {
            if (jq->jmessagelist[i].needtofreepointer) {
                free(jq->jmessagelist[i].ptr.charptr);
            }
        }
        free(jq->jmessagelist);

        /*
         * Close our handle and remove the name.  The semaphore goes away
         * with the queue, so its count needs no draining — and no blocking
         * sem_wait() is made while the library lock is held.
         */
        if (jq->messagesignal != SEM_FAILED) {
            sem_close(jq->messagesignal);
        }
        sem_unlink(jq->semname);

        /* Unlink the node; when it is the head, the list head must move,
           otherwise `jqueue` would keep pointing at freed memory. */
        if (jq->previous) {
            jq->previous->next = jq->next;
        } else {
            jqueue = jq->next;
        }
        if (jq->next) {
            jq->next->previous = jq->previous;
        }

        free(jq);
        break;
    }

    unlockit();
    return retval;
}

/*
 * Copies *jmsg onto the queue owned by `to_thread_id` and posts that queue's
 * semaphore so a thread blocked in waitmessage() wakes up.
 *
 * The stored copy gets fromthread/fromthreadname set to the calling thread.
 * The slot array grows in steps of at most 50 entries, never beyond
 * MAX_JMESSAGES.
 *
 * Returns JMSG_OKAY, JMSG_QUEUENOTFOUND (no such queue), JMSG_QUEUEFULL
 * (MAX_JMESSAGES messages already waiting) or JMSG_OUTOFMEMORY.
 */
int pushmessage( JMESSAGE *jmsg, pthread_t to_thread_id ) {
    int count = 0, grow = 0, newsize = 0, retval = JMSG_QUEUENOTFOUND;
    JQUEUE *jq;
    JMESSAGE *jm = 0, *newjmqueue;
    pthread_t callingthreadid = pthread_self();

    if (!jqueue) {
        return JMSG_QUEUENOTFOUND;
    }

    lockit();
    for (jq = jqueue; jq; jq = jq->next) {
        if (jq->owningthreadid == to_thread_id) {
            break;
        }
    }

    if (jq) {
        /* lastfulljmessage is an index (-1 when empty): +1 gives the count,
           which is also the index of the next free slot. */
        count = jq->lastfulljmessage + 1;

        if (count >= MAX_JMESSAGES) {
            /* We have too many messages backed up */
            retval = JMSG_QUEUEFULL;
        } else {
            if (count >= jq->queuesize) {
                /*
                 * Every allocated slot is in use: enlarge the array.  The
                 * new size must exceed `count`, and queuesize must record it,
                 * otherwise the store below writes past the allocation.
                 */
                grow = MAX_JMESSAGES - count;
                if (grow > 50) grow = 50;
                newsize = count + grow;
                newjmqueue = realloc(jq->jmessagelist, (size_t)newsize * sizeof *newjmqueue);
                if (!newjmqueue) {
                    unlockit();
                    return JMSG_OUTOFMEMORY;
                }
                /* Keep unused slots zeroed, as calloc() left the first ones. */
                memset(newjmqueue + count, 0, (size_t)(newsize - count) * sizeof *newjmqueue);
                jq->jmessagelist = newjmqueue;
                jq->queuesize = newsize;
            }

            jm = &jq->jmessagelist[count];
            jq->lastfulljmessage = count;

            memcpy(jm, jmsg, sizeof(JMESSAGE));
            jm->fromthread = callingthreadid;
            /* getthreadname() locks again; the lock is recursive once
               createmessagequeue() has run. */
            jm->fromthreadname = getthreadname(callingthreadid);
            /* Raise the semaphore count in case the owner is in waitmessage(). */
            sem_post(jq->messagesignal);
            retval = JMSG_OKAY;
        }
    }

    unlockit();
    return retval;
}

/*
 * Non-blocking receive for the calling thread's queue.
 *
 * *jmsg is cleared first; if a message is waiting, the newest one is copied
 * into *jmsg, its slot is cleared and the queue's semaphore is decremented
 * to stay in step with the message count.
 *
 * Returns JMSG_QUEUENOTFOUND (no queue list, or none owned by this thread),
 * JMSG_NOMESSAGE (queue empty), JMSG_LASTMESSAGE (the queue is empty after
 * this message) or JMSG_MOREMESSAGES (more are waiting).
 */
int popmessage( JMESSAGE *jmsg ) {
    JQUEUE *q = jqueue;
    JMESSAGE *slot;
    pthread_t self = pthread_self();
    int status = JMSG_QUEUENOTFOUND;

    if (q == 0) {
        return JMSG_QUEUENOTFOUND;
    }

    memset(jmsg, 0, sizeof *jmsg);

    lockit();
    while (q != 0 && !(q->owningthreadid == self)) {
        q = q->next;
    }
    if (q != 0) {
        if (q->lastfulljmessage > JMSG_EMPTY_QUEUE) {
            slot = &q->jmessagelist[q->lastfulljmessage];
            memcpy(jmsg, slot, sizeof *jmsg);
            memset(slot, 0, sizeof *slot);
            q->lastfulljmessage--;
            if (q->lastfulljmessage == JMSG_EMPTY_QUEUE) {
                status = JMSG_LASTMESSAGE;
            } else {
                status = JMSG_MOREMESSAGES;
            }
            /* Not called from waitmessage(): consume the matching post here. */
            sem_wait(q->messagesignal);
        } else {
            status = JMSG_NOMESSAGE;
        }
    }
    unlockit();

    return status;
}

/*
 * Variant of popmessage() used by waitmessage().  It leaves the semaphore
 * alone, because waitmessage() has already waited on it; it only takes the
 * newest message off the calling thread's queue.
 *
 * *jmsg is cleared first, so a caller never sees stale data when nothing
 * was waiting.  Return codes match popmessage(): JMSG_QUEUENOTFOUND,
 * JMSG_NOMESSAGE (nothing delivered), JMSG_LASTMESSAGE (delivered, queue now
 * empty) or JMSG_MOREMESSAGES (delivered, more waiting).
 */
static int popmessagenosem( JMESSAGE *jmsg ) {
    int retval = JMSG_QUEUENOTFOUND;
    JQUEUE *jq = jqueue;
    pthread_t callingthreadid = pthread_self();
    if (!jq) {
        return JMSG_QUEUENOTFOUND;
    }
    memset(jmsg, 0, sizeof(JMESSAGE));
    lockit();
    do {
        if (jq->owningthreadid == callingthreadid) {
            if (jq->lastfulljmessage > JMSG_EMPTY_QUEUE) {
                memmove(jmsg, &jq->jmessagelist[jq->lastfulljmessage], sizeof(JMESSAGE));
                memset(&jq->jmessagelist[jq->lastfulljmessage], 0, sizeof(JMESSAGE));
                jq->lastfulljmessage--;
                /* A message was delivered, so never report JMSG_NOMESSAGE here. */
                retval = ((jq->lastfulljmessage == JMSG_EMPTY_QUEUE) ? JMSG_LASTMESSAGE : JMSG_MOREMESSAGES);
            } else {
                retval = JMSG_NOMESSAGE;
            }
            break;
        }
        jq = jq->next;
    } while (jq);
    unlockit();
    return retval;
}


/*
 * Blocking receive: waits on the calling thread's queue semaphore, then
 * takes the newest message off the queue into *jmsg.
 *
 * Returns JMSG_OKAY once the semaphore was signalled and the queue was
 * read, or JMSG_QUEUENOTFOUND when this thread has no queue or the wait
 * fails for a reason other than an interrupting signal.
 */
int waitmessage( JMESSAGE *jmsg ) {
    JQUEUE *jq = jqueue;
    sem_t *waitingon = 0;
    pthread_t callingthreadid = pthread_self();
    if (!jq) {
        return JMSG_QUEUENOTFOUND;
    }
    lockit();
    for (; jq; jq = jq->next) {
        if (jq->owningthreadid == callingthreadid) {
            waitingon = jq->messagesignal;
            break;
        }
    }
    unlockit();
    if (!waitingon) return JMSG_QUEUENOTFOUND;

    /*
     * The wait happens outside the lock so senders can still push.  A signal
     * handler may interrupt sem_wait() with EINTR; retry in that case, so a
     * message is only popped after the semaphore was really acquired.
     */
    while (sem_wait(waitingon) != 0) {
        if (errno != EINTR) {
            return JMSG_QUEUENOTFOUND;
        }
    }
    popmessagenosem(jmsg);
    return JMSG_OKAY;
}

请尽情使用!我很喜欢它。我在所有线程中都设置了消息循环,它们可以各自做不同的事情,数据可以轻松、快速地传递!
下面是一个使用此代码的可运行示例(请确保链接时加上 -lpthread):

#include <signal.h>
#include "messaging.h"

/*
 * Application-level message ids for this example, mapped onto the library's
 * JMSG_CUSTOMn slots.
 * NOTE(review): every name on the right must exist in the JMESSAGES enum of
 * messaging.h — verify the spelling of the CUSTOM4 enumerator there.
 */
#define THREAD_STARTED      JMSG_CUSTOM1
#define THREAD_FAILED       JMSG_CUSTOM2
#define THREAD_EXITING      JMSG_CUSTOM3
#define THREAD_ACKNOWLEDGE  JMSG_CUSTOM4

/*
 * SIGINT handler: puts a JMSG_QUIT message on the queue of the thread that
 * runs the handler, so that thread's message loop can shut down.
 *
 * NOTE(review): printf() and pushmessage() (which takes a mutex) are not
 * async-signal-safe; acceptable for a demo, but confirm before reusing this
 * pattern in production code.
 */
void sighand( int signum ) {
    /* Zero every field: pushmessage() copies the whole struct, and a stray
       needtofreepointer/ptr value would later be handed to free(). */
    JMESSAGE msg = {0};
    (void)signum;
    printf("\nSignal Received! Exiting...\n");
    msg.message = JMSG_QUIT;
    pushmessage(&msg, pthread_self()); /* Put this JMSG_QUIT message on
                                        * the handling thread's message queue
                                        * to let its loop know to quit.
                                        */
    return;
}

void *mythread( void *_mainthreadid ) {
    pthread_t mainthreadid = (pthread_t)_mainthreadid;
    JMESSAGE msg;

    signal(SIGINT, &sighand); /* CTRL-C */

    if (!createmessagequeue("mythread")) {
        printf("main.c > mythread() > createmessagequeue(): Failed.\n");
        return 0;
    }

    /* 
     * Send a message to the main thread so it can do something when it 
     * knows we're ready.
     */
    msg.message = THREAD_ACKNOWLEDGE;
    pushmessage(&msg, mainthreadid);

    printf("main.c > mythread(): Launched successfully, using blocking message loop!\n");
    do {
        waitmessage(&msg); /* 
                            * Wait indefinitely.  You can, however, use a 
                            * signal to send a message to this queue to get
                            * it to move along, or signal it from another thread
                            * to get it to move along.  
                            * 
                            */
        switch (msg.message) {
            case THREAD_ACKNOWLEDGE:
                printf("main.c > mythread(): THREAD_ACKNOWLEDGE received from thread \"%s\" (0x%x).\n", msg.fromthreadname, msg.fromthread);
                fflush(stdout);
                break;
            default:
                break;
        }
    } while (msg.message != JMSG_QUIT);
    printf("main.c > mythread(): Got JMSG_QUIT.\n");
    msg.message = THREAD_EXITING;
    pushmessage(&msg, mainthreadid);
    printf("main.c > mythread(): Calling destroymessagequeue()!\n");
    destroymessagequeue();
    printf("main.c > mythread(): Exiting.\n");
    return 0;
}

int main( void ) {
    JMESSAGE msg;
    pthread_t mythreadid;
    int ret;

    ret = createmessagequeue("Main Thread");
    if (!ret) {
        printf("main.c > createmessagequeue(): Failed with %d.\n", ret);
        return 0;
    }

    pthread_create(&mythreadid, 0, &mythread, (void *)pthread_self());

    printf("main.c > main(): Press [CTRL-C] to terminate program.\n");
    do {
        /* NON Blocking message queue */
        if (popmessage(&msg)) {
            switch (msg.message) {
                case JMSG_QUIT:
                    /* Forward the message on to any other queues */
                    if (pushmessage(&msg, mythreadid)) {
                        printf("main.c > main(): Received JMSG_QUIT. Forwarded message to mythreadid\n");
                        fflush(stdout);
                        pthread_join(mythreadid, 0);
                    }
                    break;
                case THREAD_ACKNOWLEDGE:
                    printf("main.c > main(): Received a THREAD_ACKNOWLEDGE from thread \"%s\" (0x%x)\n", msg.fromthreadname, msg.fromthread);
                    /* Bounce message back for the heck of it */
                    pushmessage(&msg, mythreadid);
                    fflush(stdout);
                   break;
                case THREAD_EXITING:
                    printf("main.c > main(): Received a THREAD_EXITING from thread \"%s\" (0x%x)\n", msg.fromthreadname, msg.fromthread);
                    fflush(stdout);
                    break;
                default:
                    break;
            }
        } else { /* No messages do some important stuff */
            usleep(20000); /* Take a breather */
        }
    } while (msg.message != JMSG_QUIT);
    printf("main.c > main(): Calling destroymessagequeue()!\n");
    destroymessagequeue();
    printf("main.c > main(): Exiting program!\n");
    fflush(stdout);
   return 0;
}

相关问题