logo资料库

利用semaphore实现shm进程通信.pdf

第1页 / 共8页
第2页 / 共8页
第3页 / 共8页
第4页 / 共8页
第5页 / 共8页
第6页 / 共8页
第7页 / 共8页
第8页 / 共8页
资料共8页,全文预览结束
SCUT Dozen shiqing0477@163.com 利用 semaphore 实现 shm 进程通信 一. 进程通信方式简介 总结起来,进程通信包括: 又称软中断,信号包括 SIGCLD、SIGUP、SIGQUIT 等 说明 方式 信号(signal) 信号量+共享存储区 在内存开辟共享存储块(share memory),由信号量(semaphore)来管理 管道 消息传递 套接字 其中,信号量+共享存储区、消息传递,称作 XSI IPC(InterProcess Communication)。 实即 I/O 操作,在硬盘读写文件 利用消息队列,发送到对应的信箱 Socket,网络通信 备注 最快 二. 信号量+共享存储区的进程间通信 2.1 共享存储问题分析 本文介绍的进程通信方法,是在内存中开辟一块共享内存区,再利用信号量实现访问的互斥、阻塞、初始化判 别。共享存储的原理如图 1 所示: 图 1 共享存储机制的物理、逻辑示意图 假设进程 A、B 需要相互通信,则各自设一个结构体来存储共享存储(以下简称 shm)的描述信息。其中, shm_A.shmkey=shm_A.shmkey,shm_A.shmid=shm_A.shmid,但是,shm_A.shmdata_pointer 与 shm_B.shmdata_pointer 是不想等的,道理很简单:两者分别在不同的进程,它们各自描述自己所在进程的 shm 的虚拟地址,然后通过内核 映射到内存中 shm 的物理地址(SharedAddr)。 A、B(或者更多的进程,本文只介绍两个进程)通信,自然引发出如何互斥、如何同步的问题。A、B 不能同 时访问 SharedAddr,解决互斥问题,即可实现异步通信,即:接收方需要数据时,就访问 SharedAddr;发送方需要 更新数据时,就直接往 SharedAddr 写数据。另外一个需求就是,接收方希望即时获取新的数据,即:发送方一更新 数据,接收方立刻获取到新的数据,这样就要求接收端可以休眠(或阻塞)等待新数据。以上两个问题可以利用信 号量(以下简称 sem)来解决:使用两个信号量,一个用于互斥、一个用于唤醒通知。 引进信号量以后,又有一个新的问题:多个进程使用同一个 key 的 sem,而一个 sem 的创建其实只需要一次初 始化,引用 sem 时如何知道这个 sem 是否已经经过初始化了呢?我们知道,sem 是信号量集,还可以再加一个信号 1 / 8
SCUT Dozen shiqing0477@163.com 量,利用此信号量标志 sem 已经初始化了。 综上所述,基于 shm 通信,可利用 3 个 semaphore,以下简称为:SEM_MUTEXT、SEM_BLOCKING、SEM_INITIAL。 在内存开辟共享内存块,将该内存块地址映射各进程中,进程通过某种机制访问映射地址,即可读、写共享内存块。 2.2 PV 操作过程 有了 SEM_MUTEXT、SEM_BLOCKING、SEM_INITIAL,进程进行 PV 操作可通过以下步骤实现(假设 shm 的地址 是 shmdata_pointer): (1)对于 P 操作: Step1:对 SEM_MUTEXT 做 semop 操作,动作为“-1”,以锁住资源,此时其他进程必须等待资源释放才能访问; Step2:将要写进的数据的长度 datasize 写到地址 shmdata_pointer; Step3:将有效数据写到 shmdata_pointer+sizof(unsigned short); Step4:对 SEM_MUTEXT 做 semop 操作,动作为“+1”,以释放资源,此时其他进程可以访问资源; Step5:对 SEM_BLOCKING 注意 semop 操作,动作为“-1”;之后立刻再做 semop 操作,动作为“+1”。这个步骤只 对 V 操作阻塞等待模式才有效。 (2)对于 V 操作: Step1:判断是否阻塞模式,是则对 SEM_BLOCKING 做 semop 操作,动作为“0”,如此等待 P 操作中对 SEM_BLOCKING 的“-1”动作,当“-1”动作发生,V 进程立刻被唤醒,程序立刻执行下去;而 P 进程立刻“+1”动作,相 当于“开门”放行一个人之后立刻“关门”。(注意:此处科学性还不够,只能由一个 V 进程等待,不能确 保多个 V 进程都能读到刚更新的数据,不过该思想是能够拓展到多个 V 进程的模式的。) Step2:对 SEM_MUTEXT 做 semop 操作,动作为“-1”,以锁住资源,此时其他进程必须等待资源释放才能访问; Step3:读取地址 shmdata_pointer 为首地址,长度为 sizeof(unsigned short)的内容,赋给 datasize; Step4:读取地址 shmdata_pointer+sizeof(unsigned short)为首地址,长度为 datasize 的内容,即有效数据; Step5:对 SEM_MUTEXT 做 semop 操作,动作为“+1”,以释放资源,此时其他进程可以访问资源。 对于信号量初始化的过程,难点在于判断信号量是否已经被初始化过,其过程如下: Step1:获取 sem 标识符:semid = semget(key, 3, IPC_CREAT)。 Step2:判断 SEM_INITIAL 是否已经被标志:semctl(semid, SEM_INITIAL, GETVAL)。其返回值若等于 SEM_INITIAL_VAL, 则说明已经初始化过,否则进行下一步。 Step3:sem 初始化,向 SEM_MUTEXT、SEM_BLOCKING、SEM_INITIAL 分别 SETVAL 为:1,1,SEM_INITIAL_VAL。 三. 代码实现 3.1 _shm_.h 和_shm_.c (1)_shm_.h: /* * FileName : _shm_.h * Usage : * Date : Mar 7, 2013 * Version : V1.0 * Author : lidesheng */ #ifndef SHM__H_ #define SHM__H_ #include #include #include 2 / 8
SCUT Dozen shiqing0477@163.com #include #include #include #include #include #define SHM_SIZE (1024*1024) #define SHM_MODE (IPC_CREAT|0666) #define SEM_MODE (IPC_CREAT|0666) #define SEM_N_DEFAULT 3 #define SEM_MUTEX 0 #define SEM_BLOCK 1 #define SEM_INITIAL 2 #define SEM_VAL_INIT 5 #define SEM_LOCK -1 #define SEM_RELEASE 1 #define SEM_WAIT 0 #define IPC_DELETED (0-EIDRM) #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED) /* union semun is defined by including */ #else /* according to X/OPEN we have to define it ourselves */ union semun{ }; #endif //store the public variables for the functions struct _shm_{ int shmid; void *shmdata; size_t currentdatasize; int semid; int nsems; //counts of semaphore } ; typedef struct _shm_ shm_def; int shm_init(shm_def *shm,size_t shmsize,key_t shmkey,key_t semkey); int shm_recv(shm_def *shm, void *dest,const int blocking); int shm_send(shm_def *shm, const void *data, const size_t datasize); int shm_delete(const int shmid, const int semid); int shm_sem_operation(const int semid,const int semnum,const int semopflag); int shm_sem_operation_f(const int semid,const int semnum,const int semopflag,const int semflg); int shm_sem_delete(const int semid); #endif /* SHM__H_ */ int val; //for SETVAL struct semid_ds *buf; //for IPC_STAT/IPC_SET unsigned short *array;//for GETALL/SETALL 3 / 8
SCUT (2)_shm_.c Dozen shiqing0477@163.com perror("create shared memory failed"); exit(-1); /* * FileName : _shm_.c * Usage : * Date : Mar 7, 2013 * Version : V1.0 * Author : lidesheng */ #include "_shm_.h" /* * function :init_shm(shm_def *shm,size_t shmsize,key_t shmkey,key_t semkey) * usage :initial the shm,use two semaphores,sem0 for lock-release,sem1 for blocking,sem2 for initial * return :int; success-0,erro-<0 * parameter:pointer to a shm_def type */ int shm_init(shm_def *shm,size_t shmsize,key_t shmkey,key_t semkey) { int nsems=shm->nsems+SEM_N_DEFAULT; union semun semsu; //get the memory,if not existed,then create it. if((shm->shmid = shmget(shmkey,shmsize,SHM_MODE)) < 0) { } //get the address of the shared memory. if((shm->shmdata = shmat(shm->shmid,0,0)) == (void *)-1) { } printf("shmid=%d,shmdata=%lx\n",shm->shmid,(unsigned long)shm->shmdata); //get the id of the semaphore if((shm->semid = semget(semkey,nsems,SEM_MODE)) < 0) { } //check if the semaphore has been initialed,if not then initial it. if((semctl(shm->semid,SEM_INITIAL,GETVAL))!=SEM_VAL_INIT) { } printf("muxtex_semval=%d,mutext_ncnt=%d;lock_semval=%d,lock_ncnt=%d.\n", //initial the semaphore printf("Initial the semaphore now.\n"); semsu.array=malloc(sizeof(unsigned short)*2); semsu.array[SEM_MUTEX]=1; semsu.array[SEM_BLOCK]=1; semsu.array[SEM_INITIAL]=SEM_VAL_INIT; if(semctl(shm->semid,0,SETALL,semsu) < 0) { } free(semsu.array); perror("semaphore initial err"); exit(-4); perror("get semaphore err"); exit(-3); perror("shmat error"); exit(-2); semctl(shm->semid,SEM_MUTEX,GETVAL), 4 / 8
SCUT Dozen shiqing0477@163.com return 0; perror("sem_operation failed"); return -1; semctl(shm->semid,SEM_MUTEX,GETNCNT), semctl(shm->semid,SEM_BLOCK,GETVAL), semctl(shm->semid,SEM_BLOCK,GETNCNT)); struct sembuf sb; sb.sem_num = semnum; //semaphore index sb.sem_op = semopflag; //semaphore operation sb.sem_flg = SEM_UNDO; //set undo,so that the case of exception exit... if(semop(semid,&sb,1) < 0) { } return 0; } /* * function :shm_sem_operation(const int semid,const int semnum,const int semop) * usage :operation the semaphore * return :int; success-0,erro-<0 * parameter:semid,semnum,semop:-1,0,1 */ int shm_sem_operation(const int semid,const int semnum,const int semopflag) { } int shm_sem_operation_f(const int semid,const int semnum,const int semopflag,const int semflg) { } int shm_sem_delete(const int semid) { } int shm_recv(shm_def *shm, void *dest,const int blocking) { unsigned long *psize=malloc(sizeof(unsigned short)); if(blocking==1) { struct sembuf sb; sb.sem_num = semnum; //semaphore index sb.sem_op = semopflag; //semaphore operation sb.sem_flg = semflg; //set undo,so that the case of exception exit... if(semop(semid,&sb,1) < 0) { } return 0; perror("sem_operation failed"); return -1; perror("delete semaphore err"); return -1; if(semctl(semid,0,IPC_RMID) < 0){ } return 0; if(shm_sem_operation(shm->semid,SEM_BLOCK,SEM_WAIT) <0) { 5 / 8
SCUT Dozen shiqing0477@163.com perror("operation err,lock failed"); return -1; perror("operation err,wait failed"); return -1; } perror("operation err,release failed"); return -2; } if(shm_sem_operation(shm->semid,SEM_MUTEX,SEM_LOCK) <0) { } memcpy(psize,shm->shmdata,sizeof(unsigned short)); memcpy(dest,shm->shmdata+sizeof(unsigned short),*psize); if(shm_sem_operation(shm->semid,SEM_MUTEX,SEM_RELEASE) <0) { } shm->currentdatasize=*psize; free(psize); return shm->currentdatasize; } int shm_send(shm_def *shm, const void *data, const size_t datasize) { } int shm_delete(const int shmid,const int semid) { } if(semctl(semid,0,IPC_RMID) < 0){ } if(shmctl(shmid,IPC_RMID,0) < 0){ } return 0; perror("operation err,lock failed"); return -1; shm->currentdatasize=datasize; if(shm_sem_operation(shm->semid,SEM_MUTEX,SEM_LOCK) <0) { } memcpy(shm->shmdata,&datasize,sizeof(unsigned short)); memcpy(shm->shmdata+sizeof(unsigned short),data,datasize); if(shm_sem_operation(shm->semid,SEM_MUTEX,SEM_RELEASE) <0) { } shm_sem_operation(shm->semid,SEM_BLOCK,SEM_LOCK); shm_sem_operation(shm->semid,SEM_BLOCK,SEM_RELEASE); return shm->currentdatasize; perror("operation err,release failed"); return -2; perror("delete semaphore err"); perror("delete shm err"); return -1; 3.2 PV (1)test_shm_P.c /* * FileName : test_shm_P.c 6 / 8
SCUT Dozen shiqing0477@163.com int a; int b; char c[20]; * Usage : * Date : Mar 7, 2013 * Version : V1.0 * Author : lidesheng */ #include #include "_shm_.h" struct data{ }mdat; int main(void) { } (2)test_shm_V.c shm_def m_shm={-1,0,0,-1,0}; char sbuff[30]; //char recv[30]={0}; int i=0; i=shm_init(&m_shm,SHM_SIZE,1606,2606); while(i++<20) { } shm_delete(m_shm.shmid,m_shm.semid); return 0; sprintf(sbuff,"Hello,V%d.",i); printf("send:%d\n",i); shm_send(&m_shm,(void *)sbuff,(i<10)?9:10); sleep(2); /* * FileName : test_shm_P.c * Usage : * Date : Mar 7, 2013 * Version : V1.0 * Author : lidesheng */ #include #include "_shm_.h" struct data{ }mdat; int main(void) { int a; int b; char c[20]; shm_def m_shm={-1,0,0,-1,0}; char recv[30]={0}; int i=0; 7 / 8
SCUT Dozen shiqing0477@163.com break; //m_shm.func_init_shm=init_shm; shm_init(&m_shm,SHM_SIZE,1606,2606); shm_send(&m_shm,1,(void *)"hello,this is main.\0",20); while(1) { } return 0; if(shm_recv(&m_shm, (void *)recv, 1) < 0) { } printf("recv:%s.i=%d,len=%d\n",(char *)recv,i,m_shm.currentdatasize); /*if((i++)==10) { }*/ //sleep(2); printf("sleep....."); sleep(10); i=0; // } 3.3 makefile gcc $< test_shm_P.c -o P gcc $< test_shm_V.c -o V P V:_shm_.o test_shm_P.c test_shm_V.c _shm_.o:_shm_.c _shm_.h gcc -c _shm_.c 四. 问题分析 (1) semop 操作时,sem_flg=UNDO,做了啥? 设置了 UNDO,无论进程对 sem 做的 semop 操作是什么,sem_op 的值都会被“积累”到对应的信号量调整值 上,当进程退出时,内核就遍历进程的调整值,以作处理,确保进程退出后,就像没使用过该 sem 一样,如此,可 确保即便进程异常退出,内核也能够释放该进程所锁住的资源。 (2) 为何阻塞读取时,sem_op=0? sem_op=0,SEM_BLOCKING 信号量初始值为“1”,那么 V 进程进行 semop 操作时,须等待 SEM_BLOCKING 的 semval 变成 0,这样就实现了阻塞等待。 8 / 8
分享到:
收藏