84-使用非阻塞 I/O 改写回射客户端

这个程序,应该是相当复杂的。读完它需要一些耐心,不过我会力求突显程序的结构,删除无关的代码。

1. 回顾旧程序

旧版本程序的结构如下:

while(1) {   rfds = {stdin, sockfd};   select(rfds);   if (stdin in rfds) {     read(stdin);     // 风险代码,可能产生阻塞     writen(sockfd);   }    if (sockfd in rfds) {     read(sockfd);     writen(stdout);   } }

之前分析过,它的弱点在于 writen(sockfd) 会导致阻塞。为了解决这个问题,就不能使用 writen 函数,而应该改为 write,但是如果改成了 write,我们就不能保证 write 能一次将数据全部写入发送缓冲区。这导致的另一个麻烦就是我们不得不设置应用层缓冲区(不使用多进程多线程)。

那么新的写法看起来像是这样:

char to[4096]; int start = 0; int end = 0;  while(1) {   // ...   // 往发送缓冲区添加数据   nr = read(stdin, &to[end], 4096 - end);    end += nr;   // 将发送缓冲区的数据发送出去   nw = write(sockfd, &to[start], end - start);   start += nw;   // ...   // to 中没有发送完的,就等着下一次再发送吧。  }

下面我们正式一点说。

2. 程序设计

2.1 思路


这里写图片描述
图1 数据流动过程

观察图 1,客户端设置了两个缓冲区:

  • to:存放从标准输入读取到的数据
  • from:保存从服务器发来的数据

to 中的灰色部分,表示已经发给服务了,而绿色部分,表示尚未发送的。 from 的灰色部分,表示已经写到标准输出了,绿色部分,表示还尚未写入到标准输出的。空白部分表示空闲。

使用指针 tostart, toend, fromstart, fromend, 就可以得到这三个区域的任何一个位置。

2.2 程序伪代码

  • 精简版本
while(1) {   rfds = {stdin, sockfd};   wfds = {stdout, sockfd};    select(&rfds, &wfds);   // 1. 读标准输入到缓冲区 to   if (stdin in rfds) {     read(stdin, toend);     wfds.insert(sockfd);   }    // 2. 读套接字到缓冲区 from   if (sockfd in rfds) {     read(sockfd, fromend);     wfds.insert(stdout);   }    // 3. 写数据到标准输出   if (stdout in wfds) {     write(stdout, fromstart);   }    // 4. 写数据到套接字   if (sockfd in wfds) {     write(sockfd, tostart);   } }

当你在阅读详细版本的时候,请参考精简版本的 4 个步骤来搞清逻辑。

  • 详细版本
char *to = malloc(length); char *from = malloc(length); tostart = toend = to; fromstart = fromend = from;  fd_set rfds, wfds; int stdinclosed = 0; // 标准输入是否关闭 int servclosed = 0; // 服务器是否关闭  // 重要!设置描述符为非阻塞 IO setNonblock(sockfd); setNonblock(stdin); setNonblock(stdout);  while(1) {   // 这一部分,表示将能够监听的 I/O 加入到对应的监听集合中去。   // 如果缓冲区没有空闲,就没有监听的意义了。   rfds.clear();   // 查看收发缓冲区有没有空闲,有空闲就监听。   if (stdinclosed == 0 && toend < to + length) rfds.insert(stdin);   if (servclosed == 0 && fromend < from + length) rfds.insert(sockfd);    // 查看收发缓冲区有没有数据,有数据就监听   if (tostart < toend) wfds.insert(sockfd);   if (fromstart < fromend) wfds.insert(stdout);    // 有事件再往下执行   select(&rfds, &wfds);    // 1. 标准输入有数据则读入发送缓冲区   if (stdin in rfds) {      n = to + length - toend; // 白色部分空闲区大小      nr = read(stdin, toend, n);      if (nr < 0) {        // 尽管 select 通知有数据可读,但是我们还是得预防 EWOUDLBLOCK 发生的可能性。        // 如果使用阻塞 IO,结果就是程序在 read 上阻塞,这是不应该发生的情况。        if (errno != EWOULDBLOCK) exit(1);      }      else if (nr == 0) {         stdinclosed = 1; // 标准输入关闭,标志位置位         if (tostart == toend) {           // 发送缓冲区没有数据要发送了(没有绿色部分),半关闭。           shutdown(sockfd, SHUT_WR);         }      }      else {        toend += nr; // 扩大绿色部分大小        wfds.insert(sockfd); // 提前通知有写事件,你完全可以不写这一行      }   }    // 2. 套接字有数据可读则读入 from 缓冲区   if (sockfd in rfds) {     n = from + length - fromend;     nr = read(sockfd, fromend, n);  // 将数据读取到白色区域     if (nr < 0) {        if (errno != EWOULDBLOCK) exit(1);     }     else if (nr == 0) {       servclosed = 1; // 服务器关闭标志位置位,此时不能直接退出,因为接收缓冲区可能还有绿色部分。       if (fromstart == fromend) {         // 接收缓冲区空闲,可以安全退出。如果标准输出速度非常慢,这个 if 很可能执行不到。         LOG("1:finished\n");         break;       }     }     else {       fromend += nr; // 绿色部分变长       wfds.insert(stdout);     }   }    // 3. 处理接收缓冲区(from 有绿色部分,将其写入标准输出)   if (stdout in wfds && fromend - fromstart > 0) {     n = fromend - fromstart;     nw = write(stdout, fromstart, n);     if (nw < 0) {       if (errno != EWOULDBLOCK) exit(1); // 不是 EWOULDBLOCK 则出错     }     else {       fromstart += nw; // 灰色部分变长,绿色减少       if (fromstart == fromend) {         fromstart = fromend = from; // 重置         // 全部处理完成         if (servclosed) { // 如果服务器已经关闭,说明数据全部处理完毕           LOG("2:finished\n");           break;         }       }     }   }     // 4. 发送缓冲区有数据可发送(to 中有绿色部分,发送到服务器)   if (sockfd in wfds && toend - tostart > 0) {     n = toend - tostart;     nw = write(sockfd, tostart, n);     if (nw < 0) {       if (errno != EWOULDBLOCK) exit(1); // 不是 EWOULDBLOCK 则出错     }     else {       tostart += nw; // 灰色变长,绿色变短       if (tostart == toend) { // 全部处理完毕         tostart = toend = to; // 重置         if (stdinclosed) {           // 只有标准输入已经关闭的情况下才能关闭           shutdonw(sockfd, SHUT_WR);         }       }     }   } }

2.3 项目代码

这一段程序确实很长,需要考虑的东西太多。更加详细的代码请参考 gitos 托管的代码。

git clone https://git.oschina.net/ivan_allen/unp.git

如果你已经 clone 过这个代码了,请使用 git pull 更新一下。本节程序所使用的程序路径是 unp/program/nonblockio/nbio.

3. 实验

本次实验仍然分成两个部分,即缓冲区大小分别设置为 4096 和 1024000.

3.1 4096 字节

$ ./run_client.sh 4096 -v


这里写图片描述
图1 缓冲区大小为 4096 字节

3.2 1024000 字节

$ ./run_client.sh 1024000 -v


这里写图片描述
图2 缓冲区大小为 1024000 字节

3.3 结果分析

很幸运的是,大缓冲区下,客户端也没有阻塞。从图 1 和图 2 中的结果看,客户端与服务器的数据传输和处理的瓶颈在于步骤 2 和步骤 3.

注意到步骤 1 和步骤 4(将数据全部发到服务器)早已完成,而步骤 2 和步骤 3 的速度却很滞后。原因在于服务器端的 read 函数缓冲区太小,只有 4096 字节。可以通过适当的增大服务器端缓冲大小,图 3 显示的结果是将服务器 read 函数缓冲大小更改为 65536 后的结果。


这里写图片描述
图3 服务器缓冲大小更改为 65536

4. 标准输出比网络 IO 慢

注意观察图 1、2、3 中,客户端最后一行是 "1:finished!" 结束,如果标准输出的速度比网络 IO 还要慢,则下面这个步骤 2 中的这个 if 是不成立的:

if (服务器关闭) {   // ...   servclosed = 1;   if (fromstart == fromend) {     // 接收缓冲区空闲,可以安全退出。如果标准输出速度非常慢,这个 if 很可能执行不到。     LOG("1:finished\n");     break;   } }

此时应该执行到步骤 3 中的 if:

 if (fromstart == fromend) {   fromstart = fromend = from; // 重置   // 全部处理完成   if (servclosed) { // 如果服务器已经关闭,说明数据全部处理完毕     LOG("2:finished\n");     break;   } } 

通过在客户端开启 --slow 选项,可以让标准输出速度慢下来。


这里写图片描述
图4 标准输出 IO 慢于网络 IO,最后一行输出 "2:finished!"

5. 总结

  • 掌握非阻塞 I/O + 缓冲区的客户端实现方法

思考:有些同学并没有将 stdin, stdout, sockfd 设置成非阻塞 IO,程序也能正常工作,这样做可以吗?如果有问题,问题在哪里?

最后,要提的是非阻塞 I/O 处理起来确实很麻烦,有时候代码的复杂程度可能会让你得不偿失。使用多线程 + 阻塞 I/O 其实是更为推荐的方法,而且程序的效率也不会比非阻塞 I/O 差多少(unp 一书中对不同的设计进行了测试对比)。

说明:本文转自blog.csdn.net,用于学习交流分享,仅代表原文作者观点。如有疑问,请联系我们删除~