我正在学习IPC,并有一个System V消息队列使用的基本示例。代码很简单:主进程创建消息队列,然后fork(),并开始在无限循环中发送消息,并有一些延迟。子进程在循环中接收消息,逻辑为:睡眠-接收所有累积-睡眠。
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
typedef struct
{
int value;
} messageText;
typedef struct
{
long mtype;
messageText mtext;
} queueMessage;
int main()
{
key_t queueKey = ftok(__FILE__, 'M');
int msgQueue = msgget(queueKey, IPC_CREAT | 0660);
pid_t pid = fork();
if (0 == pid)
{
// never returns
child_main(msgQueue);
abort();
}
queueMessage msg =
{
.mtype = 1,
.mtext = { 0 }
};
for (int i = 0; /* void */ ; ++i)
{
msg.mtext.value = i;
int rc = msgsnd(msgQueue, &msg, sizeof(msg.mtext), 0);
if (rc == -1)
perror("msgsnd");
usleep(250000); // 0.25 sec
struct msqid_ds queueInfo;
msgctl(msgQueue, IPC_STAT, &queueInfo);
printf("Messages in queue = %ld\n", queueInfo.msg_qnum);
}
return 0;
}
void child_main(int msgQueue)
{
queueMessage msg = { 0 };
int msgTyp = 0; // 1st message in queue
int msgRcvFlg = IPC_NOWAIT | MSG_NOERROR;
while (1)
{
while (1)
{
int rc = msgrcv(msgQueue, &msg, sizeof(msg.mtext), msgTyp, msgRcvFlg);
if (-1 == rc)
{
if (errno == ENOMSG)
{
goto sleep;
}
else
{
perror("msgrcv");
exit(EXIT_FAILURE);
}
}
printf("read: %d\n", msg.mtext.value);
}
sleep:
usleep(5000000); // 5 sec.
}
}
因此,预期的行为是父消息在队列中累积,直到子消息醒来,读取它们并再次sleep()。为了验证逻辑是否正确,我在每次父母发送新消息时打印消息编号。但实际的行为是队列内容似乎由于某种原因被重置,所以只有很少的消息被child接收。父节点休眠的时间越长,子节点尝试接收消息时队列中的消息就越少。
这是父睡眠0.25秒的输出示例。
Messages in queue = 1
Messages in queue = 2
Messages in queue = 3
Messages in queue = 4
Messages in queue = 5
Messages in queue = 0
Messages in queue = 1
Messages in queue = 2
Messages in queue = 3
read: 8
read: 9
read: 10
read: 11
Messages in queue = 0
Messages in queue = 1
Messages in queue = 0
...
ipcs -l
输出
------ Messages Limits --------
max queues system wide = 32000
max size of message (bytes) = 8192
default max size of queue (bytes) = 16384
------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 18014398509465599
max total shared memory (kbytes) = 18446744073709551612
min seg size (bytes) = 1
------ Semaphore Limits --------
max number of arrays = 32000
max semaphores per array = 32000
max semaphores system wide = 1024000000
max ops per semop call = 500
semaphore max value = 32767
我在手册页中没有找到任何关于这种行为的信息。有人能解释一下吗?
1条答案
按热度按时间umuewwlo1#
鉴于有证据表明,在系统重新启动后,该问题无法重现,我大胆猜测,该问题是由 * 孤立进程 * 从同一个陈旧的消息队列中消耗消息造成的。
msgget(queueKey, IPC_CREAT | 0660);
仅在没有与给定键关联的消息队列时创建一个 new 消息队列。否则,它会悄悄地返回先前创建的消息队列。IPC_CREAT | IPC_EXCL
必须在标志中指定,以使msgget
在已经存在与给定键关联的消息队列时失败(将errno
设置为EEXIST
)。示例中没有任何内容可以清理消息队列。这是通过以下方式完成的:
如果不这样做,System V消息队列将在系统的缓存期间持续存在(除非由
ipcrm
手动删除)。此外,
ftok
的确定性特性及其相对较小的地址空间意味着,不同的进程通过冲突获取相同的key_t
并消耗消息队列中的消息的可能性很小(但并非不可能)。通过使用
IPC_PRIVATE
作为msgget
的密钥,可以部分解决这两个可能的问题(当使用msgsnd
/msgrcv
时,具有正确权限的进程仍然可以手动指定一个 msqid 进行交互)。一般的建议是确保在使用完资源后清理它们。这意味着正确地 * 获取 * 子进程,删除消息队列(也包括共享内存和信号量!)),当然还要检查所有的系统调用。