Unix IPC之消息队列

338次阅读  |  发布于2年以前

1 . 概述

Unix/Linux是多任务的操作系统,它通过多个进程分别处理不同事务来实现,如果多个进程要进行协同工作或者争用同一个资源,互相之间的通讯就很有必要了。

进程间通信(Inter-Process Communication,又称IPC)是指一组由操作系统支持的机制,用于进程间的交互(如协调或通信)。在UNIX/Linux下主要有以下几种方式:

(1)管道(有名管道(pipe)和无名管道(fifo))(2)信号(signal)(3)信号量(semaphore)(4)消息队列(message``queues)(5)共享内存(shared``memory)(6)套接字(socket

2 . 什么是消息队列

消息队列提供了一种将数据块从一个进程发送到另一个进程的方法。每个数据块都被认为有一个类型,它可以实现消息的随机查询,消息不必按照先入先出的顺序读取,用户可以根据消息的类型读取它们。

和有名(pipe)/无名(fifo)管道一样,当从消息队列中读取一条消息的时候,消息队列中相应的数据会被删除。此外,它允许一个或多个进程向其写入或读取消息。每个消息队列都有一个消息队列标识,在整个系统中是唯一的;而且消息队列只有在内核重启或是手动删除(ipcrm)消息队列时才会被删除。如果不手动删除消息队列,则消息队列将会一直存在于系统内核中。

消息队列和管道有着同样的缺点,即每个数据块的最大长度是上限,系统上所有队列的最大长度也是上限。每条消息的最大长度为最大值(MSGMAX),每个消息队列的总字节数为最大值(MSGMNB),系统上的消息队列总数也是最大值(MSGMNI)。

MSGMNIMSGMAXMSGMNB声明于/inclue/uapi/linux/msg.h头文件中。其默认值如下所示:

#define MSGMNI 32000   /* <= IPCMNI */     /* max # of msg queue identifiers */
#define MSGMAX  8192   /* <= INT_MAX */   /* max size of message (bytes) */
#define MSGMNB 16384   /* <= INT_MAX */   /* default max size of a message queue */

/proc/sys/kernel/目录下,有相应的msgmnimsgmaxmsgmnb3个文件,里面记录了当前内核中与消息队列相关的几个参数值。 当然,这些默认值可通过命令sysctl或系统函数sysctl()进行修改。也可直接在/etc/sysctl.conf文件中进行内核配置。 消息队列的本质其实是内核(kernel)提供的一个链表,它实现了一个基于链表的数据结构。消息队列是基于消息的,而管道是基于字节流的,在某个进程往一个队列写入消息之前,并不需要另外某个进程在该消息队列上等待消息的到达。

对于有名管道(pipe)和无名管道(fifo),最后一次关闭发生时,仍在该管道(pipefifo)上的数据将被丢弃。有名管道(pipe)或无名管道(fifo)都是随进程持续的,而消息队列、信号量、共享内存都是随内核持续的。

3 . 消息队列实现

在第一次创建消息队列时,内核会为其分配关联的队列控制块(Queue``Control``BlockQCB)、消息队列名称、唯一ID、内核缓冲区,队列长度、最大消息长度以及一个或多个等待列表。内核还采用开发人员提供的参数(例如队列长度和最大消息长度)来确定消息队列需要多少内存。内核在获得信息后,会从系统内存或某些私有内存空间为消息队列分配空间。

消息队列本身由许多元素组成,每个元素可以保存一条消息。其中保存第一条和最后一条消息的元素称为:头和尾。消息队列的内部原理大致如下图示所示:

消息队列具有两个相关联的任务的等待列表。接收任务等待列表由在队列为空时等待队列的任务组成。发送队列由队列满时等待队列的任务完成。

系统中记录消息队列的数据结构是struct``ipc_ids``msg_ids,它位于内核中,系统中所有的消息队列都可以在结构msg_ids中找到入口。

从Linux内核5.4.3版本的msg.c源文件可以得知,系统上每个现有的消息队列都有一个struct``msg_queue数据结构。该数据类型声明如下:

struct msg_queue {
 struct kern_ipc_perm q_perm; /* 消息队列操作访问权限 */
 time64_t q_stime;    /* 上一次调用msgsnd发送消息的时间 */
 time64_t q_rtime;    /* 上一次调用msgrcv接收消息的时间 */
 time64_t q_ctime;    /* 上一次修改的时间 */
 unsigned long q_cbytes;   /* 队列上当前字节数据 */
 unsigned long q_qnum;   /* 队列中的消息数目 */
 unsigned long q_qbytes;   /* 队列上最大字节数目 */
 struct pid *q_lspid;   /* 上一次调用msgsnd的pid */
 struct pid *q_lrpid;   /* 上一次接收消息的pid */

 struct list_head q_messages;
 struct list_head q_receivers;
 struct list_head q_senders;
 //这3个标注的内核链表用于管理睡眠的发送至(q_senders)、睡眠的接受者(q_receivers)和消息本身(q_messages)
} __randomize_layout;

该结构包含消息队列的状态信息以及队列的访问权限。成员q_stimeq_rtimeq_ctimeq_cbytesq_qnumq_qbytesq_lspidq_lrpid在上面的数据类型声明中采用注释的形式给出说明。

成员q_perm用以说明该消息队列操作访问权限。对于struct``ipc_perm数据类型,其声明如下:

struct ipc_perm
{
 __kernel_key_t key;  //消息队列全局唯一ID
 __kernel_uid_t uid; //所有者有效UID
 __kernel_gid_t gid; //所有者有效GID
 __kernel_uid_t cuid; //创建者有效UID
 __kernel_gid_t cgid; //创建者有效GID
 __kernel_mode_t mode;  //权限
 unsigned short seq; //序列号
};

其中最值得关注的是成员key,其类型为key_t。这是系统对IPC资源的唯一标识符,这个标识符必须在申请IPC资源之前获得,我们可以通过系统函ftok()获得。

而成员q_messagesq_receiversq_senders分别对应于消息本身、睡眠的接收者和睡眠的发送者。而q_messages中的各个消息都封装在一个struct``msg_msg的数据类型中,该结构声明如下:

//  双向链表指针
struct list_head {
 struct list_head *next;
 struct list_head *prev;
};

struct msg_msg {
 struct list_head m_list;
 long  m_type; //消息类型
 int  m_ts; //消息正文长度
 struct msg_msgseg *next;
 /* 接下来是实际的消息*/
}

成员m_list用于连接各个消息的链表元素,其他成员用于管理消息本身。其中m_type指明消息类型,m_ts用于指定消息正文长度(字节)。从该结构声明可以看到,struct``msg_msg数据结构中没有声明成员用于存储消息本身。这是因为每个消息都至少分配了一个内存页,msg_msg实例则保存在该页的起始处,剩余的空间可用于存储消息正文。这正是成员next的作用,如果消息超过一个内存的长消息,则需要用next(参考《深入Linux内核架构》)。 此外,睡眠消息接收者(q_receivers)的数据结构声明分别如下:

/* one msg_receiver structure for each sleeping receiver */
struct msg_receiver {
 struct list_head r_list;
 struct task_struct *r_tsk;

 int    r_mode;
 long   r_msgtype;
 long   r_maxsize;

 struct msg_msg  *r_msg;
};

该结构声明保存了执行消息队列[进程] 的task_struct的指针,以及对预期消息的描述(包括消息类型,消息最大长度等)、指向msg_msg实例的一个指针。

而睡眠的消息生产者(q_senders)的数据类型声明如下:

/* one msg_sender for each sleeping sender */
struct msg_sender {
 struct list_head  list;
 struct task_struct  *tsk;
 size_t                  msgsz;
};

list是链表,tsk是指向对应进程的task_struct指针。

根据前面的描述与介绍可以得出,Linux内核与消息独立建立起来的联系如下图所示:

4 . 系统函数

与消息队列相关联的系统调用函数共有5个,它们分别是:ftok()msgget()msgsnd()msgrcv()msgctl()。下面将分别对这几个系统函数作详细的介绍。

4.1 ftok()

系统函数ftok()的函数原型如下:

#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(const char *pathname, int proj_id);

该函数将一个现有的路径名(pathname)和一个整数标识符(带有proj_id的低8位)转换为key_t值,称为IPC``key,也就是所谓的IPC 唯一秘钥。

注意:它由两个参数(pathnameproj_id)生成key。基本上,pathname必须是这个进程可以读取的文件。另一个参数,proj_id通常被设置为任意字符,比如'K'ftok()函数使用关于已命名文件的信息(比如inode编号等)和proj_id来为msgget()生成一个可能唯一的键(key)。要使用相同队列的程序必须生成相同的密钥,因此它们必须将相同的参数传递给ftok()

4.2 msgget()

系统函数msgget()的原型如下所示:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

该函数用于创建一个新的消息队列,或者是获取一个现有的消息队列。如果该函数调用成功,返回消息队列的msgid作为进程的唯一标识符;失败则返回-1。第一个参数key是一个IPC``key,可以由ftok()函数生成,标识唯一的消息队列。

第二个参数flag是创建方式(IPC_CREATIPC_EXCL)。

单独使用IPC_CREAT表示如果请求的资源已经存在,则直接获得创建IPC资源的应用程序;如果不是,则创建新的IPC资源。

IPC_CREATIPC_EXCL一起使用,以指示用于创建IPC资源的应用程序。如果请求的资源不存在,则创建一个新的IPC资源;如果它已经存在,则返回-1

单独使用IPC_EXCL没有任何意义。它的存在是为了将其与IPC_CREAT一起使用,以确保新创建的资源是可用的。

4.3 msgsnd()

系统函数msgsnd()用于向消息队列添加数据,其函数原型如下:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

该函数成功返回0,失败返回-1。对于该函数的几个参数,详细描述如下:① msqid

msgget()函数返回的消息队列标识符。

msgp

用于指向要发送的消息的指针。

msgsz

msg指向的消息长度,消息缓冲区结构中mtext的大小,不包括数据类型。

注意:msgsnd()msgrcv()函数中的第二个参数(分别是msgpmsg_ptr)的数据类型均为struct msgbuf,其数据类型声明如下(/include/uapi/linux/msg.h文件):

/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
 __kernel_long_t mtype;          /* type of message */
 char mtext[1];                  /* message text */
};

msgflg

指定如何发送消息的标志。比如:msgflg``=``IPC_NOWAIT, 则表示如果消息不能立即发送,不阻塞进程,返回-1,并设置错误码errno(用户区)为EAGAIN。如果不设置标志,则使用值0

4.4 msgrcv()

系统函数msgrcv()用于从消息队列中读取消息,该函数原型如下:

 #include <sys/types.h>
 #include <sys/ipc.h>
 #include <sys/msg.h>

 ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
                 int msgflg);

对于该函数中的5个参数,详细解释说明如下:① msqid

该参数表示函数msgget()返回的消息队列标识符。

msgp

指向要接收的消息的指针。

msgsz

msgp指向的消息的长度,消息缓冲区结构中mtext的大小,不包括数据类型。正如上面4.3节所描述,msgp的数据类型是struct``msgbuf

msgtyp

我们希望阅读的消息类型。可能是以下情况之一:

i. msgtyp``=``0, 将返回队列上的第一条消息。ii. msgtyp``>``0(正整数), 队列上类型(mtype,即struct``msgbuf中成员mtype)等于此整数的第一条消息(除非msgflg中设置了特定标志,请参见下文⑤)。iii. msgtyp``<``0(负整数), 队列上类型小于或等于此整数绝对值的第一条消息。

如果msgtyp为零,则检索第一条消息,而不管其类型是什么。该值可由接收进程用于指定消息选择。总的来说,msgflg设置为0

msgflg

该参数控制函数的行为,以下任何标志的逻辑“或”组合。

msgflg``=``IPC_NOWAIT,队列没有可读消息不等待,返回ENOMSG错误。msgflg``=``MSG_EXCEPT,如果消息类型参数为正整数,则返回类型不等于给定整数的第一条消息。msgflg``=``MSG_NOERROR,当消息大小超过msgze时被截断。

“这个参数依然是控制函数行为的标志,取值可以是:0,表示忽略;IPC_NOWAIT,如果消息队列为空,则返回一个ENOMSG,并将控制权交回调用函数的进程。如果不指定这个参数,那么进程将被阻塞直到函数可以从队列中得到符合条件的消息为止。如果一个client 正在等待消息的时候队列被删除,EIDRM 就会被返回。如果进程在阻塞等待过程中收到了系统的中断信号,EINTR 就会被返回。MSG_NOERROR,如果函数取得的消息长度大于msgsz,将只返回msgsz 长度的信息,剩下的部分被丢弃了。如果不指定这个参数,E2BIG 将被返回,而消息则留在队列中不被取出。当消息从队列内取出后,相应的消息就从队列中删除了。

在Linux内核的5.4.3版本中,msgflg支持以下选项参数:

//// 声明于/include/uapi/linux/msg.h
/* msgrcv options */
#define MSG_NOERROR     010000  /* no error if message is too big */
#define MSG_EXCEPT      020000  /* recv any msg except of specified type.*/
#define MSG_COPY        040000  /* copy (not remove) all queue messages */

特别注意:当msgtype``>``0msgflg``=``MSG_EXCEPT时,接收类型不等于msgtype的第一条消息。

该函数调用成功时,返回实际放入接收缓冲区msgp中的字符数。失败返回-1

4.5 msgctl()

系统函数msgctl()用于消息队列控制功能,它不仅仅有删除消息队列的作用。其函数原型如下:

 #include <sys/types.h>
 #include <sys/ipc.h>
 #include <sys/msg.h>

 int msgctl(int msqid, int cmd, struct msqid_ds *buf);

该函数的各参数详细如下描述:

msqid

msgget()函数返回的消息队列标识符。

cmd

指定要在消息队列上执行的命令,其具体如下:

所以如果我们执行删除操作,我们可以将cmd设置为IPC_RMID。对于IPC_INFO命令(Linux系统特有),其对应的msginof数据类型声明如下(/include/uapi/linux/msg.h文件):

/* buffer for msgctl calls IPC_INFO, MSG_INFO */
struct msginfo {
 int msgpool;
 int msgmap; 
 int msgmax; 
 int msgmnb; 
 int msgmni; 
 int msgssz; 
 int msgtql; 
 unsigned short  msgseg; 
};

buf

如果选项删除队列,则第3个参数bufNULL。参数buf的数据类型是 struct``msqid_ds,其数据类型声明如下所示:

/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
 struct ipc_perm msg_perm;
 struct msg *msg_first;  /* first message on queue,unused  */
 struct msg *msg_last;  /* last message in queue,unused */
 __kernel_time_t msg_stime; /* last msgsnd time */
 __kernel_time_t msg_rtime; /* last msgrcv time */
 __kernel_time_t msg_ctime; /* last change time */
 unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
 unsigned long  msg_lqbytes; /* ditto */
 unsigned short msg_cbytes; /* current number of bytes on queue */
 unsigned short msg_qnum; /* number of messages in queue */
 unsigned short msg_qbytes; /* max number of bytes on queue */
 __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
 __kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};

5 . 与消息队列相关的shell命令

与消息队列相关的shell命令有两个,分别是:ipcsipcrm。它们分别用来查看消息队列以及删除消息队列;除此之外,这两个命令还可以用于共享内存、信号量。这些都将在后面的相关文章中详细进行讲解,本文只着重说明与消息队列相关的选项。

5.1 查看消息队列

使用ipcs -q可以查看当前消息队列中的消息情况。

如下图所示,一开始内核中没有消息队列,然后我们启动write(实现写消息队列的demo)应用程序,之后向该消息队列中写入HELLO字段串,之后再次使用ipcs -q便可查看到消息队列向的keymsqid以及消息数量等信息。

5.2 删除消息队列

现在使用ipcrm将该消息队列中的所有消息删掉,如下所示,可以选择根据keymsqid去删除,这里选项的是根据msqid进行删除。

6 . 消息队列实战

· 实例一该实例中,生产者不断地向消息队列中写入用户终端输入的消息,直到输入“over”字符串后,结束该过程。

生产者代码:

/// 向消息队列写入消息:sender.c文件;   gcc sender.c -o sender
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>

struct msgbuf{
        int mtype;
        char mtext[128];
};

int main(int argc,char *argv[])
{
        int msgid;
        char buf[128] = {0};
        key_t key = 11111;

        struct msgbuf sndbuf;

        if((msgid = msgget(key, 0666 | IPC_CREAT)) < 0)
        {
                perror("msgget");
                exit(-1);
        }

        while(1)
        {
                printf("Enter some text: ");

                memset(&sndbuf, 0x00, sizeof(sndbuf));
                sndbuf.mtype = 1;

                memset(buf, 0x00, sizeof(buf));
                fgets(buf, sizeof(buf), stdin);

                strncpy(sndbuf.mtext, buf, strlen(buf));
                if(msgsnd(msgid,(void *)&sndbuf, sizeof(sndbuf), 0) < 0 )
                {
                        perror("msgsnd");
                        exit(-1);
                }
        }

        return 0;
}

消费者代码:

/// 从消息队列中读取消息;receiver.c文件;gcc receiver.c -o receiver
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>

#include <ctype.h>

struct msgbuf{
        int mtype;
        char mtext[128];
};

int main(int argc,char *argv[])
{
        int msgid;
        char buf[128] = {0};
        key_t key = 11111;

        struct msgbuf rcvbuf;

        if((msgid = msgget(key, 0666 | IPC_CREAT)) < 0)
        {
                perror("msgget");
                exit(-1);
        }

        while(1)
        {
                memset(&rcvbuf, 0x00, sizeof(rcvbuf));
                if(msgrcv(msgid, (void *)&rcvbuf, sizeof(rcvbuf), 0,  0) < 0)
                {
                        perror("msgrcv");
                        exit(-1);
                }

    // 移除掉fgets函数读取到的换行符。
                if(isspace(rcvbuf.mtext[strlen(rcvbuf.mtext) - 1]))
                {
                        rcvbuf.mtext[strlen(rcvbuf.mtext) - 1] = '\0';
                }

                printf("rcv msg[%s]\n", rcvbuf.mtext);

                if(!strncmp(rcvbuf.mtext, "over", 4))
                {
                        puts("over cycle.");
                        break;
                }
        }
        msgctl(msgid, IPC_RMID, NULL);

        return 0;
}

分别打开两个shell终端,一个纸箱sender进程,另外一个纸箱receiver进程。在sender进程中,用户不断从终端输入消息,而receiver终端则会将所从消息队列中获取到的消息打印到出来,直到读取到字符“over”则从内核中删除该消息队列。

序号 命令 说明
1 IPC_STAT 获取此消息队列的msqid_ds结构,并将其存储在buf所指向的结构中
2 IPC_SET 将参数buf中的一些字段复制到消息队列msqid_ds结构中的相应字段。此命令只能由两个用户执行,一个是其有效用户ID等于msg_perm.cuid和另一个msg_perm.uid。另一个是超级用户root。
3 IPC_RMID 从系统中删除消息队列和消息队列的所有数据。删除立即生效。
4 IPC_INFO(Linux特有) 返回有关buf指向的结构中的系统范围消息队列限制和参数的信息。该结构为msginfo型。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8