找回密码
 立即注册
首页 编程领域 编程板块 通过共享内存和互斥锁、条件变量实现进程同步

PHP 通过共享内存和互斥锁、条件变量实现进程同步

2023-3-15 10:31:43 评论(0)
本帖最后由 羽西 于 2023-3-15 10:44 编辑

目的:
使用 pthread_mutex_t 和 pthread_cond_t 以及共享内存、内存映射 达到跨进程通信的目的

步骤:
  • 通过 shm_open 、ftrancate、mmap将共享内存进行内存映射
  • 将 pthread_mutex_t 和 pthread_cond_t 保存在共享内存中,并且设置 PTHREAD_SHARED 属性,以达到跨进程使用的目的
  • 消费者在没有通知的情况下超时等待10s后输出共享内存数据
  • 生产者产生数据后,可以选择是否进行 notify


结论:
  • 通过设置 PTHREAD_SHARED 属性, pthread_mutex_t 和 pthread_cond_t 可以跨进程使用
  • 如果生产者不进行notify,消费者将等到超时之后才能被唤醒
  • 由于消费者进行wait的时候,并不拥有锁,因此如果在有一个进程/线程进行wait的时候,另外一个进程是可以获取锁并且进行条件唤醒的。此时,wait 的线程/进程将被唤醒,但是在 notify 到 wait 之间的时间间隔中,其他线程/进程也可以获取这个锁。因此,如果我们将 wait 和 nofity 方在独立进程的循环中,此时可以看到 notify 和 wait 并不是一对一交替执行的
  • (补充)mmap 返回的指针,是可以被有亲缘关系的进程使用的
  • (补充)对相同名字的共享内存文件进行内存映射,跨进程得到的内存地址也是相同的

  1. #include
  2. #include
  3. #include
  4. #include
  5. #include
  6. #include "sys.h"
  7. #include

  8. #include         // for shm_open
  9. #include         /* For mode constants */
  10. #include            /* For O_* constants */
  11. #include          // for pthread_xx
  12. #include  // for assert

  13. struct SHM_MUTEX
  14. {
  15.     pthread_mutex_t _mutex;
  16.     pthread_cond_t _cond;
  17. };


  18. class SHM_SYNC_COND
  19. {
  20. public:
  21.     SHM_SYNC_COND() = default;
  22.     bool init(const char *shm_name,size_t elm_size,size_t eml_count)
  23.     {
  24.         assert(_data == nullptr);
  25.         assert(_shm_mutex == nullptr);
  26.         size_t s = sizeof(SHM_MUTEX) + elm_size * eml_count;// 计算内存大小
  27.         int shm_fd = shm_open(shm_name,O_CREAT | O_RDWR,0666);// 创建/打开共享内存文件
  28.         exit_on_error(shm_fd < 0,"shm_open failed!");


  29.         int ret = ftruncate(shm_fd,s); // 截断共享文件大小
  30.         exit_on_error(ret < 0,"ftruncate failed!");


  31.         void *addr = mmap(NULL,s,PROT_WRITE | PROT_READ,MAP_SHARED,shm_fd,0);// 将共享内存文件进行内存映射
  32.         exit_on_error(addr == (void *)-1,"mmap failed");


  33.         _shm_mutex = (SHM_MUTEX *)addr;// 获取共享内存锁
  34.         _data = (char *)addr + sizeof(SHM_MUTEX);


  35.         pthread_mutexattr_t mutexattr;// 设置 mutex 的 PTHREAD_PROCESS_SHARED 属性
  36.         pthread_mutexattr_setpshared(&mutexattr,PTHREAD_PROCESS_SHARED);
  37.         pthread_mutex_init(&_shm_mutex->_mutex,&mutexattr);
  38.         pthread_mutexattr_destroy(&mutexattr);



  39.         pthread_condattr_t condattr;// 设置 cond 的 PTHREAD_PROCESS_SHARED 属性
  40.         pthread_condattr_setpshared(&condattr,PTHREAD_PROCESS_SHARED);
  41.         pthread_cond_init(&_shm_mutex->_cond,&condattr);
  42.         pthread_condattr_destroy(&condattr);
  43.         return true;
  44.     }


  45.     void notify()// 跨进程进行条件变量通知
  46.     {
  47.         assert(_data != nullptr);
  48.         assert(_shm_mutex != nullptr);
  49.         pthread_mutex_lock(&_shm_mutex->_mutex);
  50.         printf("notify..\r\n");
  51.         pthread_cond_broadcast(&_shm_mutex->_cond);
  52.         pthread_mutex_unlock(&_shm_mutex->_mutex);
  53.     }


  54.     void wait(int32_t wait_ms)// 跨进程进行条件变量等待
  55.     {
  56.         assert(_data != nullptr);
  57.         assert(_shm_mutex != nullptr);



  58.         struct timeval now;
  59.         struct timespec abstime;
  60.         gettimeofday(&now,NULL);
  61.         printf("wait sec:%ld,nsec:%ld,wait_ms:%d\r\n",now.tv_sec,now.tv_usec*1000,wait_ms);
  62.         abstime.tv_nsec = (now.tv_usec + wait_ms) * 1000;
  63.         abstime.tv_sec = now.tv_sec + (abstime.tv_nsec) / 1000000000;
  64.         abstime.tv_nsec = abstime.tv_nsec % 1000000000;


  65.         pthread_mutex_lock(&_shm_mutex->_mutex);
  66.         printf("wait sec:%ld,nsec:%ld\r\n",abstime.tv_sec,abstime.tv_nsec);
  67.         //pthread_cond_wait(&_shm_mutex->_cond,&_shm_mutex->_mutex);
  68.         pthread_cond_timedwait(&_shm_mutex->_cond,&_shm_mutex->_mutex,&abstime);
  69.         pthread_mutex_unlock(&_shm_mutex->_mutex);
  70.     }


  71.     void *data_buf() const
  72.     {
  73.         assert(_data != nullptr);
  74.         return _data;
  75.     }


  76. private:
  77.     SHM_MUTEX * _shm_mutex{nullptr};
  78.     void *_data{nullptr};
  79. };


  80. struct IPC_DATA // 跨进程通信时使用的结构体
  81. {
  82.     pid_t _pid{0};
  83.     char  _msg[256];
  84. };



  85. int main()
  86. {
  87.     const char *shm_name = "shm_cond_name"; // 共享内存文件名
  88.     SHM_SYNC_COND shm_cond;
  89.     size_t elm_size = sizeof(IPC_DATA);
  90.     size_t elm_count = 10;
  91.     shm_cond.init(shm_name, elm_size,elm_count);// 创建共享锁以及内存映射
  92.     IPC_DATA *datas = (IPC_DATA *)shm_cond.data_buf();// 获取开辟的内存映射数据地址


  93.     pid_t pid = fork();// 创建子进程
  94.     if (pid == -1)
  95.     {
  96.         exit_on_error(true,"fork failed");
  97.     }


  98.     if (pid == 0) // 子进程,向 IPC 数据写入数据,然后进行 notify
  99.     {
  100.         usleep(100);
  101.         printf("child to write msg\r\n");
  102.         for (int i = 0; i < elm_count;++i)
  103.         {
  104.             datas[i]._pid = pid;
  105.             snprintf(datas[i]._msg,sizeof(datas[i]._msg),"msg from child,pid:%d,index:%d\r\n",getpid(),i);
  106.         }


  107.         shm_cond.notify();
  108.         printf("child finished!\r\n");
  109.         usleep(10);
  110.         return 0;
  111.     }
  112.     else // 父进程,等待子进程进行条件变量通知,或者等待10s超时,最后输出 IPC_DATA 内容
  113.     {
  114.         int index = 0;
  115.         printf("parent wait for msg\r\n");
  116.         shm_cond.wait(1000*1000*10); // 如果没有生产者进行notify,则需要等待10s超时,此处在10s内就已经被唤醒,说明条件变量跨进程生效
  117.         while (index < elm_count)
  118.         {
  119.             printf("wait for index:%d!msg:%s",index,datas[index++]._msg);
  120.         }


  121.         printf("parent to exit!\r\n");
  122.         usleep(100);
  123.         return 0;
  124.     }
  125.     return 0;
  126. }


  127. // 输出:
  128. // parent wait for msg
  129. // wait sec:1673590541,nsec:138637000,wait_ms:10000000
  130. // wait sec:1673590551,nsec:138637000
  131. // child to write msg
  132. // notify..
  133. // child finished!
  134. // wait for index:1!msg:msg from child,pid:7529,index:0
  135. // wait for index:2!msg:msg from child,pid:7529,index:1
  136. // wait for index:3!msg:msg from child,pid:7529,index:2
  137. // wait for index:4!msg:msg from child,pid:7529,index:3
  138. // wait for index:5!msg:msg from child,pid:7529,index:4
  139. // wait for index:6!msg:msg from child,pid:7529,index:5
  140. // wait for index:7!msg:msg from child,pid:7529,index:6
  141. // wait for index:8!msg:msg from child,pid:7529,index:7
  142. // wait for index:9!msg:msg from child,pid:7529,index:8
  143. // wait for index:10!msg:msg from child,pid:7529,index:9
  144. // parent to exit!
复制代码



本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

使用道具 举报

特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们(3513994353@qq.com)。
您需要登录后才可以回帖 登录 | 立即注册
楼主
羽西

关注0

粉丝0

帖子15

最新动态