Reactor模式
Reactor模式的定义
Reactor反应器模式,也叫做分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的事件设计模式。
Reactor模式的角色构成
Reactor主要由以下五个角色构成:
角色 | 解释 |
---|---|
Handle(句柄) | 用于标识不同的事件,本质就是一个文件描述符。 |
Sychronous Event Demultiplexer(同步事件分离器) | 本质就是一个系统调用,用于等待事件的发生。对于Linux来说,同步事件分离器指的就是I/O多路复用,比如select、poll、epoll等。 |
Event Handler(事件处理器) | 由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的处理反馈。 |
Concrete Event Handler(具体事件处理器) | 事件处理器中各个回调方法的具体实现。 |
Initiation Dispatcher(初始分发器) | 初始分发器实际上就是Reactor角色,初始分发器会通过同步事件分离器来等待事件的发生,当对应事件就绪时就调用事件处理器,最后调用对应的回调方法来处理这个事件。 |
Reactor模式的工作流程
Reactor模式的工作流程如下:
- 当应用向初始分发器注册具体事件处理器时,应用会标识出该事件处理器希望初始分发器在某个事件发生时向其通知,该事件与Handle关联。
- 初始分发器会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器。
- 当所有的事件处理器注册完毕后,应用会启动初始分发器的事件循环,这时初始分发器会将每个事件处理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。
- 当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器。
- 初始分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
- 初始分发器会调用其对应事件处理器当中对应的回调方法来响应该事件。
epoll ET服务器(Reactor模式)
如果在此之前没有了解过Reactor模式,相信在看了Reactor模式的工作流程后一定是一头雾水,下面我们实现一个Reactor模式下的epoll ET服务器,来感受一下Reactor模式。
设计思路
epoll ET服务器
在epoll ET服务器中,我们需要处理如下几种事件:
- 读事件:如果是监听套接字的读事件就绪则调用accept函数获取底层的连接,如果是其他套接字的读事件就绪则调用recv函数读取客户端发来的数据。
- 写事件:写事件就绪则将待发送的数据写入到发送缓冲区当中。
- 异常事件:当某个套接字的异常事件就绪时我们不做过多处理,直接关闭该套接字。
当epoll ET服务器监测到某一事件就绪后,就会将该事件交给对应的服务处理程序进行处理。
Reactor模式的五个角色
在这个epoll ET服务器中,Reactor模式中的五个角色对应如下:
- 句柄:文件描述符。
- 同步事件分离器:I/O多路复用epoll。
- 事件处理器:包括读回调、写回调和异常回调。
- 具体事件处理器:读回调、写回调和异常回调的具体实现。
- 初始分发器:Reactor类当中的Dispatcher函数。
Dispatcher函数要做的就是调用epoll_wait函数等待事件发生,当有事件发生后就将就绪的事件派发给对应的服务处理程序即可。
EventItem类
- 在Reactor的工作流程中说到,在注册事件处理器时需要将其与Handle关联,本质上就是需要将读回调、写回调和异常回调与某个文件描述符关联起来。
- 这样做的目的就是为了当某个文件描述符上的事件就绪时可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。
所以我们可以设计一个EventItem类,该类当中的成员就包括一个文件描述符,以及该文件描述符对应的各种回调函数,此外还有一些其他成员,后面实现的时候再做详细论述。
Reactor类
- 在Reactor的工作流程中说到,当所有事件处理器注册完毕后,会使用同步事件分离器等待这些事件发生,当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器,然后初始分发器会将Ready状态的Handle作为key来寻找其对应的事件处理器,并调用该事件处理器中对应的回调方法来响应该事件。
- 本质就是当事件注册完毕后,会调用epoll_wait函数来等待这些事件发生,当某个事件就绪时epoll_wait函数会告知调用方,然后调用方就根据就绪的文件描述符来找到其对应的各种回调函数,并调用对应的回调函数进行事件处理。
对此我们可以设计一个Reactor类。
- 该类当中有一个成员函数叫做Dispatcher,这个函数就是所谓的初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
- 当事件就绪后需要根据就绪的文件描述符来找到其对应的各种回调函数,由于我们会将每个文件描述符及其对应的各种回调都封装到一个EventItem结构当中,所以实际我们就是需要根据文件描述符找到其对应的EventItem结构。
- 我们可以使用C++ STL当中的unordered_map,来建立各个文件描述符与其对应的EventItem结构之间的映射,这个unordered_map可以作为Reactor类的一个成员变量,当需要找某个文件描述符的EventItem结构时就可以通过该成员变量找到。
- 当然,Reactor类当中还需要提供成员函数AddEvent和DelEvent,用于向Dispatcher当中注册和删除事件。
此外,在Reactor类当中还有一些其他成员,后面实现的时候再做详细论述。
epoll ET服务器的工作流程
这个epoll ET服务器在Reactor模式下的工作流程如下:
- 首先epoll ET服务器需要进行套接字的创建、绑定和监听。
- 然后定义一个Reactor对象并初始化,初始化时要做的就是创建epoll模型。
- 紧接着需要为监听套接字创建对应的EventItem结构,并调用Reactor类中提供的AddEvent函数将监听套接字添加到epoll模型中,并建立监听套接字与其对应的EventItem结构之间的映射关系。
- 之后就可以不断调用Reactor类中的Dispatcher函数进行事件派发。
在事件处理过程中,会不断向Dispatcher当中新增或删除事件,而每个事件就绪时都会自动调用其对应的回调函数进行处理,所以我们要做的就是不断调用Dispatcher函数进行事件派发即可。
EventItem结构
EventItem结构中除了包含文件描述符和其对应的读回调、写回调和异常回调之外,还包含一个输入缓冲区inbuffer、一个输出缓冲区outbuffer以及一个回指指针R。
- 当某个文件描述符的读事件就绪时,我们会调用recv函数读取客户端发来的数据,但我们并不能保证我们读取到了一个完整的报文,因此需要将读取到的数据暂时存放到该文件描述符对应的inbuffer当中,当inbuffer当中可以分离出一个完整的报文后再将其分离出来进行数据处理,这里的inbuffer本质就是用来解决粘包问题的。
- 当处理完一个报文请求后,需要将响应数据发送给客户端,但我们并不能保证底层TCP的发送缓冲区中有足够的空间供我们写入,因此需要将要发送的数据暂时存放到该文件描述符对应的outbuffer当中,当底层TCP的发送缓冲区中有空间,即写事件就绪时,再依次发送outbuffer当中的数据。
- EventItem结构当中设置回指指针R,便于快速找到我们定义的Reactor对象,因为后续我们需要根据EventItem结构找到这个Reactor对象。比如当连接事件就绪时,需要调用Reactor类当中的AddEvent函数将其添加到Dispatcher当中。
此外,EventItem结构当中需要提供一个管理回调的成员函数,便于外部对EventItem结构当中的各种回调进行设置。
代码如下:
typedef int(*callback_t)(EventItem*);
class EventItem{
public:
int _sock; //文件描述符
Reactor* _R; //回指指针
callback_t _recv_handler; //读回调
callback_t _send_handler; //写回调
callback_t _error_handler; //异常回调
std::string _inbuffer; //输入缓冲区
std::string _outbuffer; //输出缓冲区
public:
EventItem()
: _sock(-1)
, _R(nullptr)
, _recv_handler(nullptr)
, _send_handler(nullptr)
, _error_handler(nullptr)
{}
//管理回调
void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler)
{
_recv_handler = recv_handler;
_send_handler = send_handler;
_error_handler = error_handler;
}
~EventItem()
{}
};
Reactor类
在Reactor类当中有一个unordered_map成员,用于建立文件描述符和与其对应的EventItem结构之间的映射,还有一个epfd成员,该成员是epoll模型对应的文件描述符。
- 在初始化Reactor对象的时候就可以调用epoll_create函数创建epoll模型,并将该epoll模型对应的文件描述符用epfd成员记录下来,便于后续使用。
- Reactor对象在析构的时候,需要调用close函数将该epoll模型进行关闭。
代码如下:
#define SIZE 256
class Reactor{
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
Reactor()
: _epfd(-1)
{}
void InitReactor()
{
//创建epoll模型
_epfd = epoll_create(SIZE);
if (_epfd < 0){
std::cerr << "epoll_create error" << std::endl;
exit(5);
}
}
~Reactor()
{
if (_epfd >= 0){
close(_epfd);
}
}
};
Dispatcher函数(事件分派器)
Reactor类当中的Dispatcher函数就是之前所说的初始分发器,这里我们更形象的将其称之为事件分派器。
- 事件分派器要做的就是调用epoll_wait函数等待事件发生。
- 当某个文件描述符上的事件发生后,先通过unordered_map找到该文件描述符对应的EventItem结构,然后调用EventItem结构当中对应的回调函数对该事件进行处理即可。
代码如下:
#define MAX_NUM 64
class Reactor{
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
//事件分派器
void Dispatcher(int timeout)
{
struct epoll_event revs[MAX_NUM];
int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);
for (int i = 0; i < num; i++){
int sock = revs[i].data.fd; //就绪的文件描述符
if ((revs[i].events&EPOLLERR) || (revs[i].events&EPOLLHUP)){ //异常事件就绪(优先处理)
if (_event_items[sock]._error_handler)
_event_items[sock]._error_handler(&_event_items[sock]); //调用异常回调
}
if (revs[i].events&EPOLLIN){ //读事件就绪
if (_event_items[sock]._recv_handler)
_event_items[sock]._recv_handler(&_event_items[sock]); //调用读回调
}
if (revs[i].events&EPOLLOUT){ //写事件就绪
if (_event_items[sock]._send_handler)
_event_items[sock]._send_handler(&_event_items[sock]); //调用写回调
}
}
}
};
说明一下:
- 这里没有用switch或if语句对epoll_wait函数的返回值进行判断,而是借用for循环对其返回值进行了判断。
- 如果epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for循环内部进行事件处理。
- 如果epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不会进入到for循环内部进行事件处理。
- 如果epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数对事件进行处理。
- 事件处理时最好先对异常事件进行处理,因此代码中将异常事件的判断放在了最前面。
AddEvent函数
Reactor类当中的AddEvent函数是用于进行事件注册的。
- 在注册事件时需要传入一个文件描述符和一个事件集合,表示需要监视哪个文件描述符上的哪些事件。
- 还需要传入该文件描述符对应的EventItem结构,表示当该文件描述符上的事件就绪后应该执行的回调方法。
- 在AddEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符及其对应的事件集合注册到epoll模型当中,然后建立该文件描述符与其对应的EventItem结构的映射关系。
代码如下:
class Reactor{
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
void AddEvent(int sock, uint32_t event, const EventItem& item)
{
struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;
if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0){ //将该文件描述符添加到epoll模型当中
std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;
}
else{
//建立sock与EventItem结构的映射关系
_event_items.insert({ sock, item });
std::cout << "添加: " << sock << " 到epoll模型中,成功" << std::endl;
}
}
};
DelEvent函数
Reactor类当中的DelEvent函数是用于进行事件删除的。
- 在删除事件时只需要传入一个文件描述符即可。
- 在DelEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符从epoll模型中删除,并取消该文件描述符与其对应的EventItem结构的映射关系。
代码如下:
class Reactor{
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
void DelEvent(int sock)
{
if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0){ //将该文件描述符从epoll模型中删除
std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;
}
else{
//取消sock与EventItem结构的映射关系
_event_items.erase(sock);
std::cout << "从epoll模型中删除: " << sock << ",成功" << std::endl;
}
}
};
EnableReadWrite函数
Reactor类当中的EnableReadWrite函数,用于使能或使能某个文件描述符的读写事件。
- 调用EnableReadWrite函数时需要传入一个文件描述符,表示需要设置的是哪个文件描述符对应的事件。
- 还需要传入两个bool值,分别表示需要使能还是使能读写事件。
- EnableReadWrite函数内部会调用epoll_ctl函数修改将该文件描述符的监听事件。
代码如下:
class Reactor{
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
void EnableReadWrite(int sock, bool read, bool write){
struct epoll_event ev;
ev.data.fd = sock;
ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0){ //修改该文件描述符所需要监视的事件
std::cerr << "epoll_ctl mod error, fd: " << sock << std::endl;
}
}
};
回调函数
下面我们就可以实现一些回调函数,这里主要实现四个回调函数。
- accepter:当连接事件到来时可以调用该回调函数获取底层建立好的连接。
- recver:当读事件就绪时可以调用该回调函数读取客户端发来的数据并进行处理。
- sender:当写事件就绪时可以调用该回调函数向客户端发送响应数据。
- errorer:当异常事件就绪时可以调用该函数将对应的文件描述符进行关闭。
当我们为某个文件描述符创建EventItem结构时,就可以调用EventItem类提供的ManageCallbacks函数,将这些回调函数到EventItem结构当中。
- 我们会将监听套接字对应的EventItem结构当中的recv_handler设置为accepter,因为监听套接字的读事件就绪就意味着连接事件就绪了,而监听套接字一般只关心读事件,因此监听套接字对应的send_handler和error_handler可以设置为nullptr。
- 当Dispatcher监测到监听套接字的读事件就绪时,会调用监听套接字对应的EventItem结构当中的recv_handler回调,此时就会调用accepter回调获取底层建立好的连接。
- 而对于与客户端建立连接的套接字,我们会将其对应的EventItem结构当中的recv_handler、send_handler和error_handler分别设置为这里的recver、sender和error。
- 当Dispatcher监测到这些套接字的事件就绪时,就会调用其对应的EventItem结构当中对应的回调函数,也就是这里的recver、sender和error。
accepter回调
accepter回调用于处理连接事件,其工作流程如下:
- 调用accept函数获取底层建立好的连接。
- 将获取到的套接字设置为非阻塞,并为其创建EventItem结构,填充EventItem结构当中的各个字段,并注册该套接字相关的回调方法。
- 将该套接字及其对应需要关心的事件注册到Dispatcher当中。
下一次Dispatcher在进行事件派发时就会帮我们关注该套接字对应的事件,当事件就绪时就会执行该套接字对应的EventItem结构中对应的回调方法。
代码如下:
int accepter(EventItem* item)
{
while (true){
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);
if (sock < 0){
if (errno == EAGAIN || errno == EWOULDBLOCK){ //并没有读取出错,只是底层没有连接了
return 0;
}
else if (errno == EINTR){ //读取的过程被信号中断了
continue;
}
else{ //获取连接失败
std::cerr << "accept error" << std::endl;
return -1;
}
}
SetNonBlock(sock); //将该套接字设置为非阻塞
//构建EventItem结构
EventItem sock_item;
sock_item._sock = sock;
sock_item._R = item->_R;
sock_item.ManageCallbacks(recver, sender, errorer); //注册回调方法
Reactor* R = item->_R;
R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item); //将该套接字及其对应的事件注册到Dispatcher中
}
return 0;
}
需要注意的是,因为这里实现的ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞。
- 因为ET模式下只有当底层建立的连接从无到有或是从有到多时才会通知上层,如果没有一次性将底层建立好的连接全部获取,并且此后再也没有建立好的连接,那么底层没有读取完的连接就相当于丢失了,所以需要循环多次调用accept函数获取底层建立好的连接。
- 循环调用accept函数也就意味着,当底层连接全部被获取后再调用accept函数,此时就会因为底层已经没有连接了而被阻塞住,因此需要将监听套接字设置为非阻塞,这样当底层没有连接时accept就会返回,而不会被阻塞住。
accept获取到的新的套接字也需要设置为非阻塞,就是为了避免将来循环调用recv、send等函数时被阻塞。
设置文件描述符为非阻塞
设置文件描述符为非阻塞时,需要先调用fcntl函数获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记O_NONBLOCK
,最后调用fcntl函数对该文件描述符的状态标记进行设置即可。
代码如下:
//设置文件描述符为非阻塞
bool SetNonBlock(int sock)
{
int fl = fcntl(sock, F_GETFL);
if (fl < 0){
std::cerr << "fcntl error" << std::endl;
return false;
}
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
return true;
}
监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数会以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。
- 如果错误码为
EAGAIN
或EWOULDBLOCK
,说明本次出错返回是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时我们可以返回0,表示本次accepter调用成功。 - 如果错误码为
EINTR
,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。 - 除此之外,才说明accept函数是真正调用失败了,这时我们可以返回-1,表示本次accepter调用失败。
accept、recv和send等IO系统调用为什么会被信号中断?
IO系统调用函数出错返回并且将错误码设置为EINTR
,表明本次在进行数据读取或数据写入之前被信号中断了,也就是说IO系统调用在陷入内核,但并没有返回用户态的时候内核跑去处理其他信号了。
- 在内核态返回用户态之前会检查信号的pending位图,也就是未决信号集,如果pending位图中有未处理的信号,那么内核就会对该信号进行处理。
- 但IO系统调用函数在进行IO操作之前就被信号中断了,这实际上是一个特例,因为IO过程分为“等”和“拷贝”两个步骤,而一般“等”的过程比较漫长,而在这个过程中我们的执行流其实是处于闲置的状态的,因此在“等”的过程中如果有信号产生,内核就会立即进行信号的处理。
写事件是按需打开的
这里调用accept获取上来的套接字在添加到Dispatcher中时,只添加了EOPLLIN
和EPOLLET
事件,也就是说只让epoll帮我们关心该套接字的读事件。
- 这里之所以没有添加写事件,是因为当前我们并没有要发送的数据,因此没有必要让epoll帮我们关心写事件。
- 一般读事件是经常会被设置的,而写事件则是按序打开的,只有当我们有数据要发送时才会将写事件打开,并且在数据全部写入完毕后又会立即将写事件关闭。
recver回调
recver回调用于处理读事件,其工作流程如下:
- 循环调用recv函数读取数据,并将读取到的数据添加到该套接字对应EventItem结构的inbuffer当中。
- 对inbuffer当中的数据进行切割,将完整的报文切割出来,剩余的留在inbuffer当中。
- 对切割出来的完整报文进行反序列化。
- 业务处理。
- 业务处理后形成响应报文。
- 将响应报头添加到对应EventItem结构的outbuffer当中,并打开写事件。
下一次Dispatcher在进行事件派发时就会帮我们关注该套接字的写事件,当写事件就绪时就会执行该套接字对应的EventItem结构中写回调方法,进而将outbuffer中的响应数据发送给客户端。
代码如下:
int recver(EventItem* item)
{
if (item->_sock < 0) //该文件描述符已经被关闭
return -1;
//1、数据读取
if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败
item->_error_handler(item);
return -1;
}
//2、报文切割
std::vector<std::string> datagrams;
StringUtil::Split(item->_inbuffer, &datagrams, "X");
for (auto s : datagrams){
//3、反序列化
struct data d;
StringUtil::Deserialize(s, &d._x, &d._y, &d._op);
//4、业务处理
int result = 0;
switch (d._op)
{
case '+':
result = d._x + d._y;
break;
case '-':
result = d._x - d._y;
break;
case '*':
result = d._x * d._y;
break;
case '/':
if (d._y == 0){
std::cerr << "Error: div zero!" << std::endl;
continue; //继续处理下一个报文
}
else{
result = d._x / d._y;
}
break;
case '%':
if (d._y == 0){
std::cerr << "Error: mod zero!" << std::endl;
continue; //继续处理下一个报文
}
else{
result = d._x % d._y;
}
break;
default:
std::cerr << "operation error!" << std::endl;
continue; //继续处理下一个报文
}
//5、形成响应报文
std::string response;
response += std::to_string(d._x);
response += d._op;
response += std::to_string(d._y);
response += "=";
response += std::to_string(result);
response += "X"; //报文与报文之间的分隔符
//6、将响应报文添加到outbuffer中
item->_outbuffer += response;
if (!item->_outbuffer.empty())
item->_R->EnableReadWrite(item->_sock, true, true); //打开写事件
}
return 0;
}
一、数据读取
我们可以将循环调用recv函数读取数据的过程封装成一个recver_helper函数。
- recver_helper函数要做的就是循环调用recv函数将读取到的数据添加到inbuffer当中。
- 当recv函数的返回值小于0时同样需要进一步判断错误码,如果错误码为
EAGAIN
或EWOULDBLOCK
则说明底层数据读取完毕了,如果错误码为EINTR
则说明读取过程被信号中断了,此时还需要继续调用recv函数进行读取,否则就是读取出错了。 - 当读取出错时直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
代码如下:
int recver_helper(int sock, std::string* out)
{
while (true){
char buffer[128];
ssize_t size = recv(sock, buffer, sizeof(buffer)-1, 0);
if (size < 0){
if (errno == EAGAIN || errno == EWOULDBLOCK){ //数据读取完毕
return 0;
}
else if (errno == EINTR){ //被信号中断,继续尝试读取
continue;
}
else{ //读取出错
return -1;
}
}
else if (size == 0){ //对端连接关闭
return -1;
}
//读取成功
buffer[size] = '\0';
*out += buffer; //将读取到的数据添加到该套接字对应EventItem结构的inbuffer中
}
}
二、报文切割
报文切割本质就是为了防止粘包问题,而粘包问题实际是涉及到协议定制的。
- 因为我们需要根据协议知道如何将各个报文进行分离,比如UDP分离报文采用的就是定长报头+自描述字段。
- 我们的目的是演示整个数据处理的过程,为了简单起见就不进行过于复杂的协议定制了,这里我们就以“X”作为各个报文之间的分隔符,每个报文的最后都会以一个“X”作为报文结束的标志。
- 因此现在要做的就是以“X”作为分隔符对inbuffer当中的字符串进行切割,这里将这个过程封装成一个Split函数并放到一个StringUtil工具类当中。
- Split函数要做的就是对inbuffer当中的字符串进行切割,将切割出来的一个个报文放到vector当中,对于最后无法切出完整报文的数据就留在inbuffer当中即可。
代码如下:
class StringUtil{
public:
static void Split(std::string& in, std::vector<std::string>* out, std::string sep)
{
int start = 0;
size_t pos = in.find(sep, start);
while (pos != std::string::npos){
out->push_back(in.substr(start, pos - start));
start = pos + sep.size();
pos = in.find(sep, start);
}
in = in.substr(start);
}
};
三、反序列化
在数据发送之前需要进行序列化encode,接收到数据之后需要对数据进行反序列化decode。
- 序列化就是将对象的状态信息转换为可以存储或传输的形式(字节序列)的过程。
- 反序列化就是把字节序列恢复为原对象的过程。
实际反序列化也是与协议定制相关的,假设这里的epoll服务器向客户端提供的就是计算服务,客户端向服务器发来的都是需要服务器计算的计算表达式,因此可以用一个结构体来描述这样一个计算表达式,结构体当中包含两个操作数x和y,以及一个操作符op。
struct data{
int _x;
int _y;
char _op;
};
此时这里所谓的反序列化就是将一个计算表达式转换成这样一个结构体,
- 因此现在要做的就是将形如“1+2”这样的计算表达式转换成一个结构体,该结构体当中的x成员的值就是1,y的值就是2,op的值就是‘+’,这里将这个过程封装成一个Deserialize函数并放到StringUtil工具类当中。
- Deserialize函数要做的工作其实也很简单,就是在传入的字符串当中找到操作符op,此时操作符左边的就是操作数x,右边的就是操作数y。
代码如下:
class StringUtil{
public:
static void Deserialize(std::string& in, int* x, int* y, char* op)
{
size_t pos = 0;
for (pos = 0; pos < in.size(); pos++){
if (in[pos] == '+' || in[pos] == '-' || in[pos] == '*' || in[pos] == '/' || in[pos] == '%')
break;
}
if (pos < in.size()){
std::string left = in.substr(0, pos);
std::string right = in.substr(pos + 1);
*x = atoi(left.c_str());
*y = atoi(right.c_str());
*op = in[pos];
}
else{
*op = -1;
}
}
};
说明一下: 实际在做项目时不需要我们自己进行序列化和反序列化,我们一般会直接用JSON或XML这样的序列化反序列化工具。
四、业务处理
业务处理就是服务器拿到客户端发来的数据后,对数据进行数据分析,最终拿到客户端想要的资源。
- 我们这里要做的业务处理非常简单,就是用反序列化后的数据进行数据计算,此时得到的计算结果就是客户端想要的。
五、形成响应报文
在业务处理后我们已经拿到了客户端想要的数据,现在我们要做的就是形成响应报文,由于我们这里规定每个报文都以“X”作为报文结束的标志,因此在形成响应报文的时候,就需要在每一个计算结果后面都添加上一个“X”,表示这是之前某一个请求报文的响应报文,因为协议制定后就需要双方遵守。
六、将响应报文添加到outbuffer中
响应报文构建完后需要将其添加到该套接字对应的outbuffer中,并打开该套接字的写事件,此后当写事件就绪时就会将outbuffer当中的数据发送出去。
sender回调
sender回调用于处理写事件,其工作流程如下:
- 循环调用send函数发送数据,并将发送出去的数据从该套接字对应EventItem结构的outbuffer中删除。
- 如果循环调用send函数后该套接字对应的outbuffer当中的数据被全部发送,此时就需要将该套接字对应的写事件关闭,因为已经没有要发送的数据了,如果outbuffer当中的数据还有剩余,那么该套接字对应的写事件就应该继续打开。
代码如下:
int sender(EventItem* item)
{
if (item->_sock < 0) //该文件描述符已经被关闭
return -1;
int ret = sender_helper(item->_sock, item->_outbuffer);
if (ret == 0){ //全部发送成功,不再关心写事件
item->_R->EnableReadWrite(item->_sock, true, false);
}
else if (ret == 1){ //没有发送完毕,还需要继续关心写事件
item->_R->EnableReadWrite(item->_sock, true, true);
}
else{ //写入出错
item->_error_handler(item);
}
return 0;
}
我们可以将循环调用send函数发送数据的过程封装成一个sender_helper函数。
- sender_helper函数要做的就是循环调用send函数将outbuffer中的数据发送出去。
- 当send函数的返回值小于0时也需要进一步判断错误码,如果错误码为
EAGAIN
或EWOULDBLOCK
则说明底层TCP发送缓冲区已经被写满了,这时需要将已经发送的数据从outbuffer中移除。 - 如果错误码为
EINTR
则说明发送过程被信号中断了,此时还需要继续调用send函数进行发送,否则就是发送出错了。 - 当发送出错时也直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。
- 如果最终outbuffer当中的数据全部发送成功,则将outbuffer清空即可。
代码如下:
int sender_helper(int sock, std::string& in)
{
size_t total = 0; //累加已经发送的字节数
while (true){
ssize_t size = send(sock, in.c_str() + total, in.size() - total, 0);
if (size < 0){
if (errno == EAGAIN || errno == EWOULDBLOCK){ //底层发送缓冲区已经没有空间了
in.erase(0, total); //将已经发送的数据移出outbuffer
return 1; //缓冲区写满,没写完
}
else if (errno == EINTR){ //被信号中断,继续尝试写入
continue;
}
else{ //写入出错
return -1;
}
}
total += size;
if (total >= in.size()){
in.clear(); //清空outbuffer
return 0; //全部写入完毕
}
}
}
errorer回调
errorer回调用于处理异常事件。
- 对于异常事件就绪的套接字我们这里不做其他过多的处理,简单的调用close函数将该套接字关闭即可。
- 但是在关闭该套接字之前,需要先调用DelEvent函数将该套接字从epoll模型中删除,并取消该套接字与其对应的EventItem结构的映射关系。
- 由于在Dispatcher当中是先处理的异常事件,为了避免该套接字被关闭后继续进行读写操作,然后因为读写操作失败再次调用errorer回调重复关闭该文件描述符,因此在关闭该套接字后将其EventItem当中的文件描述符值设置为-1。
- 在调用recver和sender回调执行读写操作之前,都会判断该EventItem结构当中的文件描述符值是否有效,如果无效则不会进行后续操作。
代码如下:
int errorer(EventItem* item)
{
item->_R->DelEvent(item->_sock); //将该文件描述符从epoll模型中删除,并取消该文件描述符与其EventItem结构的映射关系
close(item->_sock); //关闭该文件描述符
item->_sock = -1; //防止关闭后继续执行读写回调
return 0;
}
套接字相关
这里可以编写一个Socket类,对套接字相关的接口进行一定程度的封装,为了让外部能够直接调用Socket类当中封装的函数,于是将这些函数定义成了静态成员函数。
代码如下:
class Socket{
public:
//创建套接字
static int SocketCreate()
{
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0){
std::cerr << "socket error" << std::endl;
exit(2);
}
//设置端口复用
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
return sock;
}
//绑定
static void SocketBind(int sock, int port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
socklen_t len = sizeof(local);
if (bind(sock, (struct sockaddr*)&local, len) < 0){
std::cerr << "bind error" << std::endl;
exit(3);
}
}
//监听
static void SocketListen(int sock, int backlog)
{
if (listen(sock, backlog) < 0){
std::cerr << "listen error" << std::endl;
exit(4);
}
}
};
运行epoll ET服务器
运行我们的epoll ET服务器的步骤如下:
- 首先需要进行的就是套接字的创建、绑定和监听,因为是ET模式下的epoll服务器,因此监听套接字创建出来后需要将其设置为非阻塞。
- 然后就可以实例化一个Reactor对象,并对其进行初始化,也就是创建epoll模型。
- 紧接着需要为监听套接字定义一个EventItem结构,填充EventItem结构当中的各个字段,并将accepter回调设置为监听套接字的读回调方法。
- 然后调用AddEvent函数将监听套接字及其需要关系的事件添加到Dispatcher当中,该过程包括将监听套接字注册到epoll模型中,以及建立监听套接字与其对应EventItem结构的映射。
- 最后就可以循环调用Reactor类当中的Dispatcher函数进行事件派发了。
代码如下:
#include "app_interface.hpp"
#include "reactor.hpp"
#include "socket.hpp"
#include "util.hpp"
#include <string>
#define BACK_LOG 5
static void Usage(std::string proc)
{
std::cout << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char* argv[])
{
if (argc != 2){
Usage(argv[0]);
exit(1);
}
int port = atoi(argv[1]);
//服务器监听套接字的创建、绑定和监听
int listen_sock = Socket::SocketCreate();
SetNonBlock(listen_sock); //将监听套接字设置为非阻塞
Socket::SocketBind(listen_sock, port);
Socket::SocketListen(listen_sock, BACK_LOG);
//创建Reactor,并初始化
Reactor R;
R.InitReactor();
//创建监听套接字对应的EventItem结构
EventItem item;
item._sock = listen_sock;
item._R = &R;
item.ManageCallbacks(accepter, nullptr, nullptr); //监听套接字只需要关心读事件
//将监听套接字托管给Dispatcher
R.AddEvent(listen_sock, EPOLLIN | EPOLLET, item);
//循环进行事件派发
int timeout = 1000;
while (true){
R.Dispatcher(timeout);
}
return 0;
}
运行服务器后可以看到3号文件描述符被添加到了epoll模型中,这里的3号文件描述符其实就是监听套接字。
当客户端连接服务器后,在服务器端会显示5号文件描述符被添加到了epoll模型当中,因为4号文件描述符已经被epoll模型使用了。
此时客户端就可以向服务器发送一些简单计算任务,这些计算任务之间用“X”隔开,服务器收到计算请求并处理后就会将计算结果发送给客户端,这些计算结果之间也是用“X”隔开的。
此外,由于使用了多路转接技术,虽然当前的epoll服务器是一个单进程的服务器,但它却可以同时为多个客户端提供服务。
当客户端退出后服务器端也会将对应的文件描述符从epoll模型中删除。
接入线程池
单进程epoll服务器存在的问题
因为当前的epoll服务器的业务处理逻辑比较简单,所以单进程的epoll服务器看起来没什么压力,但如果服务器的业务处理逻辑比较复杂,那么某些客户端发来的数据请求就可能长时间得不到响应,因为这时epoll服务器需要花费大量时间进行业务处理,而在这个过程中服务器无法为其他客户端提供服务。
解决思路
可以在当前服务器的基础上接入线程池,当recver回调读取完数据并完成报文的切割和反序列化后,就可以将其构建成一个任务然后放到线程池的任务队列中,然后服务器就可以继续进行事件派发,而不需要将时间耗费到业务处理上面,而放到任务队列当中的任务,则由线程池当中的若干个线程进行处理。
接入线程池
在博主的另一篇博客当中详细介绍并实现了线程池,这里直接将线程池的代码接入到当前的epoll ET服务器,因此下面只会讲解接入线程池的方法,如果对线程池的实现有疑问的可以去阅读那篇博客。
线程池的代码如下:
#pragma once
#include <iostream>
#include <unistd.h>
#include <queue>
#include <pthread.h>
#define NUM 5
//线程池
template<class T>
class ThreadPool
{
public:
//提供一个全局访问点
static ThreadPool* GetInstance()
{
return &_sInst;
}
private:
bool IsEmpty()
{
return _task_queue.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void Wait()
{
pthread_cond_wait(&_cond, &_mutex);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
public:
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
//线程池中线程的执行例程
static void* Routine(void* arg)
{
pthread_detach(pthread_self());
ThreadPool* self = (ThreadPool*)arg;
//不断从任务队列获取任务进行处理
while (true){
self->LockQueue();
while (self->IsEmpty()){
self->Wait();
}
T task;
self->Pop(task);
self->UnLockQueue();
task.Run(); //处理任务
}
}
void ThreadPoolInit()
{
pthread_t tid;
for (int i = 0; i < _thread_num; i++){
pthread_create(&tid, nullptr, Routine, this); //注意参数传入this指针
}
}
//往任务队列塞任务(主线程调用)
void Push(const T& task)
{
LockQueue();
_task_queue.push(task);
UnLockQueue();
WakeUp();
}
//从任务队列获取任务(线程池中的线程调用)
void Pop(T& task)
{
task = _task_queue.front();
_task_queue.pop();
}
private:
ThreadPool(int num = NUM) //构造函数私有
: _thread_num(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool&) = delete; //防拷贝
std::queue<T> _task_queue; //任务队列
int _thread_num; //线程池中线程的数量
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> _sInst;
};
template<class T>
ThreadPool<T> ThreadPool<T>::_sInst;
在服务器开始进行事件派发之前需要对线程池进行初始化:
//初始化线程池
ThreadPool<Task>::GetInstance()->ThreadPoolInit();
设计任务类
线程池有了之后需要定义出一个任务类,该任务类当中需要提供一个Run方法,这个Run方法就是线程池中的若干线程池从任务队列当中拿到任务后会执行的方法。
- 在任务类中包含两个成员变量,成员变量d就是反序列化后用于进行业务处理的数据,成员变量item就是该套接字的EventItem结构,因为数据处理完后需要将形成的响应报文添加到该套接字对应outbuffer当中。
- Run方法中处理数据的逻辑与之前的一样,只是现在将那部分代码放到了Run方法中。
代码如下:
#pragma once
#include <iostream>
#include "reactor.hpp"
#include "comm.hpp"
//任务类
class Task
{
private:
struct data _d;
EventItem* _item;
public:
Task(struct data d, EventItem* item)
: _d(d), _item(item)
{}
Task() //提供默认构造
{}
~Task()
{}
//处理任务的方法
void Run()
{
//4、业务处理
int result = 0;
switch (_d._op)
{
case '+':
result = _d._x + _d._y;
break;
case '-':
result = _d._x - _d._y;
break;
case '*':
result = _d._x * _d._y;
break;
case '/':
if (_d._y == 0){
std::cerr << "Error: div zero!" << std::endl;
return;
}
else{
result = _d._x / _d._y;
}
break;
case '%':
if (_d._y == 0){
std::cerr << "Error: mod zero!" << std::endl;
return;
}
else{
result = _d._x % _d._y;
}
break;
default:
std::cerr << "operation error!" << std::endl;
return;
}
std::cout << "thread[" << pthread_self() << "]:" << _d._x << _d._op << _d._y << "=" << result << std::endl;
//5、形成响应报文
std::string response;
response += std::to_string(_d._x);
response += _d._op;
response += std::to_string(_d._y);
response += "=";
response += std::to_string(result);
response += "X"; //报文与报文之间的分隔符
//6、将响应报文添加到outbuffer中
_item->_outbuffer += response;
if (!_item->_outbuffer.empty())
_item->_R->EnableReadWrite(_item->_sock, true, true); //打开写事件
}
};
此时recver回调函数中在读取数据、报文切割、反序列化后就可以构建出一个任务对象,然后将该任务放到任务队列当中就行了。
int recver(EventItem* item)
{
if (item->_sock < 0) //该文件描述符已经被关闭
return -1;
//1、数据读取
if (recver_helper(item->_sock, &(item->_inbuffer)) < 0){ //读取失败
item->_error_handler(item);
return -1;
}
//2、报文切割
std::vector<std::string> datagrams;
StringUtil::Split(item->_inbuffer, &datagrams, "X");
for (auto s : datagrams){
//3、反序列化
struct data d;
StringUtil::Deserialize(s, &d._x, &d._y, &d._op);
Task t(d, item); //构建任务
ThreadPool<Task>::GetInstance()->Push(t); //将任务push到线程池的任务队列中
}
return 0;
}
至此线程池便接入完毕了,由于我们在任务被处理后将处理对应任务线程的ID进行了打印,因此现在客户端向服务器发送的计算请求在被处理后,可以在服务器端看到处理该任务的线程的ID。