跳转至

五种IO模型与selectpoll分别实现多路转接

约 8593 个字 603 行代码 5 张图片 预计阅读时间 36 分钟

何为IO

不论是在前面文件部分,还是后面的网络部分,IO都是非常常见的。但是当时只是简单对IO进行提及,并没有对IO的本质进行介绍。那么到底何为IO?IO全称为输入和输出,而任何一个IO过程都需要涉及到两个过程:

  1. 等待
  2. 拷贝

所以,此处就可以得出一个关于IO的简单结论:IO实际上是由等待+拷贝两个行为构成的

通过前面对readwrite等接口的学习可以知道一个非常直观的现象:如果写方并没有写入内容,那么读方会进入阻塞;反过来,如果当前写方的缓冲区已满,那么写方也会阻塞。这里的两个阻塞本质上就是等待。对于这种存在一种等待条件的称为阻塞式IO,而从等待到不等待或者从不等待到等待本质都属于一种条件的变化,在IO部分,称这一现象为IO事件,而一旦条件发生改变(例如从等待变为不等待),那么就称为IO事件就绪

何为高效IO

从前面学习到的IO可以看到,IO实际上大部分的时间都是在等待而并非拷贝。例如在Linux进程间通信一节可以看到只有写方写入,读方才可以开始读,而读只是一瞬间的事情,又比如在UDP编程接口基本使用一节可以看到只有客户端写入,服务端才会读取到信息,而其中的读取也是一瞬间的事情,这两个例子最明显的共同点就是读取都必须等待,而拷贝消耗的时间相比于等待消耗的时间非常少。所以,所谓的高效IO实际上解决的主要矛盾就是减少IO中等待的时间以提高整机的资源利用率

既然如此,那么有没有可以让等待的时间尽可能减少从而减少IO整体的时间消耗的方式呢?答案是有,具体是什么方式在接下来会介绍

五种IO模型与对比

在介绍高效的IO方式之前,先了解常见的五种IO模型:

  1. 阻塞式IO:在内核将数据准备好之前,系统调用会一直等待。所有的套接字,默认都是阻塞方式。这是比较常见的IO方式。以read接口为例,示意图如下:

  2. 非阻塞式IO:如果内核已经将数据准备好,直接返回数据;如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码或者EAGAIN错误码。但是非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询,这个过程对于CPU来说是较大的浪费,一般只有特定场景下才使用。示意图如下:

  3. 信号驱动式IO:内核将数据准备好的时候,使用SIGIO信号通知应用程序进行IO操作

  4. 多路转接(也称为多路复用):进程同时管理多个文件描述符,只要有一个文件描述符有数据就立即通知给进程进行数据读取。以select为例,示意图如下:

  5. 异步IO:当前一方向另外一放发起IO处理,当前一方不再参与IO行为,当数据准备完毕时,由通知到的一方处理完成(等待+拷贝),再交给发起IO的一方,示意图如下:

下面依次探讨上面的五种IO模型:

首先考虑阻塞式IO和非阻塞式IO,阻塞式IO在进程读取到数据之前会一直阻塞而不会去做其他的事情,非阻塞式IO不会阻塞在IO的过程之中,这个阶段会在做其他事情与检测是否有数据中轮询执行,二者最本质的区别就是一个在卡主等待,另外一个并不会卡主。从这个角度来看,非阻塞式IO的效率的确会比较高,因为进程可以去完成其他的事情,但是实际上,阻塞式IO和非阻塞式IO从当前IO单一过程来看,并没有实际上的效率区别,本质都是需要等待。换句话说,二者的等待时间占比是基本一致的。所以,从IO层面来看,非阻塞式IO并没有比阻塞式IO有很大的IO效率提升,所以理解所谓的「非阻塞式IO效率高」这句话并不是单纯看IO效率,而是看整机效率

接着看第三种方式:信号驱动式IO。这种方式可以理解为是非阻塞式IO的一种升级,非阻塞式IO需要一直轮询检测数据是否就绪,也就是说是否有数据需要自己判断,而信号驱动式IO数据是否就绪由操作系统判断,一旦操作系统发现数据就绪就通过信号通知进程,所以本质信号驱动式IO是逆转了判断数据就绪的方式,从而减少IO过程中等待的占比时间,因为等待的一方是操作系统而不是当前进程,所有从当前进程来看IO过程中等待占比时间减少

接着看第四种方式:多路转接。这种方式最大的特点就是一个进程可以持有多个文件描述符,因为在Linux下,一切皆文件,所以拥有多个文件描述符也就意味着可以读取到多个文件描述符中的内容,如果此时当前进程告知发送方只要有数据就向这些文件描述符中的任意一个写入数据,那么整体来看当前进程的等待时间就会减少,因为只要有一个文件描述符有数据,当前进程就会进入数据拷贝,所以多路转接方式中进程更多的时间在做拷贝而不是等待从而效率变高

最后看第五种方式:异步IO。了解异步IO之前,首先区分同步IO异步IO,区分二者的方式很简单:判断是否是发起IO行为的进程本身参与IO的任意一个过程(等待或者拷贝),只要参与了任意一种行为,就属于同步IO,否则属于异步IO。很明显,异步IO的核心特点是IO操作和程序逻辑可以并行执行。异步IO特别适合密集型任务和高并发场景

既然有五种IO模型,那么效率最高的是哪一种呢?第四种:多路转接。因为异步IO虽然效率高,但是实现起来非常复杂,这就导致实现出的异步IO代码难度高且可维护性并没有多路转接好,另外现在的服务器大部分都是基于Linux,而Linux下的多路转接技术相对容易实现,所以相对多的情况下还是使用多路转接。而其余三种IO方式,与多路转接最大的不同就是只有一个文件描述符,能否及时数据的概率始终是取决于对方,而多路转接因为存在多个文件描述符,增加了收到数据的概率,所以此时能否及时处理数据就取决于自己,但是取决于自己就可以保证最大程度上利用CPU资源,所以还是更推荐多路转接

非阻塞式IO

为了后面更好得理解多路转接,先了解一下阻塞式IO的特点,以下面的代码为例:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <string>
#include <unistd.h>

using namespace std;

int main()
{
    std::string message = "请输入信息:";
    while (true)
    {
        // 先向标准输出中写入数据
        write(1, message.c_str(), message.size());
        char buffer[1024] = {0};
        // 再从标准输入中读取数据
        ssize_t n = read(0, buffer, sizeof(buffer) - 1);

        if(n > 0)
            std::cout << buffer << std::endl;
    }

    return 0;
}

上面的代码是阻塞式IO,实际的效果就是如果用户没有输入,那么控制台就会一直卡在下面的位置:

Text Only
1
请输入信息:

一旦用户输入一条信息,控制台就会打印出对应的信息,之后继续卡在同样的位置

如果需要将上面的阻塞式IO修改为非阻塞式IO可以使用fcntl接口,这个接口可以针对特定文件描述符进行操作,就不用再针对某一个接口设置非阻塞,其原型如下:

C
1
int fcntl(int fd, int op, ... /* arg */ );

该函数有三个参数,第一个参数表示要指定的文件描述符,第二个参数为操作标记,有下面几种值:

  • 复制一个现有的描述符(F_DUPFD
  • 获得/设置文件描述符标记(F_GETFDF_SETFD
  • 获得/设置文件状态标记(F_GETFLF_SETFL
  • 获得/设置异步IO所有权(F_GETOWNF_SETOWN
  • 获得/设置记录锁(F_GETLKF_SETLKF_SETLKW

第三个参数表示根据对应操作标记需要设置的值

本次因为需要更改是否为阻塞,就需要使用到F_GETFLF_SETFL,如果是F_GETFL,那么fcntl函数对应的文件描述符的状态,表示获取,并且不需要设置第三个参数;如果是F_SETFL,则没有返回值,表示设置,如果要设置为非阻塞IO,那么对应的第三个参数设置为O_NONBLOCK

基于上面的理论,可以设计出下面的函数:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
void setNonBlock(int fd)
{
    // 获取当前文件描述符状态
    int stat = fcntl(fd, F_GETFL);
    if(stat < 0)
    {
        std::cerr << "状态错误" << std::endl;
        return;
    }
    // 设置为非阻塞
    fcntl(fd, F_SETFL, stat | O_NONBLOCK);
}

接着,设置标准输入为非阻塞:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
int main()
{
    // ...
    setNonBlock(0);
    while (true)
    {

    }
    // ...
}

此时,如果直接运行就会看到控制台一直在打印请输入信息:。为了减慢打印速度,可以在循环内部考虑加一个sleep(1);

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
int main()
{
    std::string message = "请输入信息:";
    setNonBlock(0);
    while (true)
    {
        // ...

        sleep(1);
    }

    return 0;
}

这就已经完成了阻塞IO到非阻塞IO的转变。但是了解这个内容还不够,下面需要再进一步了解读写接口,以read为例,read接口的返回值如果为正数,那么此时就代表正常读取到的字符个数,如果返回值为0,那么此时就代表写端关闭,但是如果返回值为负数,那么存在两种情况:

  1. 内核数据未准备好
  2. 读取失败

基于上面的三种返回值,可以修改上面的代码:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int main()
{
    std::string message = "请输入信息:";
    setNonBlock(0);
    while (true)
    {
        // 先向标准输出中写入数据
        write(1, message.c_str(), message.size());
        char buffer[1024] = {0};
        // 再从标准输入中读取数据
        ssize_t n = read(0, buffer, sizeof(buffer) - 1);

        // 针对三种返回值考虑三种情况
        if(n > 0)
            std::cout << buffer << std::endl;
        else if(n == 0)
            std::cout << "写端关闭" << std::endl;
        else
            std::cout << "错误" << std::endl;

        sleep(1);
    }

    return 0;
}

因为此时是非阻塞IO,运行且不输入任何数据就会看到下面的结果:

Text Only
1
2
3
4
5
6
7
请输入信息:错误
请输入信息:错误
请输入信息:错误
请输入信息:错误
请输入信息:错误
请输入信息:错误
...

这一结果也印证了如果底层数据还没有准备好,那么read接口会直接返回为-1

但是,上面提到了read接口返回-1有两种情况,现在就面临着一个问题:既然-1不但代表失败,还代表底层数据没有准备好,而对于数据还没有准备好一般处理逻辑和失败的处理逻辑是不一样的,那么这种情况下如何做出写出不同的处理逻辑?答案是通过错误码,在Linux进程控制部分提到过库函数如果出错会设置对应的错误码,实际上,除了库函数以外,系统调用出错也会设置对应的错误码,而这个错误码一般记录着最近一次出错的系统调用或者库函数调用的出错信息编号

对于read函数来说,如果底层数据还没有准备好,那么错误码会被设置为11,表示EAGAIN或者EWOULDBLOCK,而不是这两个错误码就说明当前-1代表读取失败,所以进一步修改上面的代码:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int main()
{
    // ...
    while (true)
    {
        // ...

        if(n > 0)
            std::cout << buffer << std::endl;
        else if(n == 0)
        {
            std::cout << "写端关闭" << std::endl;
            break;
        }
        else
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK)
            {
                std::cout << "底层数据还没有准备好" << std::endl;
                sleep(1);
                continue;
            }
            else
            {
                std::cout << "错误" << std::endl;
                break;
            }
        }

        sleep(1);
    }

    return 0;
}

此时再次编译运行上面的代码就可以看到打印的消息变为:

Text Only
1
2
3
4
5
6
7
请输入信息:底层数据还没有准备好
请输入信息:底层数据还没有准备好
请输入信息:底层数据还没有准备好
请输入信息:底层数据还没有准备好
请输入信息:底层数据还没有准备好
请输入信息:底层数据还没有准备好
...

对于阻塞IO来说,如果当前陷入内核后被调度,那么调度完成后再次回到上一次的位置依旧会保持阻塞状态,因为对于大部分的IO类系统调用本身就包含了IO事件的判断以及对进程进行挂起的逻辑,但是如果是非阻塞IO,其阻塞状态几乎不存在,但是很有可能处于数据拷贝的状态,如果此时被切换,错误码就会被设置为EINTR,一旦收到错误码为EINTR,那么就说明此时正常行为被打断,例如拷贝过程,需要上层自行处理这种情况

所以,再次修改上面的代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
int main()
{
    // ...
    while (true)
    {
        // ...

        if(n > 0)
            std::cout << buffer << std::endl;
        else if(n == 0)
        {
            std::cout << "写端关闭" << std::endl;
            break;
        }
        else
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK)
            {
                std::cout << "底层数据还没有准备好" << std::endl;
                sleep(1);
                continue;
            }
            else if(errno == EINTR)
            {
                std::cout << "进程中断,请重试" << std::endl;
                sleep(1);
                continue;
            }
            else
            {
                std::cout << "错误" << std::endl;
                break;
            }
        }

        sleep(1);
    }

    return 0;
}

基于对非阻塞IO的介绍,思考:对于read等IO接口来说,什么错误是可接受(可以在运行程序时直接处理)的?即错误码被设置为EAGAINEWOULDBLOCK或者EINTR(腾讯C++后端一面)

如果在当前情况下想看第二个分支结果,可以按下键盘的Ctrl+D,这个快捷键表示终止键盘输入,此时就会看到下面的结果:

Text Only
1
写端关闭

select实现多路转接

在上面已经基本介绍了多路转接的基本原理,基于这个原理,可以推出多路转接的作用:

  1. 等待:等待多个文件描述符中的任意一个就绪
  2. 通知:一旦某个文件描述符就绪,就通知进程可以开始进行数据拷贝

所以多路转接本质就是IO事件就绪的通知机制

下面结合代码来理解多路转接,在Linux下,实现多路转接可以使用select接口,其原型如下:

C
1
2
int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

该接口一共有五个参数,首先介绍第一个参数和最后一个参数:

对于第一个参数来说,其表示文件描述符的个数,根据官方手册的描述,这个参数的值为三个文件描述符集中最大文件描述符+1,所以想要确定这个值,就必须要遍历三个文件描述符集

对于最后一个参数来说,这是一个输入、输出型参数,其表示等待时间,有三种情况:

  1. 阻塞等待:将其设置为nullptr,如果有一个文件描述符就绪,select就会立即返回
  2. 非阻塞等待:将timeval结构体中所有字段初始化为0,不论是否有数据就绪,select都会立即返回
  3. 等待指定时间:分别设置结构体timeval中的两个字段值,操作系统会以当前系统时间为起点设置超时时间。此时如果在等待时间内没有数据就绪或者到达指定的之间,那么select就会立即返回并将该结构体中的两个字段值设置为0;如果在等待时间内有数据就绪或者到达指定的时间,那么select会将该结构体中的两个字段值设置为实际剩余的时间

timeval结构体的定义如下:

C
1
2
3
4
5
struct timeval 
{
    time_t      tv_sec;     /* seconds */
    suseconds_t tv_usec;    /* microseconds */
};

接下来看剩余的三个参数,但是在具体介绍三个参数的作用之前,先了解一下fd_set这个数据类型。fd_set表示文件描述符集,其本质是一个位图,但是位图的元素值只能是0或者1,所以fd_set要想表示具体的文件描述符就需要借助下标,所以这个位图每一个比特位和对应的位置含义如下:

  1. 当前比特位位置表示文件描述符
  2. 当前比特位的值表示需要关心当前文件描述符的IO事件

但是,在Linux中fd_set底层本质上就是一个数组,其每一个元素类型为unsigned long,其原型如下:

C
1
2
3
4
5
#define __FDSET_LONGS   (__FD_SETSIZE/__NFDBITS)

typedef struct {
    unsigned long fds_bits [__FDSET_LONGS];
} __kernel_fd_set;

这就意味着在Linux中,每个文件描述符集能管理的文件描述符都是有限的,这就是select接口的局限性,所以一般使用select接口都是在比较小型的项目中使用。如果要判断当前Linux系统中的fd_set位图大小,可以使用下面的代码:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#include <iostream>

#include <sys/select.h>

int main()
{
    fd_set fds;
    std::cout << sizeof(fds) * 8 << std::endl;

    return 0;
}

例如,下面的结果:

Text Only
1
1024

在上面的代码中,之所以要乘以8,是因为fd_set本质是对一个数组进行的封装,这个数组中每一个元素是unsigned long类型,所以为了计算出每一个比特位,需要将元素个数乘以8

接下来再看select的三个参数:

  1. readfds:表示关心指定文件描述符的读事件,对应的就是关心对应的读缓冲区是否有数据
  2. writefds:表示关心指定文件描述符的写事件,对应的就是关心对应的写缓冲区是否有空间
  3. exceptfds:表示关心指定文件描述符的异常事件,对应的就是关心对应的文件描述符是否发生异常

既然这三个参数都是位图,那么必然需要借助位图的相关操作来对这三个参数进行操作,在Linux中提供了下面的接口来对这三个参数进行操作:

C
1
2
3
4
void FD_CLR(int fd, fd_set *set); // 移除指定位图中的指定文件描述符
int  FD_ISSET(int fd, fd_set *set); // 判断指定位图中是否存在指定文件描述符
void FD_SET(int fd, fd_set *set); // 添加指定位图中的指定文件描述符
void FD_ZERO(fd_set *set); // 清空指定位图

select最后一个参数相同,select的前三个参数也是输入、输出型参数,所以在调用select接口时,需要先将这三个参数设置为需要关心的文件描述符,在select接口中会判断关心的文件描述符是否就绪,如果就绪对应的描述符(下标)对应的比特位就会被置为1,而其他没有就绪的文件描述符对应的比特位就会被置为0,所以在调用select接口之后,判断哪些文件描述符就绪,进而进行数据拷贝。例如,以readfds为例,在调用select接口之前,需要先设置需要关心读的文件描述符,调用完select接口之后,如果关心的文件描述符对应的比特位被置为1,就说明当前文件描述符有数据可读

了解完所有参数后,接下来考虑select接口的返回值,select接口的返回值也有三种情况:

  1. 存在文件描述符就绪:返回大于0的值,表示就绪的文件描述符个数
  2. 等待失败:返回-1
  3. 底层没有文件描述符的数据就绪:返回0

下面创建基于select的TCP服务器为例,演示使用select接口实现多路转接:

Note

本次利用到了在介绍HTTP协议基本结构与基本实现HTTPServer时封装的TcpSocket

首先创建一个selectServer类,这个类表示基于select的TCP服务器,所以少不了需要用到TcpSocket类对象,所以首先考虑selectServer中创建一个基类指针成员,并在构造函数的初始化列表中进行初始化,并在构造函数体中调用父类的BaseSocket中的initSocket接口执行创建、绑定和监听:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class SelectServer
{
public:
    SelectServer(uint16_t port = default_port)
        :bs_(std::make_shared<TcpSocket>())
    {
        bs_->initSocket();
    }
private:
    std::shared_ptr<BaseSocket> bs_;
};

接着,创建一个函数用于启动服务器。同样,为了更好得表示启动状态一个服务器对象防止重复启动,考虑加上一个成员变量isRunning_用于判断当前服务器是否正在运行:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class SelectServer
{
public:
    // ...

    void startServer()
    {
        isRunning_ = true;
        while (isRunning_)
        {

        }
        isRunning_ = false;
    }
private:
    // ...
    bool isRunning_;
};

按照之前的逻辑,下一步就需要开始接收客户端连接,即调用TcpSocket中的toAccept接口,通过toAccept接口的返回值判断是否可以开始读取客户端发送的内容。但是,toAccept接口中的accept默认是阻塞式等待,这就导致当前进程只能处理一个客户端的请求,所以在之前使用的就是多线程(或者子进程)的方式来解决这个问题

但是,不需要创建多线程或者子进程,只需要使用select接口即可完成。具体思路如下:

因为在TCP中,服务端和客户端想要建立连接必须进行三次握手,也就是说,客户端肯定会给服务端发送建立连接的请求,此时的服务端就需要读取对应的请求信息,所以对于监听套接字描述符_listen_socketfd来说,需要关注的就是这个套接字上的读事件,所以在select接口中需要将_listen_socketfd添加到readfds中,并且在select接口返回时,判断_listen_socketfd对应的比特位是否为1,如果为1,就说明有客户端连接请求

根据上面的思路,首先需要再TcpSocket中提供一个获取_listen_socketfd的方法,同样,为了保证模版的灵活性,这个方法需要先在父类BaseSocket中进行声明,再在子类TcpSocket中进行定义:

C++
1
2
3
4
5
6
7
class BaseSocket
{
public:
    //...

    virtual int getListenSocketFd() = 0;
};
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class TcpSocket : public BaseSocket
{
public:
    //...

    // 获取监听套接字
    int getListenSocketFd() override
    {
        return _listen_socketfd;
    }   
}

接着,在selectServer中实现startServer函数,先以阻塞版本的select为例:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
void startServer()
{
    isRunning_ = true;
    fd_set rfds;
    // 清空读文件描述符集
    FD_ZERO(&rfds);
    // 设置监听套接字到读文件描述符集中
    FD_SET(bs_->getListenSocketFd(), &rfds);
    while (isRunning_)
    {
        // 交给select等待
        // 当前只有一个监听套接字,所以nfds即为监听套接字+1
        // 只关心读事件的文件描述符集
        int n = select(bs_->getListenSocketFd() + 1, &rfds, nullptr, nullptr, nullptr);
        if (n > 0)
        {
            // 监听套接字文件描述符准备完毕
        }
    }
    isRunning_ = false;
}

一旦进入n > 0的逻辑,此时就可以启动toAccept接口进行客户端连接了,封装为一个函数处理这段逻辑:

C++
1
2
3
4
5
6
7
8
9
void handler(fd_set& rfds)
{
    if(FD_ISSET(bs_->getListenSocketFd(), &rfds))
    {
        // 存在即可开始获取
        SockAddrIn client;
        int ac_socketfd = bs_->toAccept(std::addressof(client));
    }
}

为了验证是否可以获取到客户端,可以在handler函数中打印客户端的IP和端口:

C++
1
2
3
4
5
6
7
8
9
void handler(fd_set& rfds)
{
    if(FD_ISSET(bs_->getListenSocketFd(), &rfds))
    {
        // ...

        LOG(LogLevel::DEBUG) << "当前接收到了客户端:" << client.getIp() << ":" << client.getPort() << "文件描述符为:" << ac_socketfd;
    }
}

编译运行上面的代码,就可以看到不论是多个客户端还是一个客户端,都可以正常获取到每一个客户端建立连接时的IP和端口

但是,如果此时将select改为非阻塞版本,就会发现没有客户端可以正常连接上服务器:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void startServer()
{
    // ...
    // 非阻塞
    struct timeval t = {0, 0};
    while (isRunning_)
    {
        // 交给select等待
        // 当前只有一个监听套接字,所以nfds即为监听套接字+1
        // 只关心读事件的文件描述符集
        int n = select(bs_->getListenSocketFd() + 1, &rfds, nullptr, nullptr, &t);
        // ...
    }
    isRunning_ = false;
}

根据前面的对select返回值的分析,如果返回值为0,就说明没有一个文件描述符有数据就绪,基于这个原理,修改代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void startServer()
{
    //...
    while (isRunning_)
    {
        // ...
        if (n > 0)
        {
            // 监听套接字文件描述符准备完毕
            handler(rfds);
        }
        else if(n == 0)
        {
            LOG(LogLevel::INFO) << "没有客户端连接";
        }
    }
    isRunning_ = false;
}

再次运行上面的代码,会发现控制台持续在打印没有客户端连接,出现这个问题的本质原因就是select是非阻塞的。但是为什么会出现有客户端发起连接,但是select却没有进入数据就绪的逻辑?下面具体分析上面代码存在的问题:

startServer中,首先设置了对应的_listen_socketfd,这符合前面提到的逻辑。但是在while循环中,因为select的第二个参数是输入、输出型参数,所以在执行select时首先是将设置好_listen_socketfd交给select让其知道需要关心这个文件描述符。因为是非阻塞版本,所以一旦进入select,该函数会发现一开始并没有任何客户端连接,从而会立即返回并且修改提供给select的关心读的文件描述符集rfds,这也就意味着,一旦启动服务器,select就会将已有的rfds修改为全0并且返回0,这就导致两种情况:

  1. 返回值为0,一直进入n == 0的逻辑
  2. rfds被修改为全0,之后的循环中交给selectrfds一直都是全0,导致始终返回0

最后,尽管有客户端连接,但是因为select不再监听任何文件描述符,所以永远不会执行handler函数,也就无法获取到客户端的连接信息

解决这个问题的方法就是将清空关心读的文件描述符逻辑和设置_listen_socketfd到关心读的文件描述符集逻辑全都放到while循环中,这样就可以保证每次select都能监听到_listen_socketfd对应的文件描述符,从而保证可以获取到客户端的连接信息。修改代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
void startServer()
{
    isRunning_ = true;
    fd_set rfds;
    // 非阻塞
    struct timeval t = {0, 0};
    while (isRunning_)
    {
        // 清空读文件描述符集
        FD_ZERO(&rfds);
        // 设置监听套接字到读文件描述符集中
        FD_SET(bs_->getListenSocketFd(), &rfds);
        // ...
    }
    isRunning_ = false;
}

但是现在还有一个问题,在当前代码中,除了select的第二个参数是输入、输出型参数外,最后一个参数也是输入、输出型参数,那么是否也需要将设置timeval的逻辑放到while循环中呢?理论上是需要的,但是此处可以不需要,因为根据select处理timeval的逻辑,进入select后,如果在设定的超时时间内关心的文件描述符集中没有文件描述符就绪,就会返回将timeval中每个字段设置为0的结构体对象,此时下一次设置时t对象中的值依旧是{0, 0}。但是,尽管如此,还是更建议将设置超时时间的逻辑放到while循环中。修改代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
void startServer()
{
    isRunning_ = true;
    fd_set rfds;
    while (isRunning_)
    {
        // 清空读文件描述符集
        FD_ZERO(&rfds);
        // 设置监听套接字到读文件描述符集中
        FD_SET(bs_->getListenSocketFd(), &rfds);
        // 非阻塞
        struct timeval t = {0, 0};
        // ...
    }
    isRunning_ = false;
}

为了更好得显示出打印信息,考虑在n == 0的逻辑添加一个sleep(1)减少打印的频率,再次运行代码,就可以看到客户端连接成功,并且服务器会打印出客户端的IP和端口信息

既然完成了客户端和服务端的连接,接下来是否可以考虑在toAccept的逻辑之后直接接收信息呢?不可以,因为recvData函数中的recv接口默认也是阻塞式的,如果直接接收信息,依旧会出现服务端卡在接收的位置。解决这个问题的方法同样需要借助select,下面思考具体的思路:

因为客户端读取本质还是访问对应的文件描述符,所以让select除了关心_listen_socketfd以外,还需要关心获取到的ac_socketfd。但是现在出现了第二个问题,如何将ac_socketfd添加到select中呢?借助一个辅助数组,这个辅助数组的作用就是存储有效的且同类的所有文件描述符,例如以关心读为例,当前辅助数组中就需要存储所有关心读的文件描述符。基于这个思路,首先需要在SelectServer类中添加一个成员变量fds_,这个成员变量就是辅助数组。此处考虑使用定长数组,这个数组的长度就设置为fd_set可以表示的所有文件描述符个数,并且在构造函数中将该数组全部初始化为-1:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 定长数组大小
constexpr int g_fd_array_num = sizeof(fd_set) * 8;

class SelectServer
{
public:
    SelectServer(uint16_t port = default_port)
        : /* ... */
        , fd_array_(std::make_shared<std::array<int, g_fd_array_num>>())
    {
        // ...
        // 数组元素全部初始化为-1
        fd_array_->fill(-1);
    }

    // ...

private:
    // ...
    std::shared_ptr<std::array<int, g_fd_array_num>> fd_array_;
    // ...
};

接着,因为要确保_listen_socketfd可以成功添加到select中,所以在构造中单独设定第一个元素为_listen_socketfd

C++
1
2
3
4
5
6
7
8
9
SelectServer(uint16_t port = default_port)
    : /* ... */
    , fd_array_(std::make_shared<std::array<int, g_fd_array_num>>())
{
    // ...

    // 固定第一个元素为监听套接字
    (*fd_array_)[0] = bs_->getListenSocketFd();
}

接着,为了将fd_array_中的文件描述符添加到select中,在每一次select之前,都需要遍历fd_array_,将其中不为-1的文件描述符添加到select中,并且为了设置select的第一个参数,需要获取到当前数组的最大值,修改startServer函数如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void startServer()
{
    // ...
    while (isRunning_)
    {
        // 清空读文件描述符集
        FD_ZERO(&rfds);
        // 设置监听套接字到读文件描述符集中
        for (auto &fd : *fd_array_)
            if(fd != -1)
                FD_SET(fd, &rfds);

        int maxVal = 0;
        auto pos = std::max_element(fd_array_->begin(), fd_array_->end());
        if(pos)
            maxVal = *pos;

        // ...

        int n = select(maxVal + 1, &rfds, nullptr, nullptr, &t);
        // ...
    }
    isRunning_ = false;
}

接着,为了保证select可以正常接收到后续的ac_socketfd,需要在handler函数中判断监听套接字存在于rfds的分支中将获取到的ac_socketfd添加到辅助数组中,对应的代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
void handler(fd_set& rfds)
{
    if(FD_ISSET(bs_->getListenSocketFd(), &rfds))
    {
        // 存在即可开始获取
        SockAddrIn client;
        int ac_socketfd = bs_->toAccept(std::addressof(client));

        // ...

        // 将获取到的新ac_socketfd添加到辅助数组
        for (auto &f : *fd_array_)
        {
            if (f == -1)
            {
                f = ac_socketfd;
                break;
            }
        }
    }
}

添加完之后,下一次startServerwhile循环逻辑中就会添加新的ac_socketfd到关心读的文件描述符表中从而被select关心

完成上面的逻辑后,服务器就可以考虑处理读取客户端发送的信息。此时在handler函数中因为当前rfds中不只有监听套接字_listen_socketfd,所以需要接着判断:如果当前辅助数组中的元素为不为-1,说明当前获取到一个有效的文件描述符,此时需要判断是否是监听套接字,如果是就执行连接客户端的逻辑,否则就是读写的逻辑。但是只获取到辅助数组中的文件描述符还不够,因为还需要判断当前文件描述符是的确准备好数据的,即判断指定套接字在rfds中标记位是否为1。修改代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
void handler(fd_set &rfds)
{
    for (auto &fd : *fd_array_)
    {
        if (fd != -1)
        {
            if(fd == bs_->getListenSocketFd())
            {
                if (FD_ISSET(fd, &rfds))
                {
                    // 存在即可开始获取
                    SockAddrIn client;
                    int ac_socketfd = bs_->toAccept(std::addressof(client));

                    LOG(LogLevel::DEBUG) << "当前接收到了客户端:" << client.getIp() << ":" << client.getPort() << "文件描述符为:" << ac_socketfd;

                    // 将获取到的新ac_socketfd添加到辅助数组
                    for (auto &f : *fd_array_)
                    {
                        if (f == -1)
                        {
                            f = ac_socketfd;
                            break;
                        }
                    }
                }
            }
            else 
            {
                if(FD_ISSET(fd, &rfds))
                {
                    // 说明是用于读取的套接字
                    std::string buffer;
                    ssize_t n = bs_->recvData(buffer, fd);

                    if(n > 0)
                        LOG(LogLevel::INFO) << "客户端发送了:" << buffer;
                    else if(n == 0)
                    {
                        LOG(LogLevel::INFO) << "客户端退出";
                        fd = -1;
                    }
                    else
                    {
                        LOG(LogLevel::INFO) << "错误";
                        fd = -1;
                    }
                }
            }
        }
    }
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 接收数据
ssize_t recvData(std::string &out_data, int ac_socketfd) override
{
    char buffer[4096] = {0};
    ssize_t ret = recv(ac_socketfd, buffer, sizeof(buffer), 0);
    if (ret > 0)
        out_data = buffer;
    else
        close(ac_socketfd);

    return ret;
}

此时的recvData接口一定不会阻塞,因为只有内核准备好了数据,select才会返回大于0的值,此时要么是_listen_socketfd,要么就是ac_socketfd,同样toAccept接口也不会被阻塞

编译运行上面的代码即可发现服务端可以正常收到多个客户端发送的消息

至此就完成了使用select的多路转接模式

select的缺陷

Select多路转接的优缺点

通过上面的实现,可以看出select多路转接的优点:

  1. 跨平台:select接口在各种操作系统上都有实现,包括Windows、Linux、macOS等
  2. 简单易用:相比于其他IO模型,select的使用相对简单
  3. 可以同时监听多个文件描述符,提高了IO效率

但是select也存在一些缺点:

  1. 文件描述符数量限制:select能够监听的文件描述符数量有限,无法处理大量文件描述符的情况
  2. 性能问题:每次调用select都需要将文件描述符集从用户态拷贝到内核态,当文件描述符数量较多时,这个开销会比较大
  3. 遍历开销:select返回后,需要遍历所有文件描述符来找出就绪的文件描述符,时间复杂度为\(O(N)\)

poll实现多路转接

上面已经详细介绍了如何使用select实现多路转接从而保证只有一个进程的TCP服务器可以接收来自多个客户端的请求。但是从上面的缺点来看select还是非常明显的,尤其是下面的两个问题:

  1. select中设置需要关心的文件描述符集的参数以及时间都是输入输出参数,导致需要在每一次循环中都要重复设置
  2. select可以关心的文件描述符集的大小是有限的,而且大小无法由程序员在代码中指定

基于这两个问题,下面讨论poll的解决方案

在Linux中,除了可以使用select实现多路转接外,还可以通过poll接口实现多路转接,其原型如下:

C++
1
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

这个接口一共三个参数。首先介绍最后一个参数,与select类似,poll也可以决定是否需要时阻塞式等待,但是timeout此时并不是struct timeval结构体类型,所以其表示方式有所不同:

  1. timeout值大于0,代表最大超时时间,单位为毫秒
  2. timeout值等于0,代表非阻塞式
  3. timeout值小于0,代表阻塞式等待

接着看剩下的两个参数,其中第一个参数表示元素为struct pollfd类型的数组,因为在C语言里,数组并不会以完整拷贝的方式传递给函数,而是只传递数组首元素的地址,这也被称为数组退化(或衰减)为指针。基于这个原因,为了保证可以访问到元素为struct pollfd的数组,还需要第二个参数,表示当前数组中元素的个数。通过这两个参数,poll接口就可以实现关心多个文件描述符,而因为数组长度是由程序员自己指定的,所以就可以解决select的第二个问题

既然poll可以关心多个文件描述符,那么如何关心每一个文件描述符的事件呢?这里就需要了解一下struct pollfd结构体的底层设计,其定义如下:

C++
1
2
3
4
5
6
struct pollfd 
{
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

其中,第一个字段表示需要关心的文件描述符,第二个字段表示对应文件描述符需要关心的事件类型,第三个字段表示关心的事件是否就绪。对于前两个字段来说,需要由程序员自己指定,而对于最后一个字段来说,由poll接口执行时填写,这就实现了关心事件和结果分离,即输入和输出分离,这一点就解决了select的第一个问题。那么如何表示具体要关心的事件呢?eventsrevents可以填写下面的值:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
#define POLLIN     0x001    /* 有数据可读 */
#define POLLPRI    0x002    /* 有紧急数据可读 */
#define POLLOUT    0x004    /* 写数据不会导致阻塞 */
#define POLLERR    0x008    /* 发生错误 */
#define POLLHUP    0x010    /* 连接挂起 */
#define POLLNVAL   0x020    /* 无效请求: fd未打开 */
#define POLLRDNORM 0x040    /* 有普通数据可读 */
#define POLLRDBAND 0x080    /* 有优先数据可读 */
#define POLLWRNORM 0x100    /* 可写普通数据 */
#define POLLWRBAND 0x200    /* 可写优先数据 */

可以看到每一种事件类型都是一个宏值,这也就意味着,如果要设置对应的事件到events中,就需要通过位操作。所以实际上,eventsrevents也是位图。但是,与select不同的是,如果要设置events值需要程序员自己通过位操作来设置。对应的,要检测revents中的事件同样需要程序员自己来检测。虽然这两步都需要程序员自己做,但是实际上这两个步骤很简单:

  1. 设置时,将events和需要设置的事件进行按位或|操作,可以进行多个按位或绑定多个事件
  2. 检测时,将revents和需要检测的事件进行按位与&操作,判断结果是否大于0

介绍完poll接口的参数后,剩下的就是poll的返回值,poll的返回值和select基本一致:

  1. 返回值大于0,代表有对应值个文件描述符就绪
  2. 返回值等于0,代表没有任何一个文件描述符就绪,等待超时
  3. 返回值小于0(-1),代表等待失败

了解完poll接口后,下面基于poll接口改造前面的基于select的TCP服务器,因为改造思路相对简单,所以此处不过多介绍。本次为了演示,直接使用定长数组,实际上可以使用动态开辟数组空间实现,此处不具体实现该方案。代码修改如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// struct pollfd数组大小
const int g_fd_array_num = 1024;

class pollServer
{
public:
    pollServer(uint16_t port = default_port)
        : bs_(std::make_shared<TcpSocket>()), isRunning_(false), fd_array_(std::make_shared<std::array<struct pollfd, g_fd_array_num>>())
    {
        bs_->initSocket();
        // fd全部初始化为-1
        for (int i = 0; i < fd_array_->size(); i++)
        {
            (*fd_array_)[i].fd = -1;
            (*fd_array_)[i].events = 0;
            (*fd_array_)[i].revents = 0;
        }

        // 单独处理_listen_socketfd
        (*fd_array_)[0].fd = bs_->getListenSocketFd();
        // 监听读事件
        (*fd_array_)[0].events |= POLLIN;
    }

private:
    std::shared_ptr<BaseSocket> bs_;
    std::shared_ptr<std::array<struct pollfd, g_fd_array_num>> fd_array_;
    bool isRunning_;
};
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void startServer()
{
    isRunning_ = true;
    while (isRunning_)
    {
        // 不再需要在循环中多次设置监听套接字到读文件描述符集中
        // 等待1秒
        const int timeout = 1000;
        // 只关心读事件的文件描述符集
        int n = poll(fd_array_->data(), g_fd_array_num, timeout);
        if (n > 0)
        {
            // 监听套接字文件描述符准备完毕
            handler();
        }
        else if (n == 0)
        {
            LOG(LogLevel::INFO) << "没有客户端连接";
            sleep(1);
        }
    }
    isRunning_ = false;
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
void handler()
{
    for (auto &pfd : *fd_array_)
    {
        if (pfd.fd != -1)
        {
            if(pfd.fd == bs_->getListenSocketFd())
            {
                // 检测监听套接字是否就绪
                if (pfd.revents & POLLIN)
                {
                    // 存在即可开始获取
                    SockAddrIn client;
                    int ac_socketfd = bs_->toAccept(std::addressof(client));

                    LOG(LogLevel::DEBUG) << "当前接收到了客户端:" << client.getIp() << ":" << client.getPort() << "文件描述符为:" << ac_socketfd;

                    // 将获取到的新ac_socketfd添加到辅助数组
                    for (auto &f : *fd_array_)
                    {
                        if (f.fd == -1)
                        {
                            f.fd = ac_socketfd;
                            f.events |= POLLIN;
                            break;
                        }
                    }
                }
            }
            else 
            {
                // 检测其他套接字是否就绪
                if (pfd.revents & POLLIN)
                {
                    // 说明是用于读取的套接字
                    std::string buffer;
                    ssize_t n = bs_->recvData(buffer, pfd.fd);

                    if(n > 0)
                        LOG(LogLevel::INFO) << "客户端发送了:" << buffer;
                    else if(n == 0)
                    {
                        LOG(LogLevel::INFO) << "客户端退出";
                        pfd.fd = -1;
                        pfd.events = pfd.revents = 0;

                    }
                    else
                    {
                        LOG(LogLevel::INFO) << "错误";
                        pfd.fd = -1;
                        pfd.events = pfd.revents = 0;
                    }
                }
            }
        }
    }
}

poll的缺陷

虽然epoll的确解决了select存在的问题,但是epoll底层还是需要大量的遍历,例如遍历struct pollfd数组