公告

Gentoo交流群:87709706 欢迎您的加入

#1 2022-10-11 15:47:29

batsom
管理团队
注册时间: 2022-08-03
帖子: 594
个人网站

linux源码解读(二十四):进程间通信IPC方式对比&poll接受数据

为了确保进程数据的安全,cpu在硬件级别就支持不同进程的内存隔离了,采用的手段分别是:LDT和分页;每个进程都有自己的ldt描述符,严格规定了该进程使用的物理内存!同时还有分页机制,不同进程就算是同样的虚拟地址,也会映射到不同的物理地址!这两项措施严格保证了进程之间的物理内存是严格隔离的,互相无法读写对方的物理内存,进程自身的代码和数据得到了完美的保护,是不是很完美了?呵呵,哪有这么好的事!进程使用的物理内存被严格隔离,同时也堵死了另一条路:进程间通信!举个例子:linux shell命令中用竖线作为管道都用过吧?作用就是把前面命令处理好的结果数据让后面的命令继续处理,本质就是在不同进程之间传输数据,专业名称叫IPC,这个是怎么做到的了?

  1、IPC的通信方式有多种,但最核心的原理或本质都是一样的:在内存中开辟一块空间,A进程网里写数据,B进程从里面读数据,是不是很容易理解了?在此基础上很多研究人员又抽象出了消息队列、共享内存、管道、信号量、信号、socket等进程间的通信方式,互相之间对比如下:
FluxBB bbcode 测试

   尽管原理简单,但是在实现的时候有两点是所有IPC方式都要注意的:

    多进程之间的互斥和同步:共享内存区域就这个,可能同时会有多个生产者和消费者,之间的互斥和同步一定要保证,否则读写的数据结果大概率是错的!可以用信号量、互斥体、自旋锁等实现!
    由于是生产-消费者模型,生产者肯定占据主动的,随时都可以往共享的内存写数据,那么问题来了:消费者怎么知道有新数据来了?-----还记得epoll么?就是用根据fd来建立红黑树得那个,每次只要有事件产生(比如网卡收到数据后中断通知cpu)就根据fd从红黑树中找到epitem加入readlist队列进一步处理,所以3环的app不需要阻塞(用户也可以指定等待超时的时间),只需要轮询就行,异步通信的效率很高!这里的应用场景和socket等待数据是不是类似啊!但是进程间通信的数量和频繁程度肯定不如socket的网络通信,所以没必要耗费那么多空间建立红黑树,直接建立链表后用线性扫描、也就是轮询的方式就好!以管道pipe为例,配合poll的通信demo如下: demo代码并不复杂, 一般人都能看懂!核心就是 res = poll(&pfds,1,timeout); 这行代码设置等待超时!

#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/poll.h>

int main()
{
    int     res;
    char    *buf = "this is data to trans";
    char    readbuf[64] = {0};
    int     fd[2];
    int     pid;
    int     timeout = 6000;                 //超时时间
    struct pollfd pfds;                     //poll结构体

    res = pipe(fd);
    if(res != 0)
    {
        printf("create pipe error\n");
        return 0;
    }

    //设置读管道为poll句柄
    pfds.fd     = fd[0];
    //设置poll的触发事件
    pfds.events = POLLIN | POLLPRI;

    pid = fork();
    if(pid == -1)
    {
        printf("fork fail\n");
    }
    else if(pid == 0) //child
    {
        printf("this is child\n");
        close(fd[1]);
        //规定时间内,监视pdfs这一个通道有无触发事件发生
        res = poll(&pfds,1,timeout);
        if(res == -1)
        {
            printf("poll error\n");
        }
        else if(res == 0)
        {
            printf("time out\n");
        }
        //有POLLIN或者POLLPRI事件,也意味着有数据可读
        else
        {
            read(fd[0],readbuf,64);
            printf("child rev :%s\n",readbuf);
        }

        close(fd[0]);
    }
    else
    {
        printf("this is parent\n");
        usleep(5000000);
        close(fd[0]);
        write(fd[1],buf,64);
        close(fd[1]);
        //等待子进程结束
        waitpid(pid,NULL,0);
    }

    return 0;
}

  2、poll的使用方式很简单,那是因为操作系统在底层做了大量的工作,封装了很多功能(哪有什么开发静好,都是因为操作系统在底层负重前行.....),整个poll在操作系统层面的函数实现和调用链条如下:

FluxBB bbcode 测试
       调用链很长,个人觉得最核心的要从do_poll函数开始了:这个函数有个for死循环,里面挨个遍历链表检查是否有事件到来!

/*轮询队列检查fd是否有事件发生*/
static int do_poll(struct poll_list *list, struct poll_wqueues *wait,
           struct timespec64 *end_time)
{
    poll_table* pt = &wait->pt;
    ktime_t expire, *to = NULL;
    int timed_out = 0, count = 0;
    u64 slack = 0;
    unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
    unsigned long busy_end = 0;

    /* Optimise the no-wait case */
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
        pt->_qproc = NULL;
        timed_out = 1;
    }

    if (end_time && !timed_out)
        slack = select_estimate_accuracy(end_time);
    /*这里是个死循环,就是在这里轮询检查是否有时间发生的*/
    for (;;) {
        struct poll_list *walk;
        bool can_busy_loop = false;
        /*遍历poll_list链表,挨个检查有无事件发生*/
        for (walk = list; walk != NULL; walk = walk->next) {
            struct pollfd * pfd, * pfd_end;

            pfd = walk->entries;
            pfd_end = pfd + walk->len;
            for (; pfd != pfd_end; pfd++) {
                /*
                 * Fish for events. If we found one, record it
                 * and kill poll_table->_qproc, so we don't
                 * needlessly register any other waiters after
                 * this. They'll get immediately deregistered
                 * when we break out and return.
                 检查fd的事件是否发生
                 */
                if (do_pollfd(pfd, pt, &can_busy_loop,
                          busy_flag)) {
                    count++;//有事件发生就+1
                    pt->_qproc = NULL;
                    /* found something, stop busy polling */
                    busy_flag = 0;
                    can_busy_loop = false;
                }
            }
        }
        /*
         * All waiters have already been registered, so don't provide
         * a poll_table->_qproc to them on the next loop iteration.
         */
        pt->_qproc = NULL;
        /*如果count=0,说明没有事件发生,当前进程挂起*/
        if (!count) {
            count = wait->error;
            if (signal_pending(current))
                count = -EINTR;
        }
        /*count!=0说明有事件发生了,需要跳出循环;等待超时也跳出循环*/
        if (count || timed_out)
            break;

        /* only if found POLL_BUSY_LOOP sockets && not out of time */
        if (can_busy_loop && !need_resched()) {
            if (!busy_end) {
                busy_end = busy_loop_end_time();
                continue;
            }
            /*未超时继续死循环*/
            if (!busy_loop_timeout(busy_end))
                continue;
        }
        busy_flag = 0;

        /*
         * If this is the first loop and we have a timeout
         * given, then we convert to ktime_t and set the to
         * pointer to the expiry value.
         */
        if (end_time && !to) {
            expire = timespec64_to_ktime(*end_time);
            to = &expire;
        }
        /*
        1、设置当前进程状态
        2、slepp让出cpu,直到时间到期
        */
        if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
            timed_out = 1;
    }
    return count;
}

  其中,具体检查事件的函数是do_pollfd,核心是执行file结构体的poll回调函数:

/*
 * Fish for pollable events on the pollfd->fd file descriptor. We're only
 * interested in events matching the pollfd->events mask, and the result
 * matching that mask is both recorded in pollfd->revents and returned. The
 * pwait poll_table will be used by the fd-provided poll handler for waiting,
 * if pwait->_qproc is non-NULL.
   1、根据fd的值完善mask
   2、执行回调函数
 */
static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait,
                     bool *can_busy_poll,
                     unsigned int busy_flag)
{
    unsigned int mask;
    int fd;

    mask = 0;
    fd = pollfd->fd;
    if (fd >= 0) {
        struct fd f = fdget(fd);
        mask = POLLNVAL;
        if (f.file) {
            mask = DEFAULT_POLLMASK;
            if (f.file->f_op->poll) {
                pwait->_key = pollfd->events|POLLERR|POLLHUP;
                pwait->_key |= busy_flag;
                /*事件的回调函数*/
                mask = f.file->f_op->poll(f.file, pwait);
                if (mask & busy_flag)
                    *can_busy_poll = true;
            }
            /* Mask out unneeded events. */
            mask &= pollfd->events | POLLERR | POLLHUP;
            fdput(f);
        }
    }
    pollfd->revents = mask;

    return mask;
}

总结:

  1、这里有个epoll、poll、select的效率对比:可以看到,随着连接数增加,epoll的耗时几乎不变;但是poll和select的耗时呈指数型增长!
FluxBB bbcode 测试

   2、正常情况下,服务器在同一时间可能会和几十万、甚至上百万客户端建立连接!当收到客户端数据时,需要epitem,此时用fd建立红黑树就很适合了! 但类似ipc这种场景,毕竟使用的数量、频率肯定比不上socket,所以没必要额外耗费空间建立红黑树,直接遍历链表即可,效率低不到哪去!

参考:

1、https://xxpcb.gitee.io/2019/09/15/%E8%BF%9B%E7%A8%8B%E9%97%B4%E9%80%9A%E4%BF%A1-IPC/  ipc对比

2、https://cyril3.github.io/2018/01/15/helicopter-view-of-interprocess-communication linux进程通信概览

3、https://blog.csdn.net/spiremoon/article/details/106004076  多进程管道通信以及select、poll函数的应用

4、https://blog.csdn.net/Eunice_fan1207/article/details/99641348  Linux内核剖析-----IO复用函数poll内核源码剖析

离线

页脚

Powered by FluxBB

本站由XREA提供空间支持