跳转至

Reactor模式与完善基于边缘触发模式epoll实现TCP服务器

约 4334 个字 555 行代码 1 张图片 预计阅读时间 21 分钟

Reactor模式(反应堆模式)

在上面的代码中已经实现了TCP服务器的下层框架,整体结构如下图所示:

在上面的结构中,作为Connection类子类的Listener类一般被称为连接管理器,IOService类一般被称为IO处理器,而EpollServer类即为事件派发器,也被称为Reactor模式

了解了何为Reactor模式后,下面就是继续完善上面的TCP服务器,因为上面的服务器还没有完成IO逻辑

完善TCP服务器的IO服务

数据准备

既然要做IO处理,那么少不了的就是定制协议,这样才可以尽可能保障客户端和服务端之间通信,本次考虑使用前面在序列化和反序列化与网络计算器封装的协议:Response类和Request

完善recvData函数

接着完善IOService类中的recvData函数。考虑下面的思路:

既然是边缘触发模式,那么读取一定是持续读,所以需要死循环读取客户端发送的数据,得到结果后就需要对返回值进行判断,与前面toAccept函数的处理逻辑类似。对于读取异常的情况考虑将异常处理交给handleException函数,代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void recvData()
{
    while (true)
    {
        char buffer[1024] = {0};
        ssize_t num = recv(getFd(), buffer, sizeof(buffer) - 1, 0);

        if(num > 0)
        {
            // 正常读取到数据
        }
        else if(num == 0)
        {
            // 读取到结尾
            handleException();
            break;
        }
        else 
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK)
            {
                LOG(LogLevel::INFO) << "数据未准备完成,结束读取";
                break;
            }
            else if(errno == EINTR)
            {
                LOG(LogLevel::INFO) << "读取被信号中断,继续读取";
                continue;
            }
            else
            {
                // 读取出错
                handleException();
                return ;
            }
        }
    }

}

接着,考虑正常读取的情况,因为读取无法保证一次读取到所有的数据,所以需要循环读取,但是在上面的代码中,每一次循环的缓冲区buffer都是临时的,这就导致如果服务端还没有读完所有数据,那么上一次的数据就会被销毁,所以就需要用到Connection类中的in_buffer_保存当前读取到的数据,对此,在Connection类中提供一个设置in_buffer_的函数和获取in_buffer_的函数:

C++
1
2
3
4
5
// 设置in_buffer_
void setInBuffer(const std::string &in)
{
    in_buffer_ += in;
}
C++
1
2
3
4
5
// 获取in_buffer_
std::string& getInBuffer()
{
    return in_buffer_;
}

需要注意的是,对于获取函数来说,其返回值建议设置为引用版本,因为后面需要对in_buffer_进行直接修改

接着,在recvData函数的num > 0逻辑中通过setInbuffer将读取到的buffer存储到in_buffer_中,代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
void recvData()
{
    while (true)
    {
        char buffer[1024] = {0};
        ssize_t num = recv(getFd(), buffer, sizeof(buffer), 0);

        if(num > 0)
        {
            // 正常读取到数据
            setInbuffer(buffer);
        }
        // ...
    }

}

除了获取数据后还需要对数据进行处理,这里就需要开始使用协议,但是处理函数并不交给IOService类的recvData函数,而是考虑交给上层,首先在IOService类中添加一个函数对象成员,该函数对象的类型为std::string(std::string&),代码如下:

C++
1
2
3
4
5
6
7
8
using handler_t = std::function<std::string(std::string&)>;

class IOService : public Connection
{
    // ...
private:
    handler_t handleData_;
};

接着,在IOService类中提供一个设置handleData函数对象的函数,代码如下:

C++
1
2
3
4
void setHandleDataFunc(handler_t func)
{
    handleData_ = func;
}

接着,考虑何时处理数据,在上面的recvData函数中,读取结束并且还在当前函数的情况说明这一次读取结束,所以在循环结束后就可以处理数据,即:

C++
1
2
3
4
5
6
7
8
9
void recvData()
{
    // ...

    // 处理数据
    std::string ret;
    if(handleData_)
        ret = handleData_(getInbuffer());
}

假设现在handleData_函数已经处理完数据并给出了有效的结果,接下来需要考虑的就是如何将结果发送给客户端,这个问题的本质就是写入,按照前面的逻辑就是让EpollServer关心当前文件描述符的写事件,但是实际上并不是如此,因为写事件看的只是当前进程的写入缓冲区,不看对方的缓冲区,对方缓冲区为满不发送这个行为交给TCP协议去做,而在最开始当前进程的写入缓冲区一定是空的,所以写入事件一开始一定是就绪的,但是存在一直写导致写入缓冲区写满的情况,所以此时再考虑让EpollServer关心当前文件描述符的写事件。综上所述,写事件只有在写条件不满足时才进行关心

接下来考虑如何让EpollServer关心当前文件描述符的写事件,按照前面的思路就是将当前文件描述符和写事件添加到EpollServer中,但是实际上这种方式只适用于常开启的事件,很明显,对于写事件来说并不是从一开始就需要关心,所以属于按需关心,那么就不能使用前面的思路。这里就需要考虑修改当前文件描述符对应的事件,所以需要在EpollServer类中提供一个修改当前文件描述符对应的事件的函数,而修改和添加只是使用的宏不同,所以可以考虑下面的设计:

C++
1
2
3
4
5
6
7
8
9
void epollCtl(int fd, uint32_t events, int flag)
{
    struct epoll_event ev;
    ev.events = events;
    ev.data.fd = fd;

    int ret = epoll_ctl(epfd_, flag, fd, &ev);
    // ...
}
C++
1
2
3
4
void addEvents(int fd, uint32_t events)
{
    epollCtl(fd, events, EPOLL_CTL_ADD);
}
C++
1
2
3
4
void modifyEvents(int fd, uint32_t events)
{
    epollCtl(fd, events, EPOLL_CTL_MOD);
}

接着,修改相关调用位置即可:

C++
1
2
3
4
5
6
7
8
// 添加文件描述符和关心的事件
void insertFdAndEvents(conn_t con)
{
    // ...

    // 添加到Epoll模型中
    ep_->addEvents(con->getFd(), con->getEvents());
}

接着,在EpollServer类中提供一个启动和取消写关心的函数:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
void EnbaleToWriteAndToRead(int fd, bool toRead, bool toWrite)
{
    uint32_t events = toRead ? EPOLLIN : 0 | toWrite ? EPOLLOUT : 0 | EPOLLET;
    // 1. 修改已有的哈希表节点
    auto pos = connections_.find(fd);
    if(pos != connections_.end())
    {
        connections_[fd]->setEvent(events);
        // 2. 写入到epoll模型中
        ep_->modifyEvents(fd, events);
    }
}

所以,在recvData函数中,只需要将ret中的结果存储到out_buffer_中再交给sendData函数处理即可。基于这个思路,首先需要提供一个设置out_buffer_的函数和一个获取out_buffer_的函数:

C++
1
2
3
4
5
// 设置out_buffer_
void setOutBuffer(const std::string &out)
{
    out_buffer_ += out;
}
C++
1
2
3
4
5
// 获取out_buffer_
std::string& getOutBuffer()
{
    return out_buffer_;
}

接着完善recvData函数,代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void recvData()
{
    // ...

    // 添加结果到结果字符串中
    setOutBuffer(ret);

    if(!getOutBuffer().empty())
    {
        std::shared_ptr<epollServerModule::EpollServer> e_svr = getEpollServer().lock();
        e_svr->EnbaleToWriteAndToRead(getFd(), true, true);
    }
}

完善上层处理函数

创建一个类表示任务,其中包含一个处理静态函数:

C++
1
2
3
4
5
6
7
8
class Task
{
public:
    static std::string task_1(std::string& in)
    {

    }
};

接下来考虑的就是如何设计这个任务函数。可以考虑思路:对得到的字符串进行解码+反序列化得到有效载荷,然后根据有效载荷进行计算,最后将结果序列化+编码并返回。这里需要用到前面在序列化和反序列化与网络计算器设计的Caculator类。参考代码如下:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static std::string task_1(std::string& in)
{
    std::string json_str;
    std::string result;
    // 解码
    while(decode(in, json_str))
    {
        std::string resp_str;
        if(json_str.empty())
        {
            LOG(LogLevel::WARNING) << "解码失败";
            break;
        }

        // 反序列化
        Request req;
        if(!req.deserialize(json_str))
        {
            LOG(LogLevel::WARNING) << "反序列化失败";
            break;
        }

        Calculator c;
        Response resp = c.calculate(req);

        // 序列化
        if (!resp.serialize(resp_str))
        {
            LOG(LogLevel::WARNING) << "序列化失败";
            break;
        }

        // 编码
        if (!encode(resp_str))
        {
            LOG(LogLevel::WARNING) << "编码失败";
            break;
        }

        result += resp_str;
    }

    return result;
}

接着,在调用IOService类中的recvData函数之前先设置handleData,本次考虑在Listener类中的recvData函数中进行设置:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 接收信息
void recvData() override
{
    // 循环接收
    while (true)
    {
        int rerrno = 0;
        SockAddrIn client;
        int ac_socketfd = bs_->toAccept(std::addressof(client), &rerrno);
        // 当做IO行为处理
        if (ac_socketfd > 0)
        {
            // ...
            // 4. 设置IOService类中的方法
            std::shared_ptr<IOService> ptr = std::dynamic_pointer_cast<IOService>(ac_con);
            ptr->setHandleDataFunc(taskModule::Task::task_1);
        }
        // ...
    }
}

完善sendData函数

sendData函数与recvData函数的逻辑基本类似,下面给出基本结构:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void sendData()
{
    while(true)
    {
        ssize_t num = send(getFd(), getOutBuffer().c_str(), getOutBuffer().size(), 0);
        if(num > 0)
        {
            // 正常发送
        }
        else if(num == 0)
        {
            // 发送完毕
            break;
        }
        else
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK)
            {
                LOG(LogLevel::INFO) << "缓冲区已满";
                break;
            }
            else if(errno == EINTR)
            {
                LOG(LogLevel::INFO) << "信号中断";
                continue;
            }
            else 
            {
                handleException();
                return ;
            }
        }
    }
}

接下来考虑正常发送的情况,实际上,既然是正常发送,那么说明num个数据已经被发送出去了,所以需要将out_buffer_中的数据删除,所以需要在Connection类中提供一个删除out_buffer_中数据的函数,代码如下:

C++
1
2
3
4
5
// 移除n个字符
void remove_n_data(int n)
{
    out_buffer_.erase(0, n);
}

完善sendData函数:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void sendData()
{
    while(true)
    {
        ssize_t num = send(getFd(), getOutBuffer().c_str(), getOutBuffer().size(), 0);
        if(num > 0)
        {
            // 正常发送
            remove_n_data(num);
        }
        // ...
    }
}

完成发送之后还需要处理一件事情:将当前文件描述符从EpollServer中移除,因为当前文件描述符对应的写事件已经处理完毕,所以不再需要关心当前文件描述符的写事件,下一次肯定也有空间写入,如果不存在空间再重新关心。但是上面的实现中有两种离开循环的情况:

  1. 发送缓冲区已满
  2. 数据发送完毕

对于第一种情况,既然是发送完毕,那么out_buffer_一定为空,所以可以直接将当前文件描述符从EpollServer中移除,对于第二种情况,依旧需要保证EpollServer对当前文件描述符写事件的关心:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void sendData()
{
    // ...

    if(getOutBuffer().empty()) // 发完数据
    {
        std::shared_ptr<epollServerModule::EpollServer> e_svr = getEpollServer().lock();
        e_svr->EnbaleToWriteAndToRead(getFd(), true, false);
    }
    else 
    {
        std::shared_ptr<epollServerModule::EpollServer> e_svr = getEpollServer().lock();
        e_svr->EnbaleToWriteAndToRead(getFd(), true, true);
    }
}

为什么recvData函数中开启写事件关心了还要在sendData函数中再关心一次?

recvData函数中开启写事件关心是因为确保下一次可以进入到loopOnce函数的写逻辑中执行sendData函数,而sendData函数中再开启写事件关心是因为可能存在写缓冲区满的情况导致无法正常写入,此时需要对写事件关心从而保证下一次可以正常写入

recvData函数中,也可以不开启写事件关心直接调用sendData函数,效果都是一样的

完善handleException函数

在前面EpollServer类中,将EPOLLERREPOLLHUP全部设置为EPOLLIN | EPOLLOUT就是为了统一处理异常,这样,不论是在recvData出异常还是在sendData出异常,都会因为关心的事件进入对应的函数从而触发对应函数中调用handleException函数的逻辑。下面完善handleException函数,考虑思路:既然是处理异常,那么说明当前文件描述符出现了问题,此时要做的就是释放资源,所以按照下面三个步骤设计handleException函数:

  1. Epoll中移除当前文件描述符
  2. 关闭当前文件描述符
  3. 从哈希表中移除当前文件描述符

根据这三个步骤,在Epoll中分别提供移除接口:

C++
1
2
3
4
5
// 移除文件描述符和事件
void deleteEvents(int fd)
{
    epollCtl(fd, 0, EPOLL_CTL_DEL);
}

接着,在Connection类中提供关闭文件描述符:

C++
1
2
3
4
5
// 关闭指定的文件描述符
void closeFd()
{
    close(fd_);
}

最后,在EpollServer类中提供从哈希表中移除当前文件描述符:

C++
1
2
3
4
5
6
7
8
// 从哈希表中移除当前文件描述符的函数
void popFdAndEventsFromConnections(int fd)
{
    if(connections_.find(fd) != connections_.end())
    {
        connections_.erase(fd);
    }
}

但是,这三个步骤实际上只能在EpollServer类中进行,所以可以考虑在调用popFdAndEventsFromConnections时执行这三个步骤:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 从哈希表中移除当前文件描述符的函数
void popFdAndEventsFromConnections(int fd)
{
    if(connections_.find(fd) != connections_.end())
    {
        // 1. 移除事件
        ep_->deleteEvents(fd);
        // 2. 关闭文件描述符
        connections_[fd]->closeFd();
        // 3. 删除文件描述符
        connections_.erase(fd);
    }
}

最后,在handleException函数中调用即可:

C++
1
2
3
4
5
6
void handleException()
{
    setHandleDataFunc(nullptr);
    std::shared_ptr<epollServerModule::EpollServer> e_svr = getEpollServer().lock();
    e_svr->popFdAndEventsFromConnections();
}

第二阶段测试

本次使用前面在序列化和反序列化与网络计算器设计的主函数测试

正常情况下不论多少个客户端连接都可以正常计算,但是一旦有个客户端断开,那么其他客户端此时再发送请求就会出现问题,控制台输出如下:

Text Only
1
2
3
4
...
[2025-04-13 12-57-52] [DEBUG] [21818] [Task.hpp] [16] - 进入任务处理
malloc(): unaligned tcache chunk detected
已中止 (核心已转储)

从上面的日志可以发现,错误出现在进入任务处理函数时,所以可以基本确定是调用了任务函数时出现的问题,结合触发条件:客户端断开连接,可以判断出错误可能是由handleException引起的,根据这两个推测,问题基本可以定位在下面的位置:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 接收信息
void recvData() override
{
    // 循环接收
    while (true)
    {
        int rerrno = 0;
        SockAddrIn client;
        int ac_socketfd = bs_->toAccept(std::addressof(client), &rerrno);
        // 当做IO行为处理
        if (ac_socketfd > 0)
        {
            // LOG(LogLevel::DEBUG) << "准备IO套接字";
            // 获取EpollServer类对象
            std::shared_ptr<epollServerModule::EpollServer> ep = getEpollServer().lock();
            // 正常情况,建立链接
            // 1. 创建连接对象
            std::shared_ptr<Connection> ac_con = std::make_shared<IOService>(ep);
            // ...
            // 4. 设置IOService类中的方法
            std::shared_ptr<IOService> ptr = std::dynamic_pointer_cast<IOService>(ac_con);
            ptr->setHandleDataFunc(taskModule::Task::task_1);
        }
        // ...
    }
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
void recvData()
{
    //...   
    // 处理数据
    std::string ret;
    if (handleData_)
        ret = handleData_(getInBuffer());

    // 添加结果到结果字符串中
    setOutBuffer(ret);

    if(!getOutBuffer().empty())
    {
        LOG(LogLevel::DEBUG) << "recvData开启写事件关心";
        std::shared_ptr<epollServerModule::EpollServer> e_svr = getEpollServer().lock();
        e_svr->EnbaleToWriteAndToRead(getFd(), true, true);
    }
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static std::string task_1(std::string &in)
{
    LOG(LogLevel::DEBUG) << "进入任务处理";
    std::string json_str;
    std::string result;
    // 解码
    while (decode(in, json_str))
    {
        std::string resp_str;
        if (json_str.empty())
        {
            LOG(LogLevel::WARNING) << "解码失败";
            break;
        }

        // 反序列化
        Request req;
        if (!req.deserialize(json_str))
        {
            LOG(LogLevel::WARNING) << "反序列化失败";
            break;
        }

        Calculator c;
        Response resp = c.calculate(req);

        // 序列化
        if (!resp.serialize(resp_str))
        {
            LOG(LogLevel::WARNING) << "序列化失败";
            break;
        }

        // 编码
        if (!encode(resp_str))
        {
            LOG(LogLevel::WARNING) << "编码失败";
            break;
        }

        result += resp_str;
    }

    LOG(LogLevel::DEBUG) << "任务处理完成";
    LOG(LogLevel::DEBUG) << result;

    return result;
}

但是,具体是什么问题还需要进一步分析。而malloc(): unaligned tcache chunk detected一般可能是下面几种原因:

  1. 缓冲区溢出:写入超过分配的内存边界
  2. 野指针操作:访问已释放的内存区域
  3. 错误的指针运算:如对指针进行不正确的类型转换或算术运算
  4. 多线程竞争:未加锁的并发内存操作

而在本次实现中,只有第一种情况和第二种情况,所以需要进一步分析,这里可以使用AddressSanitizer(ASan)内存检测工具,这个工具可以检测以下问题:

  1. 缓冲区溢出(堆/栈/全局变量)
  2. 使用释放后的内存(use-after-free)
  3. 双重释放(double-free)
  4. 内存泄漏(memory leaks)

在编译时添加-fsanitize=address即可开启ASan检测,再带上-g可以看到更多的信息,例如符号表、源代码行号等,例如本次编译指令为:

Bash
1
g++ -o main main.cc -lpthread -ljsoncpp -fsanitize=address -g

重新编译上面的代码,运行程序并开启一个客户端让其断开,可以看到控制台有很长的输出

ASan输出
Text Only
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
[2025-04-13 13-12-59] [DEBUG] [22204] [IOService.hpp] [141] - 进入设置上层处理函数
=================================================================
==22204==ERROR: AddressSanitizer: heap-use-after-free on address 0x511000000100 at pc 0x5934f277824f bp 0x7ffd85516c30 sp 0x7ffd85516c20
READ of size 8 at 0x511000000100 thread T0

    #0 0x5934f277824e in std::_Function_base::_M_empty() const /usr/include/c++/13/bits/std_function.h:247
    #1 0x5934f278a59f in std::function<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > (std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&)>::operator bool() const /usr/include/c++/13/bits/std_function.h:574
    #2 0x5934f2782868 in IOServiceModule::IOService::recvData() /home/epsda/Codes_In_Linux/EpollServer_ET/IOService.hpp:66
    #3 0x5934f277f51c in epollServerModule::EpollServer::loopOnce() /home/epsda/Codes_In_Linux/EpollServer_ET/EpollServer.hpp:78
    #4 0x5934f277f76e in epollServerModule::EpollServer::startServer() /home/epsda/Codes_In_Linux/EpollServer_ET/EpollServer.hpp:97
    #5 0x5934f2777531 in main /home/epsda/Codes_In_Linux/EpollServer_ET/main.cc:21
    #6 0x7c6e95a2a1c9 in __libc_start_call_main ../sysdeps/nptl/libc_start_call_main.h:58
    #7 0x7c6e95a2a28a in __libc_start_main_impl ../csu/libc-start.c:360
    #8 0x5934f2775f74 in _start (/home/epsda/Codes_In_Linux/EpollServer_ET/main+0x6f74) (BuildId: f7d36700d4132470fb997f44c7b71a254a08564e)

0x511000000100 is located 192 bytes inside of 208-byte region [0x511000000040,0x511000000110)
freed by thread T0 here:
    #0 0x7c6e962ff5e8 in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:164
    #1 0x5934f2797ee3 in std::__new_allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/13/bits/new_allocator.h:172
    #2 0x5934f27959f8 in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/13/bits/alloc_traits.h:517
    #3 0x5934f27959f8 in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/13/bits/allocated_ptr.h:74
    #4 0x5934f2798f4a in std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/13/bits/shared_ptr_base.h:623
    #5 0x5934f277804d in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/13/bits/shared_ptr_base.h:347
    #6 0x5934f2786ce7 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/13/bits/shared_ptr_base.h:1071
    #7 0x5934f277e6e7 in std::__shared_ptr<connectionModule::Connection, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/13/bits/shared_ptr_base.h:1524
    #8 0x5934f277e707 in std::shared_ptr<connectionModule::Connection>::~shared_ptr() /usr/include/c++/13/bits/shared_ptr.h:175
    #9 0x5934f277e72b in std::pair<int const, std::shared_ptr<connectionModule::Connection> >::~pair() /usr/include/c++/13/bits/stl_pair.h:187
    #10 0x5934f2791e02 in void std::__new_allocator<std::__detail::_Hash_node<std::pair<int const, std::shared_ptr<connectionModule::Connection> >, false> >::destroy<std::pair<int const, std::shared_ptr<connectionModule::Connection> > >(std::pair<int const, std::shared_ptr<connectionModule::Connection> >*) /usr/include/c++/13/bits/new_allocator.h:198
    #11 0x5934f2791e02 in void std::allocator_traits<std::allocator<std::__detail::_Hash_node<std::pair<int const, std::shared_ptr<connectionModule::Connection> >, false> > >::destroy<std::pair<int const, std::shared_ptr<connectionModule::Connection> > >(std::allocator<std::__detail::_Hash_node<std::pair<int const, std::shared_ptr<connectionModule::Connection> >, false> >&, std::pair<int const, std::shared_ptr<connectionModule::Connection> >*) /usr/include/c++/13/bits/alloc_traits.h:558
    #12 0x5934f2791e02 in std::__detail::_Hashtable_alloc<std::allocator<std::__detail::_Hash_node<std::pair<int const, std::shared_ptr<connectionModule::Connection> >, false> > >::_M_deallocate_node(std::__detail::_Hash_node<std::pair<int const, std::shared_ptr<connectionModule::Connection> >, false>*) /usr/include/c++/13/bits/hashtable_policy.h:2011
    #13 0x5934f2793244 in std::_Hashtable<int, std::pair<int const, std::shared_ptr<connectionModule::Connection> >, std::allocator<std::pair<int const, std::shared_ptr<connectionModule::Connection> > >, std::__detail::_Select1st, std::equal_to<int>, std::hash<int>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::_M_erase(unsigned long, std::__detail::_Hash_node_base*, std::__detail::_Hash_node<std::pair<int const, std::shared_ptr<connectionModule::Connection> >, false>*) /usr/include/c++/13/bits/hashtable.h:2353
    #14 0x5934f279027c in std::_Hashtable<int, std::pair<int const, std::shared_ptr<connectionModule::Connection> >, std::allocator<std::pair<int const, std::shared_ptr<connectionModule::Connection> > >, std::__detail::_Select1st, std::equal_to<int>, std::hash<int>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::_M_erase(std::integral_constant<bool, true>, int const&) /usr/include/c++/13/bits/hashtable.h:2396
    #15 0x5934f278d2c2 in std::_Hashtable<int, std::pair<int const, std::shared_ptr<connectionModule::Connection> >, std::allocator<std::pair<int const, std::shared_ptr<connectionModule::Connection> > >, std::__detail::_Select1st, std::equal_to<int>, std::hash<int>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::erase(int const&) /usr/include/c++/13/bits/hashtable.h:984
    #16 0x5934f278a0d2 in std::unordered_map<int, std::shared_ptr<connectionModule::Connection>, std::hash<int>, std::equal_to<int>, std::allocator<std::pair<int const, std::shared_ptr<connectionModule::Connection> > > >::erase(int const&) /usr/include/c++/13/bits/unordered_map.h:770
    #17 0x5934f277fc47 in epollServerModule::EpollServer::popFdAndEventsFromConnections(int) /home/epsda/Codes_In_Linux/EpollServer_ET/EpollServer.hpp:132
    #18 0x5934f27842b7 in IOServiceModule::IOService::handleException() /home/epsda/Codes_In_Linux/EpollServer_ET/IOService.hpp:136
    #19 0x5934f2782315 in IOServiceModule::IOService::recvData() /home/epsda/Codes_In_Linux/EpollServer_ET/IOService.hpp:40
    #20 0x5934f277f51c in epollServerModule::EpollServer::loopOnce() /home/epsda/Codes_In_Linux/EpollServer_ET/EpollServer.hpp:78
    #21 0x5934f277f76e in epollServerModule::EpollServer::startServer() /home/epsda/Codes_In_Linux/EpollServer_ET/EpollServer.hpp:97
    #22 0x5934f2777531 in main /home/epsda/Codes_In_Linux/EpollServer_ET/main.cc:21
    #23 0x7c6e95a2a1c9 in __libc_start_call_main ../sysdeps/nptl/libc_start_call_main.h:58
    #24 0x7c6e95a2a28a in __libc_start_main_impl ../csu/libc-start.c:360
    #25 0x5934f2775f74 in _start (/home/epsda/Codes_In_Linux/EpollServer_ET/main+0x6f74) (BuildId: f7d36700d4132470fb997f44c7b71a254a08564e)

previously allocated by thread T0 here:
    #0 0x7c6e962fe548 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:95
    #1 0x5934f2797e8b in std::__new_allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) /usr/include/c++/13/bits/new_allocator.h:151
    #2 0x5934f2795941 in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) /usr/include/c++/13/bits/alloc_traits.h:482
    #3 0x5934f2795941 in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<IOServiceModule::IOService, std::allocator<void>, (__gnu_cxx::_Lock_policy)2> >&) /usr/include/c++/13/bits/allocated_ptr.h:98
    #4 0x5934f27938ae in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<IOServiceModule::IOService, std::allocator<void>, std::shared_ptr<epollServerModule::EpollServer>&>(IOServiceModule::IOService*&, std::_Sp_alloc_shared_tag<std::allocator<void> >, std::shared_ptr<epollServerModule::EpollServer>&) /usr/include/c++/13/bits/shared_ptr_base.h:969
    #5 0x5934f2790c79 in std::__shared_ptr<IOServiceModule::IOService, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<void>, std::shared_ptr<epollServerModule::EpollServer>&>(std::_Sp_alloc_shared_tag<std::allocator<void> >, std::shared_ptr<epollServerModule::EpollServer>&) /usr/include/c++/13/bits/shared_ptr_base.h:1712
    #6 0x5934f278d9be in std::shared_ptr<IOServiceModule::IOService>::shared_ptr<std::allocator<void>, std::shared_ptr<epollServerModule::EpollServer>&>(std::_Sp_alloc_shared_tag<std::allocator<void> >, std::shared_ptr<epollServerModule::EpollServer>&) /usr/include/c++/13/bits/shared_ptr.h:464
    #7 0x5934f278ada5 in std::shared_ptr<std::enable_if<!std::is_array<IOServiceModule::IOService>::value, IOServiceModule::IOService>::type> std::make_shared<IOServiceModule::IOService, std::shared_ptr<epollServerModule::EpollServer>&>(std::shared_ptr<epollServerModule::EpollServer>&) /usr/include/c++/13/bits/shared_ptr.h:1010
    #8 0x5934f2784f65 in listenerModule::Listener::recvData() /home/epsda/Codes_In_Linux/EpollServer_ET/Listener.hpp:49
    #9 0x5934f277f51c in epollServerModule::EpollServer::loopOnce() /home/epsda/Codes_In_Linux/EpollServer_ET/EpollServer.hpp:78
    #10 0x5934f277f76e in epollServerModule::EpollServer::startServer() /home/epsda/Codes_In_Linux/EpollServer_ET/EpollServer.hpp:97
    #11 0x5934f2777531 in main /home/epsda/Codes_In_Linux/EpollServer_ET/main.cc:21
    #12 0x7c6e95a2a1c9 in __libc_start_call_main ../sysdeps/nptl/libc_start_call_main.h:58
    #13 0x7c6e95a2a28a in __libc_start_main_impl ../csu/libc-start.c:360
    #14 0x5934f2775f74 in _start (/home/epsda/Codes_In_Linux/EpollServer_ET/main+0x6f74) (BuildId: f7d36700d4132470fb997f44c7b71a254a08564e)

SUMMARY: AddressSanitizer: heap-use-after-free /usr/include/c++/13/bits/std_function.h:247 in std::_Function_base::_M_empty() const
Shadow bytes around the buggy address:
0x510ffffffe80: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x510fffffff00: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x510fffffff80: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0x511000000000: fa fa fa fa fa fa fa fa fd fd fd fd fd fd fd fd
0x511000000080: fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd fd
=>0x511000000100:[fd]fd fa fa fa fa fa fa fa fa fa fa fa fa fa fa
0x511000000180: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
0x511000000200: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
0x511000000280: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
0x511000000300: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
0x511000000380: fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa fa
Shadow byte legend (one shadow byte represents 8 application bytes):
Addressable:           00
Partially addressable: 01 02 03 04 05 06 07 
Heap left redzone:       fa
Freed heap region:       fd
Stack left redzone:      f1
Stack mid redzone:       f2
Stack right redzone:     f3
Stack after return:      f5
Stack use after scope:   f8
Global redzone:          f9
Global init order:       f6
Poisoned by user:        f7
Container overflow:      fc
Array cookie:            ac
Intra object redzone:    bb
ASan internal:           fe
Left alloca redzone:     ca
Right alloca redzone:    cb
==22204==ABORTING

根据上面的错误信息,首先看问题原因,即:AddressSanitizer: heap-use-after-free on address,可以得出结论:出现了野指针问题。接着看后面的内容可以得出问题出现在handleData_的位置出现了野指针问题

接着,再看第二部分freed by thread T0 here,这一行提示了指针是在何处进行的释放:

从当前实现代码的堆栈信息结合当前部分的前面堆栈信息可以发现问题出现在connections_移除元素的位置,尤其是出现了大量的智能指针

最后看previously allocated by thread T0 here,表示上一次开辟空间的位置,可以看到出现在ConnectionrecvData函数中,所以现在问题就可以定位在调用recvData的位置以及recvData函数中,因为这两个位置涉及到了Connection的创建和销毁

回到代码,下面看调用位置和两个子类的recvData函数:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 单次循环
void loopOnce()
{
    // ...
    for (int i = 0; i < num; i++)
    {
        // ...
        if ((revents & EPOLLIN) && checkFdIsInConnections(fd))
        {
            // 执行读方法
            connections_[fd]->recvData();
        }
        // ...
    }
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 接收信息
void recvData() override
{
    // 循环接收
    while (true)
    {
        int rerrno = 0;
        SockAddrIn client;
        int ac_socketfd = bs_->toAccept(std::addressof(client), &rerrno);
        // 当做IO行为处理
        if (ac_socketfd > 0)
        {
            // LOG(LogLevel::DEBUG) << "准备IO套接字";
            // 获取EpollServer类对象
            std::shared_ptr<epollServerModule::EpollServer> ep = getEpollServer().lock();
            // 正常情况,建立链接
            // 1. 创建连接对象
            std::shared_ptr<Connection> ac_con = std::make_shared<IOService>(ep);
            // IOService *ac_con = new IOService(ep);
            // 2. 设置非阻塞
            EpollServerUtils::setNonBlock(ac_socketfd);
            ac_con->setFd(ac_socketfd);
            ac_con->setEvent(EPOLLIN | EPOLLET);
            ac_con->setClient(std::move(client));
            // LOG(LogLevel::DEBUG) << "开始添加Connection链接到Epoll模型中";
            // 3. 通过Connection将链接指针插入到Epoll模型中
            ep->insertFdAndEvents(ac_con);
            // LOG(LogLevel::DEBUG) << "IO套接字准备完成";
            // 4. 设置IOService类中的方法
            std::shared_ptr<IOService> ptr = std::dynamic_pointer_cast<IOService>(ac_con);
            // LOG(LogLevel::DEBUG) << "Listener模块中IOService对象的引用计数值:" << ac_con.use_count();
            // ptr->setHandleDataFunc(taskModule::Task::task_1);
            ptr->setHandleDataFunc(taskModule::Task::task_1);
        }
        // ...
    }
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
void recvData()
{
    while (true)
    {
        char buffer[1024] = {0};
        ssize_t num = recv(getFd(), buffer, sizeof(buffer) - 1, 0);

        if(num > 0)
        {
            // 正常读取到数据
            setInBuffer(buffer);
        }
        else if(num == 0)
        {
            // 读取到结尾,对端关闭
            handleException();
            break;
        }
        else 
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK)
            {
                LOG(LogLevel::INFO) << "数据未准备完成,结束读取";
                break;
            }
            else if(errno == EINTR)
            {
                LOG(LogLevel::INFO) << "读取被信号中断,继续读取";
                continue;
            }
            else
            {
                // 读取出错
                handleException();
                return ;
            }
        }
    }

    // 处理数据
    std::string ret;
    if (handleData_)
        ret = handleData_(getInBuffer());

    // 添加结果到结果字符串中
    setOutBuffer(ret);

    if(!getOutBuffer().empty())
    {
        LOG(LogLevel::DEBUG) << "recvData开启写事件关心";
        std::shared_ptr<epollServerModule::EpollServer> e_svr = getEpollServer().lock();
        e_svr->EnbaleToWriteAndToRead(getFd(), true, true);
    }
}

结合触发问题的原因:客户端断开连接,可以定位到IOServicerecvData函数的handleException,该函数会对相关的资源进行释放,其中包括从哈希表中移除指定的文件描述符,但是,移除完毕后,recvData函数还会继续向下执行,此时handleData_nullptr,但是此时的this所代表的智能指针已经因为引用计数变为0而被释放,所以调用setOutBuffer函数时就会出现野指针问题

解决这个问题的方式就是确保智能指针在执行完recvData之前还有效,这就需要回到loopOnce函数中,当前是直接通过key从哈希表中取出智能指针的方式访问recvData,而其中的handleException函数一旦从哈希表中移除指定的键值对,那么对应的智能指针就被释放,所以为了防止智能指针被提前释放,可以考虑暂时提升指定智能指针的引用计数,再执行完recvData后再自动销毁:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
 // 单次循环
void loopOnce()
{
    // ...
    for (int i = 0; i < num; i++)
    {
        // ...
        if ((revents & EPOLLIN) && checkFdIsInConnections(fd))
        {
            // 执行读方法
            // 提升引用计数,确保recvData结束再销毁智能指针
            auto ptr = connections_[fd];
            ptr->recvData();
        }
        // ...
    }
}

同样,对于sendData函数也可能存在对应的问题,所以也需要进行类似的处理:

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
 // 单次循环
void loopOnce()
{
    // ..
    for (int i = 0; i < num; i++)
    {
        // ...
        if ((revents & EPOLLOUT) && checkFdIsInConnections(fd))
        {
            // 执行写方法
            // 提升引用计数,确保sendData结束再销毁智能指针
            auto ptr = connections_[fd];
            ptr->sendData();
            // connections_[fd]->sendData();
        }
    }
}

再次测试运行即可发现没有出现野指针问题,并且客户端也可以正常断开连接