页次: 1
为了确保进程数据的安全,cpu在硬件级别就支持不同进程的内存隔离了,采用的手段分别是:LDT和分页;每个进程都有自己的ldt描述符,严格规定了该进程使用的物理内存!同时还有分页机制,不同进程就算是同样的虚拟地址,也会映射到不同的物理地址!这两项措施严格保证了进程之间的物理内存是严格隔离的,互相无法读写对方的物理内存,进程自身的代码和数据得到了完美的保护,是不是很完美了?呵呵,哪有这么好的事!进程使用的物理内存被严格隔离,同时也堵死了另一条路:进程间通信!举个例子:linux shell命令中用竖线作为管道都用过吧?作用就是把前面命令处理好的结果数据让后面的命令继续处理,本质就是在不同进程之间传输数据,专业名称叫IPC,这个是怎么做到的了?
1、IPC的通信方式有多种,但最核心的原理或本质都是一样的:在内存中开辟一块空间,A进程网里写数据,B进程从里面读数据,是不是很容易理解了?在此基础上很多研究人员又抽象出了消息队列、共享内存、管道、信号量、信号、socket等进程间的通信方式,互相之间对比如下:
尽管原理简单,但是在实现的时候有两点是所有IPC方式都要注意的:
多进程之间的互斥和同步:共享内存区域就这个,可能同时会有多个生产者和消费者,之间的互斥和同步一定要保证,否则读写的数据结果大概率是错的!可以用信号量、互斥体、自旋锁等实现!
由于是生产-消费者模型,生产者肯定占据主动的,随时都可以往共享的内存写数据,那么问题来了:消费者怎么知道有新数据来了?-----还记得epoll么?就是用根据fd来建立红黑树得那个,每次只要有事件产生(比如网卡收到数据后中断通知cpu)就根据fd从红黑树中找到epitem加入readlist队列进一步处理,所以3环的app不需要阻塞(用户也可以指定等待超时的时间),只需要轮询就行,异步通信的效率很高!这里的应用场景和socket等待数据是不是类似啊!但是进程间通信的数量和频繁程度肯定不如socket的网络通信,所以没必要耗费那么多空间建立红黑树,直接建立链表后用线性扫描、也就是轮询的方式就好!以管道pipe为例,配合poll的通信demo如下: demo代码并不复杂, 一般人都能看懂!核心就是 res = poll(&pfds,1,timeout); 这行代码设置等待超时!
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/poll.h>
int main()
{
int res;
char *buf = "this is data to trans";
char readbuf[64] = {0};
int fd[2];
int pid;
int timeout = 6000; //超时时间
struct pollfd pfds; //poll结构体
res = pipe(fd);
if(res != 0)
{
printf("create pipe error\n");
return 0;
}
//设置读管道为poll句柄
pfds.fd = fd[0];
//设置poll的触发事件
pfds.events = POLLIN | POLLPRI;
pid = fork();
if(pid == -1)
{
printf("fork fail\n");
}
else if(pid == 0) //child
{
printf("this is child\n");
close(fd[1]);
//规定时间内,监视pdfs这一个通道有无触发事件发生
res = poll(&pfds,1,timeout);
if(res == -1)
{
printf("poll error\n");
}
else if(res == 0)
{
printf("time out\n");
}
//有POLLIN或者POLLPRI事件,也意味着有数据可读
else
{
read(fd[0],readbuf,64);
printf("child rev :%s\n",readbuf);
}
close(fd[0]);
}
else
{
printf("this is parent\n");
usleep(5000000);
close(fd[0]);
write(fd[1],buf,64);
close(fd[1]);
//等待子进程结束
waitpid(pid,NULL,0);
}
return 0;
}
2、poll的使用方式很简单,那是因为操作系统在底层做了大量的工作,封装了很多功能(哪有什么开发静好,都是因为操作系统在底层负重前行.....),整个poll在操作系统层面的函数实现和调用链条如下:
调用链很长,个人觉得最核心的要从do_poll函数开始了:这个函数有个for死循环,里面挨个遍历链表检查是否有事件到来!
/*轮询队列检查fd是否有事件发生*/
static int do_poll(struct poll_list *list, struct poll_wqueues *wait,
struct timespec64 *end_time)
{
poll_table* pt = &wait->pt;
ktime_t expire, *to = NULL;
int timed_out = 0, count = 0;
u64 slack = 0;
unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_end = 0;
/* Optimise the no-wait case */
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
pt->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);
/*这里是个死循环,就是在这里轮询检查是否有时间发生的*/
for (;;) {
struct poll_list *walk;
bool can_busy_loop = false;
/*遍历poll_list链表,挨个检查有无事件发生*/
for (walk = list; walk != NULL; walk = walk->next) {
struct pollfd * pfd, * pfd_end;
pfd = walk->entries;
pfd_end = pfd + walk->len;
for (; pfd != pfd_end; pfd++) {
/*
* Fish for events. If we found one, record it
* and kill poll_table->_qproc, so we don't
* needlessly register any other waiters after
* this. They'll get immediately deregistered
* when we break out and return.
检查fd的事件是否发生
*/
if (do_pollfd(pfd, pt, &can_busy_loop,
busy_flag)) {
count++;//有事件发生就+1
pt->_qproc = NULL;
/* found something, stop busy polling */
busy_flag = 0;
can_busy_loop = false;
}
}
}
/*
* All waiters have already been registered, so don't provide
* a poll_table->_qproc to them on the next loop iteration.
*/
pt->_qproc = NULL;
/*如果count=0,说明没有事件发生,当前进程挂起*/
if (!count) {
count = wait->error;
if (signal_pending(current))
count = -EINTR;
}
/*count!=0说明有事件发生了,需要跳出循环;等待超时也跳出循环*/
if (count || timed_out)
break;
/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_end) {
busy_end = busy_loop_end_time();
continue;
}
/*未超时继续死循环*/
if (!busy_loop_timeout(busy_end))
continue;
}
busy_flag = 0;
/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
/*
1、设置当前进程状态
2、slepp让出cpu,直到时间到期
*/
if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
timed_out = 1;
}
return count;
}
其中,具体检查事件的函数是do_pollfd,核心是执行file结构体的poll回调函数:
/*
* Fish for pollable events on the pollfd->fd file descriptor. We're only
* interested in events matching the pollfd->events mask, and the result
* matching that mask is both recorded in pollfd->revents and returned. The
* pwait poll_table will be used by the fd-provided poll handler for waiting,
* if pwait->_qproc is non-NULL.
1、根据fd的值完善mask
2、执行回调函数
*/
static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait,
bool *can_busy_poll,
unsigned int busy_flag)
{
unsigned int mask;
int fd;
mask = 0;
fd = pollfd->fd;
if (fd >= 0) {
struct fd f = fdget(fd);
mask = POLLNVAL;
if (f.file) {
mask = DEFAULT_POLLMASK;
if (f.file->f_op->poll) {
pwait->_key = pollfd->events|POLLERR|POLLHUP;
pwait->_key |= busy_flag;
/*事件的回调函数*/
mask = f.file->f_op->poll(f.file, pwait);
if (mask & busy_flag)
*can_busy_poll = true;
}
/* Mask out unneeded events. */
mask &= pollfd->events | POLLERR | POLLHUP;
fdput(f);
}
}
pollfd->revents = mask;
return mask;
}
总结:
1、这里有个epoll、poll、select的效率对比:可以看到,随着连接数增加,epoll的耗时几乎不变;但是poll和select的耗时呈指数型增长!
2、正常情况下,服务器在同一时间可能会和几十万、甚至上百万客户端建立连接!当收到客户端数据时,需要epitem,此时用fd建立红黑树就很适合了! 但类似ipc这种场景,毕竟使用的数量、频率肯定比不上socket,所以没必要额外耗费空间建立红黑树,直接遍历链表即可,效率低不到哪去!
参考:
1、https://xxpcb.gitee.io/2019/09/15/%E8%BF%9B%E7%A8%8B%E9%97%B4%E9%80%9A%E4%BF%A1-IPC/ ipc对比
2、https://cyril3.github.io/2018/01/15/helicopter-view-of-interprocess-communication linux进程通信概览
3、https://blog.csdn.net/spiremoon/article/details/106004076 多进程管道通信以及select、poll函数的应用
4、https://blog.csdn.net/Eunice_fan1207/article/details/99641348 Linux内核剖析-----IO复用函数poll内核源码剖析
离线
页次: 1