code_tin

从头编写高性能服务程序12-区分读写事件
前一个版本很重要的BUG 就是sendfile在发送大文件的时候会发送不完整 这个bug指出了另一需求 就是需要用...
扫描右侧二维码阅读全文
02
2010/02

从头编写高性能服务程序12-区分读写事件

前一个版本很重要的BUG
就是sendfile在发送大文件的时候会发送不完整
这个bug指出了另一需求
就是需要用到EPOLLOUT事件

前面版本我们事件处理都是在EPOLLIN中进行
当有accept_fd数据进来之后
我们判断指令的内容
再直接进行数据的处理

我们更换一种方式
当accept_fd获取到数据之后
解析数据.
当需要输出的时候,将accept_fd的事件变为EPOLLOUT

这样一种fd会有两种状态
接受指令状态以及输出数据状态
慢慢的.我们会发现这个程序越来越像Nginx或者Lighttpd了
因为一个连接会有不同的状态
事件+状态机就是Nginx以及Lighttpd高效的原因
将一个链接分成不同的生命周期然后处理

经过进化
我们的event_handle结构拥有了读写两种hook钩子
typedef struct event_handle{ ... int ( * read_handle )( struct event_handle * ev ); int ( * write_handle )( struct event_handle * ev ); ... }
而我们在针对的处理过程中
初始化时会针对不同的事件挂上不同的钩子函数
int init_evhandle(...){ ... ev->read_handle = r_handle; ev->write_handle = w_handle; ... }
接着在事件发生时调用不同的钩子函数
else if( events[i].events&EPOLLIN ){ EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->read_handle; ... } else if( events[i].events&EPOLLOUT ){ EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->write_handle; ... }

当分析完指令的时候
将fd变为EPOLLOUT
int parse_request( ... ev_temp.data.ptr = ev; ev_temp.events = EPOLLOUT|EPOLLET; epoll_ctl( ev->epoll_fd, EPOLL_CTL_MOD, ev->socket_fd, &ev_temp ); ... }

加入这些处理之后
代码越来越长了
我们还需要加很多东西
比如进程管理,等等
这之后会发现一个mini的nginx骨架或者lighttpd骨架出现了

#include #include #include #include #include #include #include #include #include #include #include #include #include #include #define HANDLE_INFO 1 #define HANDLE_SEND 2 #define HANDLE_DEL 3 #define HANDLE_CLOSE 4 #define MAX_REQLEN 1024 #define MAX_PROCESS_CONN 3 #define FIN_CHAR 0x00 #define SUCCESS 0 #define ERROR -1 typedef struct event_handle{ int socket_fd; int file_fd; int file_pos; int epoll_fd; char request[MAX_REQLEN]; int request_len; int ( * read_handle )( struct event_handle * ev ); int ( * write_handle )( struct event_handle * ev ); int handle_method; } EV,* EH; typedef int ( * EVENT_HANDLE )( struct event_handle * ev ); int create_listen_fd( int port ){ int listen_fd; struct sockaddr_in my_addr; if( ( listen_fd = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ){ perror( "create socket error" ); exit( 1 ); } int flag; int olen = sizeof(int); if( setsockopt( listen_fd, SOL_SOCKET, SO_REUSEADDR , (const void *)&flag, olen ) == -1 ){ perror( "setsockopt error" ); } flag = 5; if( setsockopt( listen_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &flag, olen ) == -1 ){ perror( "setsockopt error" ); } flag = 1; if( setsockopt( listen_fd, IPPROTO_TCP, TCP_CORK, &flag, olen ) == -1 ){ perror( "setsockopt error" ); } int flags = fcntl( listen_fd, F_GETFL, 0 ); fcntl( listen_fd, F_SETFL, flags|O_NONBLOCK ); my_addr.sin_family = AF_INET; my_addr.sin_port = htons( port ); my_addr.sin_addr.s_addr = INADDR_ANY; bzero( &( my_addr.sin_zero ), 8 ); if( bind( listen_fd, ( struct sockaddr * )&my_addr, sizeof( struct sockaddr_in ) ) == -1 ) { perror( "bind error" ); exit( 1 ); } if( listen( listen_fd, 1 ) == -1 ){ perror( "listen error" ); exit( 1 ); } return listen_fd; } int create_accept_fd( int listen_fd ){ int addr_len = sizeof( struct sockaddr_in ); struct sockaddr_in remote_addr; int accept_fd = accept( listen_fd, ( struct sockaddr * )&remote_addr, &addr_len ); int flags = fcntl( accept_fd, F_GETFL, 0 ); fcntl( accept_fd, F_SETFL, flags|O_NONBLOCK ); return accept_fd; } int fork_process( int process_num ){ int i; int pid=-1; for( i = 0; i < process_num; i++ ){ if( pid != 0 ){ pid = fork(); } } return pid; } int init_evhandle(EH ev,int socket_fd,int epoll_fd,EVENT_HANDLE r_handle,EVENT_HANDLE w_handle){ ev->epoll_fd = epoll_fd; ev->socket_fd = socket_fd; ev->read_handle = r_handle; ev->write_handle = w_handle; ev->file_pos = 0; ev->request_len = 0; ev->handle_method = 0; memset( ev->request, 0, 1024 ); } //accept->accept_queue->request->request_queue->output->output_queue //multi process sendfile int parse_request(EH ev){ ev->request_len--; *( ev->request + ev->request_len - 1 ) = 0x00; int i; for( i=0; irequest_len; i++ ){ if( ev->request[i] == ':' ){ ev->request_len = ev->request_len-i-1; char temp[MAX_REQLEN]; memcpy( temp, ev->request, i ); ev->handle_method = atoi( temp ); memcpy( temp, ev->request+i+1, ev->request_len ); memcpy( ev->request, temp, ev->request_len ); break; } } //handle_request( ev ); //register to epoll EPOLLOUT struct epoll_event ev_temp; ev_temp.data.ptr = ev; ev_temp.events = EPOLLOUT|EPOLLET; epoll_ctl( ev->epoll_fd, EPOLL_CTL_MOD, ev->socket_fd, &ev_temp ); return SUCCESS; } int handle_request(EH ev){ struct stat file_info; switch( ev->handle_method ){ case HANDLE_INFO: ev->file_fd = open( ev->request, O_RDONLY ); if( ev->file_fd == -1 ){ send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 ); return -1; } fstat(ev->file_fd, &file_info); char info[MAX_REQLEN]; sprintf(info,"file len:%d\n",file_info.st_size); send( ev->socket_fd, info, strlen( info ), 0 ); break; case HANDLE_SEND: ev->file_fd = open( ev->request, O_RDONLY ); if( ev->file_fd == -1 ){ send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 ); return -1; } fstat(ev->file_fd, &file_info); sendfile( ev->socket_fd, ev->file_fd, 0, file_info.st_size ); break; case HANDLE_DEL: break; case HANDLE_CLOSE: break; } finish_request( ev ); return SUCCESS; } int finish_request(EH ev){ close(ev->socket_fd); close(ev->file_fd); ev->handle_method = -1; clean_request( ev ); return SUCCESS; } int clean_request(EH ev){ memset( ev->request, 0, MAX_REQLEN ); ev->request_len = 0; } int read_hook_v2( EH ev ){ char in_buf[MAX_REQLEN]; memset( in_buf, 0, MAX_REQLEN ); int recv_num = recv( ev->socket_fd, &in_buf, MAX_REQLEN, 0 ); if( recv_num ==0 ){ close( ev->socket_fd ); return ERROR; } else{ //check ifoverflow if( ev->request_len > MAX_REQLEN-recv_num ){ close( ev->socket_fd ); clean_request( ev ); } memcpy( ev->request + ev->request_len, in_buf, recv_num ); ev->request_len += recv_num; if( recv_num == 2 && ( !memcmp( &in_buf[recv_num-2], "\r\n", 2 ) ) ){ parse_request(ev); } } return recv_num; } int write_hook_v1( EH ev ){ struct stat file_info; ev->file_fd = open( ev->request, O_RDONLY ); if( ev->file_fd == ERROR ){ send( ev->socket_fd, "open file failed\n", strlen("open file failed\n"), 0 ); return ERROR; } fstat(ev->file_fd, &file_info); int write_num; while(1){ write_num = sendfile( ev->socket_fd, ev->file_fd, (off_t *)&ev->file_pos, 10240 ); ev->file_pos += write_num; if( write_num == ERROR ){ if( errno == EAGAIN ){ break; } } else if( write_num == 0 ){ printf( "writed:%d\n", ev->file_pos ); //finish_request( ev ); break; } } return SUCCESS; } int main(){ int listen_fd = create_listen_fd( 3389 ); int pid = fork_process( 3 ); if( pid == 0 ){ int accept_handles = 0; struct epoll_event ev, events[20]; int epfd = epoll_create( 256 ); int ev_s = 0; ev.data.fd = listen_fd; ev.events = EPOLLIN|EPOLLET; epoll_ctl( epfd, EPOLL_CTL_ADD, listen_fd, &ev ); struct event_handle ev_handles[256]; for( ;; ){ ev_s = epoll_wait( epfd, events, 20, 500 ); int i = 0; for( i = 0; iread_handle; EH current_event = ( EH )( events[i].data.ptr ); ( *current_handle )( current_event ); } else if( events[i].events&EPOLLOUT ){ EVENT_HANDLE current_handle = ( ( EH )( events[i].data.ptr ) )->write_handle; EH current_event = ( EH )( events[i].data.ptr ); if( ( *current_handle )( current_event ) == 0 ){ accept_handles--; } } } } } else{ //manager the process int child_process_status; wait( &child_process_status ); } return SUCCESS; }

Last modification:November 26th, 2018 at 04:16 pm
If you think my article is useful to you, please feel free to appreciate

10 comments

  1. nightfighter

    我觉得可以直接用accept_fd做索引的

  2. nightfighter

    ev.data.ptr = &ev_handles[accept_handles];
    用累加的accept_handles做索引 会不会有把还没有EPOLLOUT的数据覆盖的情况?

  3. lxjeanse

    期待后继文章,同时感谢LZ写出这么好的文章

  4. dingshi065
    代码罐头 :

    windycat : 期待看到多线程的解决方案

    多线程在linux下是用进程模拟的.pthread的用法在我前面的文章里有写过,由于pthread的本质,所以我没有想用线程来实现,多进程在linux下已经足够好了.线程在linux下还不是一个很好的选择.

    同意这种看法,我开始接触linux时花了一些精力研究选择多进程还是多线程,事实表明多进程对多数应用、多数人而言是更明智的选择。贝尔实验室自己的产品TUXEDO都是用的多进程。也许多线程真的比多进程廉价一些,但是需要开发人员水平足够,否则使用多线程得不偿失。

  5. 代码罐头
    windycat :期待看到多线程的解决方案

    多线程在linux下是用进程模拟的.pthread的用法在我前面的文章里有写过,由于pthread的本质,所以我没有想用线程来实现,多进程在linux下已经足够好了.线程在linux下还不是一个很好的选择.

  6. windycat

    期待看到多线程的解决方案

  7. dmgt

    期待继续往后写。

  8. zerofantasy

    写的非常好。
    正在做一个基于P2P的分布式计算平台,所以这些实现对我非常有用。
    另外,对于epoll,可以继续写一写epoll事件处理循环中,如何把事件按照链接分队。在程控交换原理中有个概念叫多路复用转换器,即将空分复用转换为时分复用。简单的来说,events[20]当中的事件来自所有的链接,可以设计一个dispatcher,将相同链接的events放到单独的队列中,这样一来就可以使用线程池对每个链接中的事件按照各自的顺序进行处理了。这么做对于实现上层应用很有必要。

  9. 代码罐头

    是可以只有一个钩子函数.
    这样区分只是为了看起来比较清楚一点.呵呵
    非常高兴拙作能对你有用.

  10. dingshi065

    其实可以只有一个钩子函数,写一个event_mod_op()函数修改事件及钩子函数即可。我就是这样做的,我花了一个多月时间,参考nginx,lithttpd,libevent写了一个金融交易服务器,当然也参考了楼主的文章

Leave a Comment