SCUT
Dozen
shiqing0477@163.com
利用 semaphore 实现 shm 进程通信
一. 进程通信方式简介
总结起来,进程通信包括:
又称软中断,信号包括 SIGCLD、SIGUP、SIGQUIT 等
说明
方式
信号(signal)
信号量+共享存储区 在内存开辟共享存储块(share memory),由信号量(semaphore)来管理
管道
消息传递
套接字
其中,信号量+共享存储区、消息传递,称作 XSI IPC(InterProcess Communication)。
实即 I/O 操作,在硬盘读写文件
利用消息队列,发送到对应的信箱
Socket,网络通信
备注
最快
二. 信号量+共享存储区的进程间通信
2.1 共享存储问题分析
本文介绍的进程通信方法,是在内存中开辟一块共享内存区,再利用信号量实现访问的互斥、阻塞、初始化判
别。共享存储的原理如图 1 所示:
图 1 共享存储机制的物理、逻辑示意图
假设进程 A、B 需要相互通信,则各自设一个结构体来存储共享存储(以下简称 shm)的描述信息。其中,
shm_A.shmkey=shm_A.shmkey,shm_A.shmid=shm_A.shmid,但是,shm_A.shmdata_pointer 与 shm_B.shmdata_pointer
是不想等的,道理很简单:两者分别在不同的进程,它们各自描述自己所在进程的 shm 的虚拟地址,然后通过内核
映射到内存中 shm 的物理地址(SharedAddr)。
A、B(或者更多的进程,本文只介绍两个进程)通信,自然引发出如何互斥、如何同步的问题。A、B 不能同
时访问 SharedAddr,解决互斥问题,即可实现异步通信,即:接收方需要数据时,就访问 SharedAddr;发送方需要
更新数据时,就直接往 SharedAddr 写数据。另外一个需求就是,接收方希望即时获取新的数据,即:发送方一更新
数据,接收方立刻获取到新的数据,这样就要求接收端可以休眠(或阻塞)等待新数据。以上两个问题可以利用信
号量(以下简称 sem)来解决:使用两个信号量,一个用于互斥、一个用于唤醒通知。
引进信号量以后,又有一个新的问题:多个进程使用同一个 key 的 sem,而一个 sem 的创建其实只需要一次初
始化,引用 sem 时如何知道这个 sem 是否已经经过初始化了呢?我们知道,sem 是信号量集,还可以再加一个信号
1 / 8
SCUT
Dozen
shiqing0477@163.com
量,利用此信号量标志 sem 已经初始化了。
综上所述,基于 shm 通信,可利用 3 个 semaphore,以下简称为:SEM_MUTEXT、SEM_BLOCKING、SEM_INITIAL。
在内存开辟共享内存块,将该内存块地址映射各进程中,进程通过某种机制访问映射地址,即可读、写共享内存块。
2.2 PV 操作过程
有了 SEM_MUTEXT、SEM_BLOCKING、SEM_INITIAL,进程进行 PV 操作可通过以下步骤实现(假设 shm 的地址
是 shmdata_pointer):
(1)对于 P 操作:
Step1:对 SEM_MUTEXT 做 semop 操作,动作为“-1”,以锁住资源,此时其他进程必须等待资源释放才能访问;
Step2:将要写进的数据的长度 datasize 写到地址 shmdata_pointer;
Step3:将有效数据写到 shmdata_pointer+sizof(unsigned short);
Step4:对 SEM_MUTEXT 做 semop 操作,动作为“+1”,以释放资源,此时其他进程可以访问资源;
Step5:对 SEM_BLOCKING 注意 semop 操作,动作为“-1”;之后立刻再做 semop 操作,动作为“+1”。这个步骤只
对 V 操作阻塞等待模式才有效。
(2)对于 V 操作:
Step1:判断是否阻塞模式,是则对 SEM_BLOCKING 做 semop 操作,动作为“0”,如此等待 P 操作中对 SEM_BLOCKING
的“-1”动作,当“-1”动作发生,V 进程立刻被唤醒,程序立刻执行下去;而 P 进程立刻“+1”动作,相
当于“开门”放行一个人之后立刻“关门”。(注意:此处科学性还不够,只能由一个 V 进程等待,不能确
保多个 V 进程都能读到刚更新的数据,不过该思想是能够拓展到多个 V 进程的模式的。)
Step2:对 SEM_MUTEXT 做 semop 操作,动作为“-1”,以锁住资源,此时其他进程必须等待资源释放才能访问;
Step3:读取地址 shmdata_pointer 为首地址,长度为 sizeof(unsigned short)的内容,赋给 datasize;
Step4:读取地址 shmdata_pointer+sizeof(unsigned short)为首地址,长度为 datasize 的内容,即有效数据;
Step5:对 SEM_MUTEXT 做 semop 操作,动作为“+1”,以释放资源,此时其他进程可以访问资源。
对于信号量初始化的过程,难点在于判断信号量是否已经被初始化过,其过程如下:
Step1:获取 sem 标识符:semid = semget(key, 3, IPC_CREAT)。
Step2:判断 SEM_INITIAL 是否已经被标志:semctl(semid, SEM_INITIAL, GETVAL)。其返回值若等于 SEM_INITIAL_VAL,
则说明已经初始化过,否则进行下一步。
Step3:sem 初始化,向 SEM_MUTEXT、SEM_BLOCKING、SEM_INITIAL 分别 SETVAL 为:1,1,SEM_INITIAL_VAL。
三. 代码实现
3.1 _shm_.h 和_shm_.c
(1)_shm_.h:
/*
* FileName : _shm_.h
* Usage :
* Date : Mar 7, 2013
* Version : V1.0
* Author : lidesheng
*/
#ifndef SHM__H_
#define SHM__H_
#include
#include
#include
2 / 8
SCUT
Dozen
shiqing0477@163.com
#include
#include
#include
#include
#include
#define SHM_SIZE (1024*1024)
#define SHM_MODE (IPC_CREAT|0666)
#define SEM_MODE (IPC_CREAT|0666)
#define SEM_N_DEFAULT 3
#define SEM_MUTEX 0
#define SEM_BLOCK 1
#define SEM_INITIAL 2
#define SEM_VAL_INIT 5
#define SEM_LOCK -1
#define SEM_RELEASE 1
#define SEM_WAIT 0
#define IPC_DELETED (0-EIDRM)
#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/* union semun is defined by including */
#else
/* according to X/OPEN we have to define it ourselves */
union semun{
};
#endif
//store the public variables for the functions
struct _shm_{
int shmid;
void *shmdata;
size_t currentdatasize;
int semid;
int nsems; //counts of semaphore
} ;
typedef struct _shm_ shm_def;
int shm_init(shm_def *shm,size_t shmsize,key_t shmkey,key_t semkey);
int shm_recv(shm_def *shm, void *dest,const int blocking);
int shm_send(shm_def *shm, const void *data, const size_t datasize);
int shm_delete(const int shmid, const int semid);
int shm_sem_operation(const int semid,const int semnum,const int semopflag);
int shm_sem_operation_f(const int semid,const int semnum,const int semopflag,const int semflg);
int shm_sem_delete(const int semid);
#endif /* SHM__H_ */
int val; //for SETVAL
struct semid_ds *buf; //for IPC_STAT/IPC_SET
unsigned short *array;//for GETALL/SETALL
3 / 8
SCUT
(2)_shm_.c
Dozen
shiqing0477@163.com
perror("create shared memory failed");
exit(-1);
/*
* FileName : _shm_.c
* Usage :
* Date : Mar 7, 2013
* Version : V1.0
* Author : lidesheng
*/
#include "_shm_.h"
/*
* function :init_shm(shm_def *shm,size_t shmsize,key_t shmkey,key_t semkey)
* usage :initial the shm,use two semaphores,sem0 for lock-release,sem1 for blocking,sem2 for initial
* return :int; success-0,erro-<0
* parameter:pointer to a shm_def type
*/
int shm_init(shm_def *shm,size_t shmsize,key_t shmkey,key_t semkey)
{
int nsems=shm->nsems+SEM_N_DEFAULT;
union semun semsu;
//get the memory,if not existed,then create it.
if((shm->shmid = shmget(shmkey,shmsize,SHM_MODE)) < 0) {
}
//get the address of the shared memory.
if((shm->shmdata = shmat(shm->shmid,0,0)) == (void *)-1) {
}
printf("shmid=%d,shmdata=%lx\n",shm->shmid,(unsigned long)shm->shmdata);
//get the id of the semaphore
if((shm->semid = semget(semkey,nsems,SEM_MODE)) < 0) {
}
//check if the semaphore has been initialed,if not then initial it.
if((semctl(shm->semid,SEM_INITIAL,GETVAL))!=SEM_VAL_INIT) {
}
printf("muxtex_semval=%d,mutext_ncnt=%d;lock_semval=%d,lock_ncnt=%d.\n",
//initial the semaphore
printf("Initial the semaphore now.\n");
semsu.array=malloc(sizeof(unsigned short)*2);
semsu.array[SEM_MUTEX]=1;
semsu.array[SEM_BLOCK]=1;
semsu.array[SEM_INITIAL]=SEM_VAL_INIT;
if(semctl(shm->semid,0,SETALL,semsu) < 0) {
}
free(semsu.array);
perror("semaphore initial err");
exit(-4);
perror("get semaphore err");
exit(-3);
perror("shmat error");
exit(-2);
semctl(shm->semid,SEM_MUTEX,GETVAL),
4 / 8
SCUT
Dozen
shiqing0477@163.com
return 0;
perror("sem_operation failed");
return -1;
semctl(shm->semid,SEM_MUTEX,GETNCNT),
semctl(shm->semid,SEM_BLOCK,GETVAL),
semctl(shm->semid,SEM_BLOCK,GETNCNT));
struct sembuf sb;
sb.sem_num = semnum; //semaphore index
sb.sem_op = semopflag; //semaphore operation
sb.sem_flg = SEM_UNDO; //set undo,so that the case of exception exit...
if(semop(semid,&sb,1) < 0) {
}
return 0;
}
/*
* function :shm_sem_operation(const int semid,const int semnum,const int semop)
* usage :operation the semaphore
* return :int; success-0,erro-<0
* parameter:semid,semnum,semop:-1,0,1
*/
int shm_sem_operation(const int semid,const int semnum,const int semopflag)
{
}
int shm_sem_operation_f(const int semid,const int semnum,const int semopflag,const int semflg)
{
}
int shm_sem_delete(const int semid)
{
}
int shm_recv(shm_def *shm, void *dest,const int blocking)
{
unsigned long *psize=malloc(sizeof(unsigned short));
if(blocking==1)
{
struct sembuf sb;
sb.sem_num = semnum; //semaphore index
sb.sem_op = semopflag; //semaphore operation
sb.sem_flg = semflg; //set undo,so that the case of exception exit...
if(semop(semid,&sb,1) < 0) {
}
return 0;
perror("sem_operation failed");
return -1;
perror("delete semaphore err");
return -1;
if(semctl(semid,0,IPC_RMID) < 0){
}
return 0;
if(shm_sem_operation(shm->semid,SEM_BLOCK,SEM_WAIT) <0) {
5 / 8
SCUT
Dozen
shiqing0477@163.com
perror("operation err,lock failed");
return -1;
perror("operation err,wait failed");
return -1;
}
perror("operation err,release failed");
return -2;
}
if(shm_sem_operation(shm->semid,SEM_MUTEX,SEM_LOCK) <0) {
}
memcpy(psize,shm->shmdata,sizeof(unsigned short));
memcpy(dest,shm->shmdata+sizeof(unsigned short),*psize);
if(shm_sem_operation(shm->semid,SEM_MUTEX,SEM_RELEASE) <0) {
}
shm->currentdatasize=*psize;
free(psize);
return shm->currentdatasize;
}
int shm_send(shm_def *shm, const void *data, const size_t datasize)
{
}
int shm_delete(const int shmid,const int semid)
{
}
if(semctl(semid,0,IPC_RMID) < 0){
}
if(shmctl(shmid,IPC_RMID,0) < 0){
}
return 0;
perror("operation err,lock failed");
return -1;
shm->currentdatasize=datasize;
if(shm_sem_operation(shm->semid,SEM_MUTEX,SEM_LOCK) <0) {
}
memcpy(shm->shmdata,&datasize,sizeof(unsigned short));
memcpy(shm->shmdata+sizeof(unsigned short),data,datasize);
if(shm_sem_operation(shm->semid,SEM_MUTEX,SEM_RELEASE) <0) {
}
shm_sem_operation(shm->semid,SEM_BLOCK,SEM_LOCK);
shm_sem_operation(shm->semid,SEM_BLOCK,SEM_RELEASE);
return shm->currentdatasize;
perror("operation err,release failed");
return -2;
perror("delete semaphore err");
perror("delete shm err");
return -1;
3.2 PV
(1)test_shm_P.c
/*
* FileName : test_shm_P.c
6 / 8
SCUT
Dozen
shiqing0477@163.com
int a;
int b;
char c[20];
* Usage :
* Date : Mar 7, 2013
* Version : V1.0
* Author : lidesheng
*/
#include
#include "_shm_.h"
struct data{
}mdat;
int main(void)
{
}
(2)test_shm_V.c
shm_def m_shm={-1,0,0,-1,0};
char sbuff[30];
//char recv[30]={0};
int i=0;
i=shm_init(&m_shm,SHM_SIZE,1606,2606);
while(i++<20) {
}
shm_delete(m_shm.shmid,m_shm.semid);
return 0;
sprintf(sbuff,"Hello,V%d.",i);
printf("send:%d\n",i);
shm_send(&m_shm,(void *)sbuff,(i<10)?9:10);
sleep(2);
/*
* FileName : test_shm_P.c
* Usage :
* Date : Mar 7, 2013
* Version : V1.0
* Author : lidesheng
*/
#include
#include "_shm_.h"
struct data{
}mdat;
int main(void)
{
int a;
int b;
char c[20];
shm_def m_shm={-1,0,0,-1,0};
char recv[30]={0};
int i=0;
7 / 8
SCUT
Dozen
shiqing0477@163.com
break;
//m_shm.func_init_shm=init_shm;
shm_init(&m_shm,SHM_SIZE,1606,2606);
shm_send(&m_shm,1,(void *)"hello,this is main.\0",20);
while(1)
{
}
return 0;
if(shm_recv(&m_shm, (void *)recv, 1) < 0)
{
}
printf("recv:%s.i=%d,len=%d\n",(char *)recv,i,m_shm.currentdatasize);
/*if((i++)==10) {
}*/
//sleep(2);
printf("sleep.....");
sleep(10);
i=0;
//
}
3.3 makefile
gcc $< test_shm_P.c -o P
gcc $< test_shm_V.c -o V
P V:_shm_.o test_shm_P.c test_shm_V.c
_shm_.o:_shm_.c _shm_.h
gcc -c _shm_.c
四. 问题分析
(1) semop 操作时,sem_flg=UNDO,做了啥?
设置了 UNDO,无论进程对 sem 做的 semop 操作是什么,sem_op 的值都会被“积累”到对应的信号量调整值
上,当进程退出时,内核就遍历进程的调整值,以作处理,确保进程退出后,就像没使用过该 sem 一样,如此,可
确保即便进程异常退出,内核也能够释放该进程所锁住的资源。
(2) 为何阻塞读取时,sem_op=0?
sem_op=0,SEM_BLOCKING 信号量初始值为“1”,那么 V 进程进行 semop 操作时,须等待 SEM_BLOCKING 的
semval 变成 0,这样就实现了阻塞等待。
8 / 8