跳转至

epoll实现多路转接

约 8373 个字 911 行代码 2 张图片 预计阅读时间 39 分钟

本篇介绍

在前面一节已经介绍了selectpoll实现多路转接,但是selectpoll都存在一些缺陷。而二者最大的缺陷就是都需要内核涉及到遍历操作。所以,为了尽可能减少内核的遍历,就需要用到epoll实现多路转接

epoll接口介绍

要使用epoll实现多路转接,需要经过三个步骤:

  1. 创建epoll模型
  2. 设置需要关心的文件描述符和对应事件
  3. 注册关心的文件描述符和事件

根据这三个步骤,分别使用三个不同的接口:

创建epoll模型

在Linux中,要使用epoll实现多路转接,需要使用epoll_create函数创建一个epoll模型,该函数声明如下:

C
1
int epoll_create(int size);

尽管该函数存在一个参数,但是在2.6.8内核版本后,该参数已经被忽略,并且内核会使用一个默认值来代替,所以在使用时,该参数可以设置为任意值,一般设置为128或者256

Note

需要注意,尽管size可以设置为任意值,但是必须要保证size大于0

该函数会返回创建的epoll模型对应的文件描述符

设置需要关心的文件描述符和对应事件

调用epoll_create函数创建好epoll模型后,需要使用epoll_ctl函数将需要关心的文件描述符和对应的事件添加到epoll模型中,该函数声明如下:

C
1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
该函数的第一个参数表示目标epoll模型对应的文件描述符,第二个参数表示操作类型,该参数有三种类型:

  • EPOLL_CTL_ADD:将指定的文件描述符添加到epoll模型中
  • EPOLL_CTL_MOD:修改指定文件描述符对应的事件
  • EPOLL_CTL_DEL:将指定文件描述符从指定的epoll模型中删除

第三个参数表示需要关心的文件描述符,第四个参数表示需要关心的文件描述符对应的事件,该结构定义如下:

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
union epoll_data 
{
    void     *ptr;
    int       fd;
    uint32_t  u32;
    uint64_t  u64;
};

typedef union epoll_data  epoll_data_t;

struct epoll_event 
{
    uint32_t      events;  /* Epoll events */
    epoll_data_t  data;    /* User data variable */
};

epoll_event结构中,第一个字段表示文件描述符对应的事件,有下面的几种类型:

  • EPOLLIN:表示关心文件描述符的读事件
  • EPOLLOUT:表示关心文件描述符的写事件
  • EPOLLET:表示以边沿触发的方式进行事件通知
  • EPOLLONESHOT:表示文件描述符只能触发一次事件
  • EPOLLRDHUP:表示文件描述符对应的连接被对方关闭
  • EPOLLPRI:表示文件描述符对应的连接有紧急数据可读
  • EPOLLERR:表示文件描述符对应的连接发生错误
  • EPOLLHUP:表示文件描述符对应的连接被挂断

第二个字段表示文件描述符对应的用户数据,一般使用fd字段来表示文件描述符

注册关心的文件描述符和事件

在调用epoll_ctl函数将需要关心的文件描述符和对应的事件添加到epoll模型中后,就可以使用epoll_wait函数来等待事件的发生,该函数声明如下:

C
1
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

该函数的第一个参数表示目标epoll模型对应的文件描述符,最后一个参数的设置与poll一致,此处不再赘述。接着,第二个参数和第三个参数共同表示一个数组,与poll类似,struct epoll_event *events表示数组的第一个元素的地址,maxevents表示数组的元素个数。但是需要注意的是,这里的第二个参数并不是输入型参数,而是一个输出型参数,该数组中存储的是对应事件已经就绪的文件描述符,而因为事件可能不止一个,也有可能有很多个,所以需要maxevents来控制一次最多可以获取到的事件个数

该函数的返回值与poll一样,因为返回值大于0决定了具体就绪的文件描述符的个数,所以遍历就绪文件描述符数组events时需要用到该返回值

从接口层面对比epollselectpoll

根据上面的接口介绍,可以看到epollselectpoll最大的区别就在于接口的个数上,epoll只有三个接口,而selectpoll根据前面的使用只有一个接口,而epoll三个接口中的后两个分别表示不同的功能:「用户需要内核关心的文件描述符和对应的事件」以及「内核告诉用户有哪些事件已经就绪」,所以从接口层面来看,epoll将这两个操作分离,使得操作变得更加清晰

epoll 原理

虽然上面已经对epoll实现多路转接需要用到的接口进行了介绍,但是其中还涉及到一些更加细节的问题无法通过接口的声明来描述,所以除了需要知道epoll需要用的到的接口外,还需要了解epoll的实现原理

在前面不论是编写UDP服务器和编写TCP服务器第一步都需要创建套接字,而这个套接字本质还是一个文件描述符,那么文件描述符是如何与套接字产生关联的?

为了解决这个问题,首先需要看struct file结构,该结构定义如下:

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct file {
    union {
        struct list_head    fu_list;
        struct rcu_head     fu_rcuhead;
    } f_u;
    struct dentry       *f_dentry;
    struct vfsmount         *f_vfsmnt;
    const struct file_operations    *f_op;
    atomic_t        f_count;
    unsigned int        f_flags;
    mode_t          f_mode;
    loff_t          f_pos;
    struct fown_struct  f_owner;
    unsigned int        f_uid, f_gid;
    struct file_ra_state    f_ra;

    unsigned long       f_version;
    void            *f_security;

    /* needed for tty driver, and maybe others */
    void            *private_data;

#ifdef CONFIG_EPOLL
    /* Used by fs/eventpoll.c to link all the hooks to this file */
    struct list_head    f_ep_links;
    spinlock_t      f_ep_lock;
#endif /* #ifdef CONFIG_EPOLL */
    struct address_space    *f_mapping;
};

在该结构中有一个指针private_data,这个指针的类型是void *,所以可以指向任意类型的数据。在网络部分,一旦创建了套接字,那么就会创建一个struct socket结构,该结构定义如下:

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
struct socket {
    socket_state        state;
    unsigned long       flags;
    const struct proto_ops  *ops;
    struct fasync_struct    *fasync_list;
    struct file     *file;
    struct sock     *sk;
    wait_queue_head_t   wait;
    short           type;
};

而要实现文件描述符与套接字的关联,就需要将struct socket结构体对象的地址赋值给struct file结构的private_data指针

另外,在struct socket结构中,存在着一个成员struct sock *sk,这个成员表示具体的某一个套接字类型,既可以是UDP套接字也可以是TCP套接字,该类型的定义如下:

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
struct sock 
{
    struct sock_common  __sk_common;
#define sk_family       __sk_common.skc_family
#define sk_state        __sk_common.skc_state
#define sk_reuse        __sk_common.skc_reuse
#define sk_bound_dev_if     __sk_common.skc_bound_dev_if
#define sk_node         __sk_common.skc_node
#define sk_bind_node        __sk_common.skc_bind_node
#define sk_refcnt       __sk_common.skc_refcnt
#define sk_hash         __sk_common.skc_hash
#define sk_prot         __sk_common.skc_prot
    unsigned char       sk_shutdown : 2,
                sk_no_check : 2,
                sk_userlocks : 4;
    unsigned char       sk_protocol;
    unsigned short      sk_type;
    int         sk_rcvbuf;
    socket_lock_t       sk_lock;
    wait_queue_head_t   *sk_sleep;
    struct dst_entry    *sk_dst_cache;
    struct xfrm_policy  *sk_policy[2];
    rwlock_t        sk_dst_lock;
    atomic_t        sk_rmem_alloc;
    atomic_t        sk_wmem_alloc;
    atomic_t        sk_omem_alloc;
    struct sk_buff_head sk_receive_queue;
    struct sk_buff_head sk_write_queue;
    struct sk_buff_head sk_async_wait_queue;
    int         sk_wmem_queued;
    int         sk_forward_alloc;
    gfp_t           sk_allocation;
    int         sk_sndbuf;
    int         sk_route_caps;
    int         sk_gso_type;
    int         sk_rcvlowat;
    unsigned long       sk_flags;
    unsigned long           sk_lingertime;

    struct {
        struct sk_buff *head;
        struct sk_buff *tail;
    } sk_backlog;
    struct sk_buff_head sk_error_queue;
    struct proto        *sk_prot_creator;
    rwlock_t        sk_callback_lock;
    int         sk_err,
                sk_err_soft;
    unsigned short      sk_ack_backlog;
    unsigned short      sk_max_ack_backlog;
    __u32           sk_priority;
    struct ucred        sk_peercred;
    long            sk_rcvtimeo;
    long            sk_sndtimeo;
    struct sk_filter        *sk_filter;
    void            *sk_protinfo;
    struct timer_list   sk_timer;
    struct timeval      sk_stamp;
    struct socket       *sk_socket;
    void            *sk_user_data;
    struct page     *sk_sndmsg_page;
    struct sk_buff      *sk_send_head;
    __u32           sk_sndmsg_off;
    int         sk_write_pending;
    void            *sk_security;
    void            (*sk_state_change)(struct sock *sk);
    void            (*sk_data_ready)(struct sock *sk, int bytes);
    void            (*sk_write_space)(struct sock *sk);
    void            (*sk_error_report)(struct sock *sk);
    int         (*sk_backlog_rcv)(struct sock *sk,
                          struct sk_buff *skb);  
    void                    (*sk_destruct)(struct sock *sk);
};

在这个结构中就存在着UDP和TCP需要用到的缓冲区成员

如果看得到UDP套接字和TCP套接字的相关结构定义:

C
1
2
3
4
5
6
struct udp_sock 
{
    /* inet_sock has to be the first member */
    struct inet_sock inet;
    // ...
};
C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct inet_connection_sock 
{
    /* inet_sock has to be the first member! */
    struct inet_sock      icsk_inet;
    // ...
};

struct tcp_sock 
{
    /* inet_connection_sock has to be the first member of tcp_sock */
    struct inet_connection_sock inet_conn;
    // ...
}

可以发现,在tcp_sock结构和udp_sock中都存在着一个成员struct inet_sock inet,在这个类型中:

C
1
2
3
4
5
struct inet_sock {
    /* sk and pinet6 has to be the first two members of inet_sock */
    struct sock     sk;
    // ...
};

第一个成员就是struct sock类型,所以可以得出下图:

这就是在网络套接字部分实现的继承和多态,这一点在前面操作系统管理System V标准中三种资源的方式也有类似的结构形式

知道了文件描述符如何与套接字进行关联后,接着看epoll实现多路转接的原理:

首先用户调用epoll_create函数创建epoll模型,该函数会在内核中创建一个struct eventpoll类型的对象,该对象中包含一个struct rb_root类型的对象,该对象是一个红黑树,该红黑树的节点类型是struct epitem。这两个结构定义分别如下:

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct eventpoll 
{
    /* Protect the this structure access */
    rwlock_t lock;

    /*
    * This semaphore is used to ensure that files are not removed
    * while epoll is using them. This is read-held during the event
    * collection loop and it is write-held during the file cleanup
    * path, the epoll file exit code and the ctl operations.
    */
    struct rw_semaphore sem;

    /* Wait queue used by sys_epoll_wait() */
    wait_queue_head_t wq;

    /* Wait queue used by file->poll() */
    wait_queue_head_t poll_wait;

    /* List of ready file descriptors */
    struct list_head rdllist;

    /* RB-Tree root used to store monitored fd structs */
    struct rb_root rbr;
};
C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
struct epitem {
    /* RB-Tree node used to link this structure to the eventpoll rb-tree */
    struct rb_node rbn;

    /* List header used to link this structure to the eventpoll ready list */
    struct list_head rdllink;

    /* The file descriptor information this item refers to */
    struct epoll_filefd ffd;

    /* Number of active wait queue attached to poll operations */
    int nwait;

    /* List containing poll wait queues */
    struct list_head pwqlist;

    /* The "container" of this item */
    struct eventpoll *ep;

    /* The structure that describe the interested events and the source fd */
    struct epoll_event event;

    /*
    * Used to keep track of the usage count of the structure. This avoids
    * that the structure will desappear from underneath our processing.
    */
    atomic_t usecnt;

    /* List header used to link this item to the "struct file" items list */
    struct list_head fllink;

    /* List header used to link the item to the transfer list */
    struct list_head txlink;

    /*
    * This is used during the collection/transfer of events to userspace
    * to pin items empty events set.
    */
    unsigned int revents;
};

/* Wrapper struct used by poll queueing */
struct ep_pqueue {
    poll_table pt;
    struct epitem *epi;
};

struct eventpoll结构中,除了有红黑树结构以外,还存在着struct list_head rdllist;,这个链表中存储的就是就绪的文件描述符,而在struct epitem结构中,struct epoll_filefd ffd;表示的就是具体的文件描述符,而struct epoll_event event;表示的就是文件描述符对应的事件

结合前面的三个接口看具体的原理就是首先调用epoll_create函数创建epoll模型,接着调用epoll_ctl函数将需要关心的文件描述符和对应的事件添加到epoll模型中,然后调用epoll_wait函数等待事件的发生,当事件发生后,epoll模型会将对应的文件描述符添加到rdllist链表中,然后epoll_wait函数会返回,用户就可以遍历rdllist链表来获取就绪的文件描述符

但是,这里还涉及到一个问题,就是epoll_wait函数是如何知道事件已经就绪的?这里实际上就是利用到了底层回调机制。所谓底层回调机制,可以理解为内核先准备一个函数指针,但是这个指针一开始指向为空,当存在一个事件就绪时,该指针就会指向一个具体的函数,接着,内核会调用该函数执行其中的函数体,在此处可以简单理解为函数体就是将挂在红黑树上的文件描述符节点添加到rdllist链表中

整个过程示意图如下:

分析完上面的基本原理,下面思考几个问题:

  1. epoll模型中使用到的红黑树结构相当于selectpoll模型中的什么结构?
  2. 红黑树本质就是key-value模型,那么什么元素作为key
  3. 为什么epoll_create函数返回的是一个文件描述符?
  4. epoll模型为什么比selectpoll模型更加高效?

基于上面的问题,下面给出答案:

  1. epoll模型中使用到的红黑树结构相当于selectpoll模型中的辅助数组,因为本质都是保存着用户需要关心的文件描述符
  2. 实际上key就是文件描述符,通过文件你描述符可以快速找到具体的一个红黑树节点,查找效率高
  3. 在Linux中一切皆文件,而在struct file中存在一个private_data指针,这个指针在epoll模型中指向struct eventpoll结构
  4. 首先,从内核查找用户需要关系的文件描述符时,epoll模型只需要查找红黑树,而selectpoll模型需要遍历整个数组,这个时间消耗上比纯数组要低。其次,当有事件就绪时,对应的红黑树节点会被直接添加到rdlist中,这就可以避免了selectpoll模型中需要遍历整个数组的过程,而用户在调用epoll_wait函数时获取到的struct epoll_event *events数组一定是包含着就绪的文件描述符的,此时用户就需要关心这个数组中的数据即可。所以,epoll模型比selectpoll模型更加高效

epoll实现多路转接

上面已经基本介绍了epoll的接口和实现原理,下面结合上面的介绍对前面通过poll实现的TCP服务器进行修改:

首先是初始化部分,因为需要创建epoll模型,所以可以考虑在epollServer的构造函数中进行创建。因为epoll_create函数会返回一个文件描述符表示epoll模型,而且在后面的接口中也会使用到这个文件描述符,且该文件描述符只需要1份,所以考虑创建一个成员变量用于保存该文件描述符:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class EpollServer
{
public:
    EpollServer(uint16_t port = default_port)
        : bs_(std::make_shared<TcpSocket>()), isRunning_(false), efd_(-1)
    {
        bs_->initSocket();
        // 创建epoll模型
        efd_ = epoll_create(256);
    }

private:
    // ...
    int efd_;
};

接着,因为服务器需要监听,所以需要在构造函数中将监听套接字和对应的写事件添加到epoll模型中:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
EpollServer(uint16_t port = default_port)
    : bs_(std::make_shared<TcpSocket>(port)), isRunning_(false), efd_(-1)
{
    // ...
    // 关心listen_socketfd
    struct epoll_event ee;
    // 关心读事件
    ee.events = EPOLLIN;
    ee.data.fd = bs_->getListenSocketFd();
    int ret = epoll_ctl(efd_, EPOLL_CTL_ADD, bs_->getListenSocketFd(), &ee);
}

启动服务器时,需要epoll模型进行等待,所以在startServer函数中将原来的poll接口替换为epoll_wait接口,另外,本次考虑使用间隔1秒的方式进行等待,所以还需要设置epoll_wait函数的超时时间:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void startServer()
{
    isRunning_ = true;
    while (isRunning_)
    {
        // 不再需要在循环中多次设置监听套接字到读文件描述符集中
        // 等待1秒
        const int timeout = 1000;
        // 只关心读事件的文件描述符集
        int n = epoll_wait(efd_, fd_array_->data(), g_fd_array_num, timeout);
        if (n > 0)
        {
            // 监听套接字文件描述符准备完毕
            handler(n);
        }
        else if (n == 0)
        {
            LOG(LogLevel::INFO) << "没有客户端连接";
            sleep(1);
        }
    }
    isRunning_ = false;
}

此时,一旦监听套接字就绪,就说明有对应的客户端进行了连接,此时调用toAccept函数获取到对应的ac_socketfd,并将其添加到epoll模型中,但是epoll模型中可能不只有一个文件描述符和对应的事件,所以需要遍历epoll_wait接口返回的数组,这里因为内核添加内容是按照数组从0下标开始填充,所以可以按照正常遍历数组的方式进行,判断当前是否是监听套接字,如果是监听套接字就调用toAccept函数进行处理,否则就调用recvData函数进行处理:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
void handler(int n)
{
    for (int i = 0; i < n; i++)
    {
        // 保证一定是就绪事件
        // 判断当前是否是监听套接字
        if((*fd_array_)[i].data.fd == bs_->getListenSocketFd())
        {
            // 获取链接
            SockAddrIn client;
            int ac_socketfd = bs_->toAccept(std::addressof(client));

            if(ac_socketfd > 0)
            {
                // 添加到epoll模型中
                struct epoll_event ev;
                ev.events = EPOLLIN;
                ev.data.fd = ac_socketfd;
                int ret = epoll_ctl(efd_, EPOLL_CTL_ADD, ac_socketfd, &ev);
                if (ret < 0)
                {
                    LOG(LogLevel::WARNING) << "添加事件失败";
                    exit(static_cast<int>(ErrorNumber::Epoll_Ctl_Fail));
                }
            }
        }
        else 
        {
            // 不是监听套接字
            std::string buffer;
            ssize_t ret = bs_->recvData(buffer, (*fd_array_)[i].data.fd);
            if(ret > 0)
            {
                LOG(LogLevel::INFO) << "客户端发送:" << buffer;
            }
            else if(ret == 0)
            {
                // 恢复操作
                // 从红黑树中移除指定文件描述符,此时不再需要关心事件
                int ret = epoll_ctl(efd_, EPOLL_CTL_DEL, (*fd_array_)[i].data.fd, nullptr);
                if (ret < 0)
                {
                    LOG(LogLevel::WARNING) << "移除事件失败";
                    exit(static_cast<int>(ErrorNumber::Epoll_Ctl_Fail));
                }
            }
            else
            {
                int ret = epoll_ctl(efd_, EPOLL_CTL_DEL, (*fd_array_)[i].data.fd, nullptr);
                if (ret < 0)
                {
                    LOG(LogLevel::WARNING) << "移除事件失败";
                    exit(static_cast<int>(ErrorNumber::Epoll_Ctl_Fail));
                }
            }
        }
    }
}

selectpoll一样,此时不可以直接读取,虽然toAccept此时不会阻塞,但是toRead函数会阻塞,所以需要现将ac_socketfd添加到epoll模型中,再下一次遍历时一旦是ac_socketfd就绪,就可以调用recvData函数进行数据读取

直接编译运行上面的代码会发现,连接客户端没有问题和接收客户端发送的消息时没有问题,但是在客户端退出时会提示:

Text Only
1
[2025-04-09 11-18-28] [WARNING] [11674] [epollServer.hpp] [103] - 移除事件失败

出现这个问题的原因是当recvData中的recv函数返回0时,会将对应的文件描述符关闭,如下面的逻辑:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ssize_t recvData(std::string &out_data, int ac_socketfd) override
{
    char buffer[4096] = {0};
    ssize_t ret = recv(ac_socketfd, buffer, sizeof(buffer), 0);
    if (ret > 0)
        out_data = buffer;
    else
        close(ac_socketfd);

    return ret;
}

一旦文件描述符被关闭,那么此时就会造成对应的文件描述符变成无效的文件描述符,而epoll_wait函数如果要操作指定的文件描述符必须要保证该文件描述符是有效的,所以此时就会出现上面的错误提示,所以考虑移除recvData函数中的close函数,此时的代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 接收数据
ssize_t recvData(std::string &out_data, int ac_socketfd) override
{
    char buffer[4096] = {0};
    ssize_t ret = recv(ac_socketfd, buffer, sizeof(buffer), 0);
    if (ret > 0)
        out_data = buffer;

    return ret;
}

接着,在调用epoll_ctl函数之后关闭文件描述符:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void handler(int n)
{
    for (int i = 0; i < n; i++)
    {
        // 保证一定是就绪事件
        // 判断当前是否是监听套接字
        // ...
        else 
        {
            // 不是监听套接字
            std::string buffer;
            ssize_t ret = bs_->recvData(buffer, (*fd_array_)[i].data.fd);
            // ...
            else if(ret == 0)
            {
                // ...
                // 在移除之后关闭文件描述符
                close((*fd_array_)[i].data.fd);
                if (ret < 0)
                {
                    LOG(LogLevel::WARNING) << "移除事件失败";
                    exit(static_cast<int>(ErrorNumber::Epoll_Ctl_Fail));
                }
            }
            else
            {
                // ...
                // 在移除之后关闭文件描述符
                close((*fd_array_)[i].data.fd);
                if (ret < 0)
                {
                    LOG(LogLevel::WARNING) << "移除事件失败";
                    exit(static_cast<int>(ErrorNumber::Epoll_Ctl_Fail));
                }
            }
        }
    }
}

运行上面的代码可以发现与selectpoll一样实现了多路转接

水平触发和边缘触发

epoll模型中,有两种触发方式:

  1. 水平触发(LT,Level Trigger):当文件描述符对应的事件发生时,会一直通知上层,直到上层将事件处理完毕
  2. 边缘触发(ET,Edge Trigger):当文件描述符对应的事件发生时,如果没有数据增多时,只会通知上层一次,这就导致了如果上层没有及时处理事件,那么后续的事件就会丢失,所以边缘出发会强制用户一次性读取完所有的数据

实现水平触发和边缘触发的逻辑可以理解为:

  • 水平触发(LT):当有事件就绪时,该事件对应的红黑树节点会被添加到就绪队列rdlist中,只要继续队列中存在节点就会一直通知上层,直到上层将事件处理完毕

  • 边缘触发(ET):当有事件就绪时,内核只会在刚就绪时通知一次,除非后续该文件描述符上关心的事件有数据增加。比如从无数据到有数据时触发一次EPOLLIN,之后即使缓冲区还有数据也不会再次通知

默认情况下,epoll使用的是水平触发模式。但是实际上,边缘触发模式更加高效,但是因为没有多次通知,所以要确保数据被完全读取完毕,就需要循环调用读取函数知道读到返回值为0为止,但是如果返回值为0,那么根据前面的经验,读取函数就会被阻塞,此时尽管epoll没有阻塞,但是读取函数一旦阻塞,服务器还是会卡住针对这个问题,解决方案就是将读取文件描述符设置为非阻塞。那么为什么边缘触发模式更高效?实际上,因为需要上层一次性把所有数据读取完毕,那么只要开始读取对应的接收缓冲区就可以保证越来越大,下一次服务端向客户端返回的窗口大小也会变大,从而提高了IO吞吐量,所以边缘触发模式更加高效

IO吞吐量

IO吞吐量(Input/Output Throughput)是。IO吞吐量指单位时间内系统能处理的数据量,通常以MB/s或GB/s为单位,常用于衡量系统IO性能的关键指标,特别是在高并发网络编程中。在网络编程中,它反映了服务器处理网络数据的能力

在Linux下可以使用iftop命令查看IO吞吐量,但是首先需要安装iftop工具:

Bash
1
sudo apt install iftop

再使用下面的命令查看IO吞吐量:

Bash
1
sudo iftop -i eth0 -n -P

基于边缘触发模式的epoll实现基本TCP服务器结构

基本思路

本次为了后面实现方便,首先对使用epoll实现多路转接的接口进行封装,接着,为了保证低耦合度,考虑将每一个客户端与服务端的连接设计为一个连接结构Connection的对象,这样可以保证在EpollServer看来,只有一个一个的连接对象而不是各种文件描述符。但是,除了有用于客户端和服务端进行数据通信的文件描述符外,还有监听套接字对应的文件描述符,而对于监听套接字来说,实际上其只关心读时间,所以可以考虑将监听套接字对应的文件描述符和普通的文件描述符看做一类连接结构对象,只是需要实现的方法不同

上面的问题解决了底层EpollServer和上层连接之间的关系,但是上层连接有两种情况:

  1. 文件描述符对应的是监听套接字,执行的行为是建立客户端与服务端的连接
  2. 文件描述符对应的是普通的文件描述符,执行的行为是与客户端进行数据通信

所以对于这一点,可以考虑设计两个类,一个类是Listener,表示的是监听套接字和其对应接口的封装,另一个类是IOService,表示的是普通的文件描述符和其对应接口的封装

实现Connection类基本结构

首先是Connection类,该类需要管理每一次的连接,所以需要有一个成员变量用于保存对应的文件描述符。另外,在前面不论是selectpoll还是epoll实现的TCP服务器都存在着一个问题:读操作不能保证读取到的是完整的数据。因为前面的实现中缓冲区都是一个临时变量,一旦离开了当前读取的过程就会被销毁,为了解决读取到完整的数据,就必须对上一次读取的数据进行缓存,再次读取时将该数据与上一次的数据进行拼接直到有完整的数据,所以考虑在Connection类中还需要添加in_buffer_成员,同样的再提供一个out_buffer_成员。接着,因为底层的EpollServer管理的是连接结构对象,所以为了可以看到客户端的信息,还需要提供一个用于保存客户端信息结构的成员变量,这个类型即为前面封装的SockAddrIn类,所以当前Connection类的结构如下:

C++
1
2
3
4
5
6
7
8
class Connection
{
private:
    int fd_; // 当前套接字
    std::string in_buffer_; // 读取缓冲区
    std::string out_buffer_; // 写入缓冲区
    SockAddrIn client_; // 客户端信息
};

除了上面的信息外,为了保证当前Connection类既可以表示普通的文件描述符,还可以表示监听套接字,这里可以考虑将Connection作为基类,提供三个纯虚函数由子类进行实现,分别表示读、写和异常,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class Connection
{
public:
    // ...

    // 读、写和异常纯虚函数
    virtual void recvData() = 0;
    virtual void sendData() = 0;
    virtual void handleException() = 0;

    // ...
};

接着,为了保证子类可以访问到Connection类的成员变量,这里可以将Connection类的成员变量设置为protected,这样子类就可以直接访问到父类的成员变量,如下:

C++
1
2
3
4
5
6
7
class Connection
{
// ...
// private:
protected:
// ...
};

因为EpollServer类会管理每一个Connection对象,而EpollServer类会对每一个文件描述符进行关心,但是具体关心哪种事件当前在Connection类中并没有体现,所以还需要在Connection添加一个成员变量用于表示当前Connection关心的事件,类型为uint32_t,如下:

C++
1
2
3
4
5
6
7
class Connection
{
private:
    // ...

    uint32_t events_; // 事件类型
};

同样的,可以添加一个成员变量revents表示就绪的事件:

C++
1
2
3
4
5
6
7
class Connection
{
private:
    // ...

    uint32_t revents_; // 就绪事件类型
};

接下来需要对一些成员变量进行初始化,本次考虑在Connection类的构造函数中进行初始化操作:

C++
1
2
3
Connection()
    :fd_(-1), events_(0), revents_(0)
{}

接着,提供一些设置函数,如下:

C++
1
2
3
4
5
// 设置client
void setClient(SockAddrIn&& client)
{
    client_ = client;
}
C++
1
2
3
4
5
// 设置事件
void setEvent(uint32_t events)
{
    events_ = events;
}
C++
1
2
3
4
5
// 设置就绪事件
void setRevents(uint32_t revents)
{
    revents_ = revents;
}
C++
1
2
3
4
5
// 设置文件描述符
void setFd(int fd)
{
    fd_ = fd;
}

最后,提供一些获取函数,如下:

C++
1
2
3
4
5
// 获取客户端信息
SockAddrIn getClientInfo()
{
    return client_;
}
C++
1
2
3
4
5
// 获取文件描述符
int getFd()
{
    return fd_;
}
C++
1
2
3
4
5
// 获取事件类型
uint32_t getEvents()
{
    return events_;
}
C++
1
2
3
4
5
// 获取就绪事件类型
uint32_t getRevents()
{
    return revents_;
}

实现EpollServer类基本结构

接着是EpollServer类,该类需要管理每一个Connection对象,因为EpollServer类需要对具体的描述符进行关心,而根据Connection类的设计:包含需要关心的事件,所以可以考虑在EpollServer类中创建一张哈希表存储文件描述符和Connection类对象的映射关系,本次考虑实现文件描述符和Connection类对象指针进行映射的方式如下:

C++
1
2
3
4
5
6
7
using conn_t = std::shared_ptr<Connection>;

class EpollServer
{
private:
    std::unordered_map<int, conn_t> connections_; // 文件描述符与Connection类对象指针的映射关系
};

接着,因为EpollServer类会接收epoll_wait返回的就绪事件数组,所以可以考虑在EpollServer类中创建一个数组用于存储,这里使用定长数组,在构造时初始化对应的指针,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class EpollServer
{
public:
    EpollServer()
        :revents_arr_(std::make_shared<std::array<struct epoll_event, g_default_array_num>>())
    {

    }

private:
    // ...
    std::shared_ptr<std::array<struct epoll_event, g_default_array_num>> revents_arr_; // 就绪事件数组
};

实现epoll接口封装类Epoll基本结构

因为epoll的设置接口和等待接口都会使用到epoll模型对应的文件描述符,所以考虑在Epoll类中创建一个成员变量存储该文件描述符,接着,既然是封装接口,就没有必要单独提供一个创建epoll模型的接口,所以可以考虑在Epoll类的构造函数中创建epoll模型,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class Epoll
{
public:
    Epoll()
        :epfd_(-1)
    {
        epfd_ = epoll_create(256);
        if(epfd_ < 0)
        {
            LOG(LogLevel::ERROR) << "Epoll模型创建失败";
            exit(static_cast<int>(ErrorNumber::Epoll_Create_Fail));
        }

        LOG(LogLevel::INFO) << "Epoll模型创建成功";
    }

private:
    int epfd_;
};

接着在EpollServer中添加一个成员变量指针用于表示Epoll类对象,这样在EpollServer类中就可以调用封装后的接口。在构造函数初始化列表中对该指针进行初始化:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class EpollServer
{
public:
    EpollServer()
        // ...
        ,ep_(std::make_shared<Epoll>())
    {

    }

private:
    // ...
    std::shared_ptr<Epoll> ep_; // 封装的Epoll类
};

设计Epoll类和EpollServer

在下面进行设计之前先回顾之前的思路:

在本次实现中,Epoll接口封装类是最底层的类,而EpollServer类是Epoll接口封装类的上层类,这个服务器用于处理IO,而网络服务本质都是IO,所以网络服务相关的类(客户端与服务端建立连接和客户端与服务端进行数据通信)都在EpollServer类的上层,但是为了统一EpollServer视角,利用到了Connection类的中间层

首先既然要将文件描述符添加到epoll模型中,那么首先需要的就是在Epoll类中提供添加文件描述符到epoll模型的接口,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
void epollCtl(int fd, uint32_t events)
{
    struct epoll_event ev;
    ev.events = events;
    ev.data.fd = fd;
    int ret = epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);
    if(ret < 0)
    {
        LOG(LogLevel::WARNING) << "添加文件描述符和事件失败";
        exit(static_cast<int>(ErrorNumber::Epoll_Ctl_Fail));
    }

    LOG(LogLevel::INFO) << "添加文件描述符和事件成功";
}

接着,在EpollServer类提供一个将Connection类对象添加到epoll模型中的接口,但是参数是Connection类对象的指针,因为EpollServer类中管理的是Connection类对象的指针,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 添加文件描述符和关心的事件
void insertFdAndEvents(conn_t con)
{
    auto pos = connections_.find(con->getFd());
    if(pos != connections_.end())
    {
        LOG(LogLevel::WARNING) << "插入失败,指定描述符已存在";
        exit(static_cast<int>(ErrorNumber::Epoll_Ctl_Fail));
    }

    // 将指定的文件描述符和对应的连接对象建立映射关系
    connections_.insert({con->getFd(), con});

    // 添加到Epoll模型中
    ep_->epollCtl(con->getFd(), con->getEvents());
}

因为EpollServer类需要等待每一个文件描述符就绪,所以需要Epoll提供一个等待接口,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 等待就绪事件
int epollWait(struct epoll_event* ep_arr, int maxSize, int timeout)
{
    int num = epoll_wait(epfd_, ep_arr, maxSize, timeout);

    if(num < 0)
    {
        LOG(LogLevel::WARNING) << "Epoll等待错误";
        exit(static_cast<int>(ErrorNumber::Epoll_Wait_Fail));
    }

    LOG(LogLevel::INFO) << "Epoll开始等待";

    return num;
}

接着,在EpollServer类中实现服务器启动的函数,对于EpollServer来说,其主要任务就是等待文件描述符就绪,为了后续方便添加新功能,可以考虑将等待行为抽取到一个函数loopOnce中,而启动服务器函数就是启动服务器并持续执行loopOnce函数。为了标识服务器已经启动,可以使用一个成员变量isRunning_,在构造函数中初始化为false,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class EpollServer
{
public:
    EpollServer()
        // ...
        ,isRunning_(false)
    {

    }

    // ...

    // 单次循环
    void loopOnce()
    {

    }

    // 启动服务器
    void startServer()
    {
        isRunning_ = true;
        while (isRunning_)
        {
            loopOnce();
        }

        isRunning_ = false;
    }

    ~EpollServer()
    {}

private:
    // ...
    bool isRunning_; // 服务器运行标识
};

同样,可以提供函数用于停止服务器:

C++
1
2
3
4
5
// 停止服务器
void stopServer()
{
    isRunning_ = false;
}

接着,实现单次循环函数loopOnce,该函数就是等待已经存在于epoll模型中的文件描述符,并对具体的就绪事件进行处理。参考思路:单次循环中需要调用Epoll类中的epollWait函数,该函数会返回已经就绪事件的个数和数组,遍历数组获取到每一个继续的文件描述符和对应的事件,如果返回的事件是错误事件,为了处理方便,将该返回事件修改为读写事件就绪EPOLLIN | EPOLLOUT,交给上层的读写函数处理,这一点具体作用在后面会提及,此处不过多解释。接着就是正常情况,即要么是读事件就绪,要么是写事件继续,要么就是二者依次就绪,所以需要两个判断分别处理,但是此处不能只通过判断返回的就绪事件类型是否是或者写就决定执行某一种分支,而是还要判断对应的文件描述符是否存在。在执行就绪事件对应的逻辑分支中,因为哈希表的valueConnection对象指针类型,只需要调用父类的方法即可,如果是监听套接字,那么就会执行创建连接,否则就是正常的读写。根据这个思路,需要额外实现一个函数判断当前文件描述符是否存在于哈希表中:

C++
1
2
3
4
5
// 判断文件描述符是否存在
bool checkFdIsInConnections(int fd)
{
    return connections_.find(fd) != connections_.end();
}

接着实现loopOnce函数:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 单次循环
void loopOnce()
{
    int timeout = 1000;
    int num = ep_->epollWait(revents_arr_->data(), g_default_array_num, timeout);
    for (int i = 0; i < num; i++)
    {
        // 获取当前文件描述符和就绪的事件
        int fd = (*revents_arr_)[i].data.fd;
        uint32_t revents = (*revents_arr_)[i].events;
        if ((revents & EPOLLERR) || (revents & EPOLLHUP))
            revents = (EPOLLIN | EPOLLOUT);
        if ((revents & EPOLLIN) && checkFdIsInConnections(fd))
        {
            // 执行读方法
            connections_[fd]->recvData();
        }
        if ((revents & EPOLLOUT) && checkFdIsInConnections(fd))
        {
            // 执行写方法
            connections_[fd]->sendData();
        }
    }
}

实现Listener类基本结构

因为下层是通过Connection类对象来管理每一个链接,所以Listener类只需要作为Connection类的子类:

C++
1
2
3
class Listener : public Connection
{
};

接着,因为Listener类用于处理客户端和服务器端的链接,所以考虑在Listener类中添加一个成员表示TCP套接字,在构造函数初始化列表中进行初始化,如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class Listener : public Connection
{
public:
    Listener(uint16_t port = default_port)
        :bs_(std::make_shared<TcpSocket>(port))
    {
    }

private:
    std::shared_ptr<BaseSocket> bs_;
};

因为Listener类是Connection类的子类,所以需要重写父类的纯虚函数,但是目前不具体实现,如下:

C++
1
2
3
// 接收信息
void recvData() override
{}
C++
1
2
3
// 发送信息
void sendData() override
{}
C++
1
2
3
// 处理异常
void handleException() override
{}

接着,在构造函数中进行初始化操作,包括创建套接字、绑定地址信息和设置监听操作。另外,因为每个Connection类对象都需要用到文件描述符,所以在构造函数中还需要将监听套接字设置到当前子类对象Listener中,便于使用Connection类对象可以获取到监听套接字,如下:

C++
1
2
3
4
5
6
7
8
Listener(uint16_t port = default_port)
    :bs_(std::make_shared<TcpSocket>(port))
{
    // 初始化套接字
    bs_->initSocket();
    // 调用父类函数设置监听套接字
    setFd(bs_->getListenSocketFd());
}

设计Listener类和IOService

因为客户端和服务端建立连接本质就是服务端读取客户端的请求信息,所以Listener类只需要实现recvData函数即可,但是需要注意的是,本次实现的是边缘触发模式,虽然能确定accept函数一定不会阻塞,但是不能保证accept函数一定只读取一次,也就是说需要将accept函数当做read等读取接口看待,所以需要循环监听。另外,如果当做IO函数看待,那么必然会出现多读一次用于判断是否读取到结尾,但是这多读的一步会导致服务器阻塞,所以还需要将对应的监听套接字设置为非阻塞,所以首先需要将监听套接字设置为非阻塞

基于上面的思路,首先提供一个函数用于将文件描述符设置为非阻塞,因为这个函数在后续正常读取信息时也会用到,所以考虑将该函数放在一个工具类中:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class EpollServerUtils
{
public:
    static void setNonBlock(int fd)
    {
        // 获取文件描述符已有的模式
        int mode = fcntl(fd, F_GETFL);
        if (mode < 0)
        {
            LOG(LogLevel::WARNING) << "获取文件描述符已有模式失败";
            exit(static_cast<int>(ErrorNumber::Fcntl_Fail));
        }

        // 设置非阻塞
        fcntl(fd, F_SETFL, mode | O_NONBLOCK);
    }
};

接着,在获取到listen_socketfd之后将其设置为非阻塞,在BaseSocket类中的initSocket函数中添加如下代码:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class BaseSocket
{
public:
    // ...
    // 获取监听套接字
    virtual int getListenSocketFd() = 0;

    // 具体实现方法
    void initSocket()
    {
        // 创建套接字
        createSocket();
        // 设置非阻塞
        EpollServerUtils::setNonBlock(getListenSocketFd());
        // ...
    }
};

接着,为了拿到accept函数的错误码,可以考虑在toAccept函数的参数部分添加一个输出型参数out_errno

C++
1
2
3
4
5
6
7
8
class BaseSocket
{
public:
    // ...
    // 接收
    virtual int toAccept(SockAddrIn *client, int *out_errno) = 0;
    // ...
};
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class TcpSocket : public BaseSocket
{
public:
    // ...

    // 实现接收
    int toAccept(SockAddrIn *client, int *out_errno) override
    {
        // ...
        int ac_socketfd = accept(_listen_socketfd, reinterpret_cast<struct sockaddr *>(&peer), &length);
        // 获取accept的错误码
        *out_errno = errno;
        // ...
    }
    // ...
};

基于上面的思路,实现recvData函数基本结构:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 接收信息
void recvData() override
{
    // 循环接收
    while(true)
    {
        int rerrno = 0;
        SockAddrIn client;
        int ac_socketfd = bs_->toAccept(std::addressof(client), &rerrno);
        // 当做IO行为处理
        if(ac_socketfd > 0)
        {
            // 正常情况,建立链接
        }
        else if(ac_socketfd < 0)
        {
            if(rerrno == EAGAIN || rerrno == EWOULDBLOCK)
            {
                LOG(LogLevel::INFO) << "accept接收结束";
                break;
            }
            else if(rerrno == EINTR)
            {
                LOG(LogLevel::INFO) << "accept被信号中断,重新接收";
                continue;
            }
            else
            {
                LOG(LogLevel::ERROR) << "accept错误";
                break;
            }
        }
    }
}

接着思考recvData函数的正常情况如何处理,在前面都是直接将指定的文件描述符添加到epoll模型中,但是在本次实现中,当前Listener类和EpollServer类之间存在一个Connection类,而EpollServer类只能看到Connection类对象,所以需要考虑将获取到的ac_socketfd封装为Connection类对象,再将该对象添加到EpollServer类中。但是,此处遇到两个问题:

  1. 如何将获取到的ac_socketfd封装为Connection类对象
  2. 如何将该对象添加到EpollServer类中

对于第一个问题,既然已经有了关于监听套接字的子类,那么自然还需要一个处理普通文件描述符的子类:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class IOService : public Connection
{
public:
    void recvData()
    {
    }

    void sendData()
    {
    }

    void handleException()
    {
    }
};

对于第二个问题,这里需要在Connection类中添加一个成员变量指针,用于表示EpollServer类对象,这样在Connection类中就可以调用封装后的接口。这里就需要考虑如何在Connection类中访问到EpollServer类,又如何在Connection类中拿到EpollServer类对象

对于第一个问题,最直接的做法就是在Connection类所在文件中包含EpollServer类所在的文件,如下:

C++
1
2
// Connection类所在文件
#include "EpollServer.h"

但是这种做法有一个弊端,就是如果在EpollServer类所在的文件中包含了Connection类所在的文件,就会出现头文件循环包含问题,所以这种做法是不可取的。所以需要考虑使用前置声明的方式,如下:

C++
1
2
// Connection类所在文件
class EpollServer;
头文件循环包含问题

Connection类所在的文件中需要用到EpollServer类,所以需要在Connection类所在的文件中添加EpollServer类的前置声明,但是注意,如果在EpollServer类所在文件中包含了Connection类所在文件,就不要在Connection类所在的文件中再添加包含EpollServer类所在的文件,防止出现头文件循环包含问题

但是,使用前置声明还会遇到一个问题:如果将前置声明放在Connection类所在的命名空间connectionModule中,会出现如下错误:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Connection类所在文件
class EpollServer;

namespace connectionModule
{
    class Connection
    {
    // ...
    protected:
        // 指向EpollServer指针
        std::weak_ptr<EpollServer> ep_svr_;
        // 报错:"connectionModule::EpollServer" is ambiguous
    };
}

对于这种情况能想到的直接方案就是将前置声明放在命名空间connectionModule中,但是这样还会出现第二个问题:当前EpollServer的声明是在Connection类所在的命名空间connectionModule,而实际上EpollServer类的声明是在命名空间epollServerModule中,如果直接使用上面的方式:std::weak_ptr<EpollServer> ep_svr_,一旦在Listener中将该指针转换为shared_ptr类型,就会发现指针类型是std::shared_ptr<connectionModule::EpollServer>,而不是std::shared_ptr<epollServerModule::EpollServer>,而作为前置声明的connectionModule::EpollServer并没有完整的实现,这就导致访问不到epollServerModule::EpollServer中的成员。所以正确的做法是,将EpollServer类的前置声明放在EpollServer类所在的命名空间中,再将该前置声明整体放在Connection类所在文件的全局,并在使用到EpollServer的地方使用指定命名空间的方式使用EpollServer。为了防止出现循环引用问题,需要使用weak_ptr而不再是shared_ptr,如下:

上面提到的循环引用问题

EpollServer中管理了多个Connection对象指针,该指针是shared_ptr类型,如果再在Connection中使用shared_ptr就会出现Connection对象与EpollServer互指,一旦Connection类对象需要析构,就需要EpollServer先析构,而EpollServer要析构就需要Connection类对象析构导致循环引用问题

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 前置声明
namespace epollServerModule
{
    class EpollServer;
}

namespace connectionModule
{
    class Connection
    {

    protected:
        // ...
        // 指向EpollServer指针
        std::weak_ptr<epollServerModule::EpollServer> ep_svr_;
        // ...
    };
}

但是,有这个指针还不够,还需要对这个指针进行初始化,为了防止忘记初始化该指针导致的空指针错误,考虑在构造函数中添加参数用于初始化该指针,如下:

C++
1
2
3
4
Connection(std::shared_ptr<epollServerModule::EpollServer> ep)
    : fd_(-1), events_(0), revents_(0), ep_svr_(ep) 
{
}

但是,ConnectionListener类的父类,所以在Listener类中的成员初始化之前需要先调用父类的构造函数初始化父类成员(除非父类构造函数是全缺省或者无参),所以需要在Listener类的构造函数的初始化列表同样添加该参数,如下:

C++
1
2
3
4
5
6
7
8
Listener(std::shared_ptr<epollServerModule::EpollServer> ep,  uint16_t port = default_port)
    : Connection(ep), bs_(std::make_shared<TcpSocket>(port))
{
    // 初始化套接字
    bs_->initSocket();
    // 调用父类函数设置监听套接字
    setFd(bs_->getListenSocketFd());
}

最后,不要遗忘还有一个子类IOService,同样,在其构造函数的初始化列表中同样添加该参数,如下:

C++
1
2
3
IOService(std::shared_ptr<epollServerModule::EpollServer> ep)
    : Connection(ep)
{}

接着,在Connection类中添加一个函数用于获取EpollServer类对象,如下:

C++
1
2
3
4
5
// 获取EpollServer指针
std::weak_ptr<epollServerModule::EpollServer> getEpollServer()
{
    return ep_svr_;
}

解决了上面的两个问题后,回到Listener类中的recvData函数编写剩下的逻辑。注意,要实现边缘触发模式一定要使用EPOLLET并且将对应的文件描述符设置为非阻塞:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 接收信息
void recvData() override
{
    // 循环接收
    while (true)
    {
        int rerrno = 0;
        SockAddrIn client;
        int ac_socketfd = bs_->toAccept(std::addressof(client), &rerrno);
        // 当做IO行为处理
        if (ac_socketfd > 0)
        {
            // 获取EpollServer类对象
            std::shared_ptr<epollServerModule::EpollServer> ep = getEpollServer().lock();
            // 正常情况,建立链接
            // 1. 创建连接对象
            std::shared_ptr<Connection> ac_con = std::make_shared<IOService>(ep);
            // 2. 设置非阻塞
            EpollServerUtils::setNonBlock(ac_socketfd);
            ac_con->setFd(ac_socketfd);
            ac_con->setEvent(EPOLLIN | EPOLLET);
            ac_con->setClient(std::move(client));
            // 3. 通过Connection将链接指针插入到Epoll模型中
            ep->insertFdAndEvents(ac_con);
        }
        // ...
    }
}

在上面的代码中,因为getEpollServer函数返回的是weak_ptr类型,所以需要调用lock函数将其转换为shared_ptr类型再使用。这里是临时提升为shared_ptr类型,所以使用完之后会自动释放,主要原因如下:getEpollServer持有的指针是对EpollServer类对象的弱引用,一旦ep变量离开作用域,哪怕ep变量是shared_ptr类型对EpollServer类对象的强引用,其引用计数器也不会等到EpollServer销毁再减1

最后,需要修改前面TcpSocket类中的toAccept函数,该函数中存在对ac_socketfd < 0的判断逻辑,但是这个逻辑已经在loopOnce函数处理了,所以需要删除toAccept函数中关于这部分的逻辑:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 实现接收
int toAccept(SockAddrIn *client, int *out_errno) override
{
    // ...
    // 获取accept的错误码
    *out_errno = errno;

    // 删除下面的逻辑——起始
    if (ac_socketfd < 0)
    {
        LOG(LogLevel::ERROR) << "接收失败:" << strerror(errno);
        exit(static_cast<int>(ErrorNumber::AcceptFail));
    }
    // 结束

    // ...
}

第一阶段测试

首先,在IOService类中的recvData函数中添加一条日志:

C++
1
2
3
4
void recvData()
{
    LOG(LogLevel::DEBUG) << "进入IOService读模块";
}

接着,创建一个主函数:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include "Listener.hpp"
#include "EpollServer.hpp"

using namespace listenerModule;
using namespace epollServerModule;

int main()
{
    std::shared_ptr<epollServerModule::EpollServer> e_svr = std::make_shared<epollServerModule::EpollServer>();
    // 创建Listener代表启动服务器
    // 将Listener套接字构建为Connection对象
    std::shared_ptr<Connection> con = std::make_shared<Listener>(e_svr);
    con->setFd(con->getFd());
    // 使用EPOLLET开启边缘触发模式
    con->setEvent(EPOLLIN | EPOLLET);
    // 将Connection对象插入到Epoll模型中
    e_svr->insertFdAndEvents(con);
    // 启动服务器
    e_svr->startServer();

    return 0;
}

编译运行上面的代码使用一个客户端连接就可以发现一个进程就可以处理多个连接,并且只要客户端向服务端发送内容就会打印类似下面的内容:

Text Only
1
[2025-04-11 17-07-16] [DEBUG] [31041] [IOService.hpp] [20] - 进入IOService读模块

当前阶段已经完成了任务派发,但是还没有进行IO处理,下一节将继续完成IO处理