本帖最后由 羽西 于 2023-3-15 10:44 编辑
目的:
使用 pthread_mutex_t 和 pthread_cond_t 以及共享内存、内存映射 达到跨进程通信的目的
步骤: - 通过 shm_open 、ftrancate、mmap将共享内存进行内存映射
- 将 pthread_mutex_t 和 pthread_cond_t 保存在共享内存中,并且设置 PTHREAD_SHARED 属性,以达到跨进程使用的目的
- 消费者在没有通知的情况下超时等待10s后输出共享内存数据
- 生产者产生数据后,可以选择是否进行 notify
结论: - 通过设置 PTHREAD_SHARED 属性, pthread_mutex_t 和 pthread_cond_t 可以跨进程使用
- 如果生产者不进行notify,消费者将等到超时之后才能被唤醒
- 由于消费者进行wait的时候,并不拥有锁,因此如果在有一个进程/线程进行wait的时候,另外一个进程是可以获取锁并且进行条件唤醒的。此时,wait 的线程/进程将被唤醒,但是在 notify 到 wait 之间的时间间隔中,其他线程/进程也可以获取这个锁。因此,如果我们将 wait 和 nofity 方在独立进程的循环中,此时可以看到 notify 和 wait 并不是一对一交替执行的
- (补充)mmap 返回的指针,是可以被有亲缘关系的进程使用的
- (补充)对相同名字的共享内存文件进行内存映射,跨进程得到的内存地址也是相同的
- #include
- #include
- #include
- #include
- #include
- #include "sys.h"
- #include
- #include // for shm_open
- #include /* For mode constants */
- #include /* For O_* constants */
- #include // for pthread_xx
- #include // for assert
- struct SHM_MUTEX
- {
- pthread_mutex_t _mutex;
- pthread_cond_t _cond;
- };
- class SHM_SYNC_COND
- {
- public:
- SHM_SYNC_COND() = default;
- bool init(const char *shm_name,size_t elm_size,size_t eml_count)
- {
- assert(_data == nullptr);
- assert(_shm_mutex == nullptr);
- size_t s = sizeof(SHM_MUTEX) + elm_size * eml_count;// 计算内存大小
- int shm_fd = shm_open(shm_name,O_CREAT | O_RDWR,0666);// 创建/打开共享内存文件
- exit_on_error(shm_fd < 0,"shm_open failed!");
- int ret = ftruncate(shm_fd,s); // 截断共享文件大小
- exit_on_error(ret < 0,"ftruncate failed!");
- void *addr = mmap(NULL,s,PROT_WRITE | PROT_READ,MAP_SHARED,shm_fd,0);// 将共享内存文件进行内存映射
- exit_on_error(addr == (void *)-1,"mmap failed");
- _shm_mutex = (SHM_MUTEX *)addr;// 获取共享内存锁
- _data = (char *)addr + sizeof(SHM_MUTEX);
- pthread_mutexattr_t mutexattr;// 设置 mutex 的 PTHREAD_PROCESS_SHARED 属性
- pthread_mutexattr_setpshared(&mutexattr,PTHREAD_PROCESS_SHARED);
- pthread_mutex_init(&_shm_mutex->_mutex,&mutexattr);
- pthread_mutexattr_destroy(&mutexattr);
- pthread_condattr_t condattr;// 设置 cond 的 PTHREAD_PROCESS_SHARED 属性
- pthread_condattr_setpshared(&condattr,PTHREAD_PROCESS_SHARED);
- pthread_cond_init(&_shm_mutex->_cond,&condattr);
- pthread_condattr_destroy(&condattr);
- return true;
- }
- void notify()// 跨进程进行条件变量通知
- {
- assert(_data != nullptr);
- assert(_shm_mutex != nullptr);
- pthread_mutex_lock(&_shm_mutex->_mutex);
- printf("notify..\r\n");
- pthread_cond_broadcast(&_shm_mutex->_cond);
- pthread_mutex_unlock(&_shm_mutex->_mutex);
- }
- void wait(int32_t wait_ms)// 跨进程进行条件变量等待
- {
- assert(_data != nullptr);
- assert(_shm_mutex != nullptr);
- struct timeval now;
- struct timespec abstime;
- gettimeofday(&now,NULL);
- printf("wait sec:%ld,nsec:%ld,wait_ms:%d\r\n",now.tv_sec,now.tv_usec*1000,wait_ms);
- abstime.tv_nsec = (now.tv_usec + wait_ms) * 1000;
- abstime.tv_sec = now.tv_sec + (abstime.tv_nsec) / 1000000000;
- abstime.tv_nsec = abstime.tv_nsec % 1000000000;
- pthread_mutex_lock(&_shm_mutex->_mutex);
- printf("wait sec:%ld,nsec:%ld\r\n",abstime.tv_sec,abstime.tv_nsec);
- //pthread_cond_wait(&_shm_mutex->_cond,&_shm_mutex->_mutex);
- pthread_cond_timedwait(&_shm_mutex->_cond,&_shm_mutex->_mutex,&abstime);
- pthread_mutex_unlock(&_shm_mutex->_mutex);
- }
- void *data_buf() const
- {
- assert(_data != nullptr);
- return _data;
- }
- private:
- SHM_MUTEX * _shm_mutex{nullptr};
- void *_data{nullptr};
- };
- struct IPC_DATA // 跨进程通信时使用的结构体
- {
- pid_t _pid{0};
- char _msg[256];
- };
- int main()
- {
- const char *shm_name = "shm_cond_name"; // 共享内存文件名
- SHM_SYNC_COND shm_cond;
- size_t elm_size = sizeof(IPC_DATA);
- size_t elm_count = 10;
- shm_cond.init(shm_name, elm_size,elm_count);// 创建共享锁以及内存映射
- IPC_DATA *datas = (IPC_DATA *)shm_cond.data_buf();// 获取开辟的内存映射数据地址
- pid_t pid = fork();// 创建子进程
- if (pid == -1)
- {
- exit_on_error(true,"fork failed");
- }
- if (pid == 0) // 子进程,向 IPC 数据写入数据,然后进行 notify
- {
- usleep(100);
- printf("child to write msg\r\n");
- for (int i = 0; i < elm_count;++i)
- {
- datas[i]._pid = pid;
- snprintf(datas[i]._msg,sizeof(datas[i]._msg),"msg from child,pid:%d,index:%d\r\n",getpid(),i);
- }
- shm_cond.notify();
- printf("child finished!\r\n");
- usleep(10);
- return 0;
- }
- else // 父进程,等待子进程进行条件变量通知,或者等待10s超时,最后输出 IPC_DATA 内容
- {
- int index = 0;
- printf("parent wait for msg\r\n");
- shm_cond.wait(1000*1000*10); // 如果没有生产者进行notify,则需要等待10s超时,此处在10s内就已经被唤醒,说明条件变量跨进程生效
- while (index < elm_count)
- {
- printf("wait for index:%d!msg:%s",index,datas[index++]._msg);
- }
- printf("parent to exit!\r\n");
- usleep(100);
- return 0;
- }
- return 0;
- }
- // 输出:
- // parent wait for msg
- // wait sec:1673590541,nsec:138637000,wait_ms:10000000
- // wait sec:1673590551,nsec:138637000
- // child to write msg
- // notify..
- // child finished!
- // wait for index:1!msg:msg from child,pid:7529,index:0
- // wait for index:2!msg:msg from child,pid:7529,index:1
- // wait for index:3!msg:msg from child,pid:7529,index:2
- // wait for index:4!msg:msg from child,pid:7529,index:3
- // wait for index:5!msg:msg from child,pid:7529,index:4
- // wait for index:6!msg:msg from child,pid:7529,index:5
- // wait for index:7!msg:msg from child,pid:7529,index:6
- // wait for index:8!msg:msg from child,pid:7529,index:7
- // wait for index:9!msg:msg from child,pid:7529,index:8
- // wait for index:10!msg:msg from child,pid:7529,index:9
- // parent to exit!
复制代码
|