C++后端开发入门-FTP服务器与Libevent初探
环境准备
安装Libevent:
1
| sudo apt install libevent-dev
|
安装后需要在Visual Studio 2022的“跨平台”下的远程标头IntelliSense管理器中将目标主机更新,并在项目属性链接器的“库依赖项”中添加“event”表示链接libevent.so,换作命令行为:
1
| gcc test.c -o test -I /usr/include -levent
|
Libevent入门
用event_init
初始化Libevent:
1 2 3
| #include <event2/event-config.h> #include <event.h> struct event_base *event_init(void);
|
用event_set
初始化event事件,设置回调函数和关注的事件:
1 2 3 4 5 6 7 8 9
| #define evutil_socket_t int void event_set( struct event *ev, evutil_socket_t fd, short event, void (*cb)(evutil_socket_t, short, void *), void *arg ); #define evtimer_set(ev, cb, arg) event_set((ev), -1, 0, (cb), (arg))
|
用event_base_set
设置event要注册到哪个event_base实例上。Libevent不管理event事件集合,要应用程序自行管理。
1 2 3 4
| int event_base_set( struct event_base *, struct event * );
|
用event_add
添加事件。对于定时事件,Libevent用一个小根堆维护,键为超时事件。对于信号和I/O事件,Libevent将其放到等待链表中,后者是个双向链表。
1 2 3 4
| int event_add( struct event *ev, const struct timeval *timeout );
|
用event_dispatch
进入无限循环,等待就绪事件并执行回调函数:
1 2 3
| int event_base_dispatch( struct event_base * );
|
用event_new
创建事件对象:
1 2 3 4 5 6 7
| struct event *event_new( struct event_base *, evutil_socket_t, short, event_callback_fn, void * );
|
用event_config_new
创建一个event_config对象,用event_config_set_flag
设置event_base运行时特性,用event_config_free
释放一个event_config对象:
1 2 3 4 5 6 7 8
| struct event_config *event_config_new(void); int event_config_set_flag( struct event_config *cfg, int flag ); void event_config_free( struct event_config *cfg );
|
用event_base_new_with_config
创建自定义配置event_base。用event_base_dispatch
启动事件循环。用event_base_free
释放event_base:
1 2 3 4 5 6 7 8 9
| struct event_base *event_base_new_with_config( const struct event_config * ); int event_base_dispatch( struct event_base * ); void event_base_free( struct event_base * );
|
用evconnlistener_new_bind
创建监听事件,并监听给定地址上的TCP连接。用evconnlistener_free
释放一个监听器对象。
1 2 3 4 5 6 7 8 9 10 11 12 13
| #include <event2/listener.h> struct evconnlistener *evconnlistener_new_bind( struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, const struct sockaddr *sa, int socklen ); void evconnlistener_free( struct evconnlistener *lev );
|
用bufferevent_socket_new
创建一个基于套接字的bufferevent,将指定文件描述符封装为带有读写缓冲区的高级事件对象:
1 2 3 4 5 6
| #include <event2/bufferevent.h> struct bufferevent *bufferevent_socket_new( struct event_base *base, evutil_socket_t fd, int options );
|
用bufferevent_set_timeouts
设置bufferevent读写超时时间,超时后触发相应超时时间:
1 2 3 4 5
| int bufferevent_set_timeouts( struct bufferevent *bufev, const struct timeval *timeout_read, const struct timeval *timeout_write );
|
用bufferevent_socket_new
建立异步非阻塞TCP连接:
1 2 3 4 5
| int bufferevent_socket_connect( struct bufferevent *, const struct sockaddr *, int );
|
用bufferevent_setcb
设置bufferevent的回调函数:
1 2 3 4 5 6 7
| void bufferevent_setcb( struct bufferevent *bufev, bufferevent_data_cb readcb, bufferevent_data_cb writecb, bufferevent_event_cb eventcb, void *cbarg );
|
分别用bufferevent_read
和bufferevent_write
从bufferevent输入/输出缓冲区中读取/写入数据:
1 2 3 4 5 6 7 8 9 10
| size_t bufferevent_read( struct bufferevent* bufev, void* data, size_t size ); int bufferevent_write( struct bufferevent* bufev, const void* data, size_t size );
|
用bufferevent_trigger
可手动触发某bufferevent回调函数:
1 2 3 4 5
| void bufferevent_trigger( struct bufferevent* bufev, short iotype, int options );
|
用evutil_inet_pton
将点分十进制字符串转为二进制IP地址,是inet_pton
的封装:
1 2 3 4 5
| int evutil_inet_pton( int af, const char *src, void *dst );
|
用evutil_socketpair
在本地创建一对全双工的用于进程间通信的套接字:
1 2 3 4 5 6
| int evutil_socketpair( int d, int type, int protocol, evutil_socket_t sv[2] );
|
用evutil_make_socket_nonblocking
将一个套接字设为非阻塞模式:
1 2 3
| int evutil_make_socket_nonblocking( evutil_socket_t sock );
|
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| #include <sys/types.h> #include <event2/event-config.h> #include <stdio.h> #include <event.h> struct event ev; struct timeval tv; void time_cb(int fd, short event, void* argc) { printf("timer wakeup!\n"); event_add(&ev, &tv); return; }; int main() { struct event_base* base = event_init(); tv.tv_sec = 10; tv.tv_usec = 0; evtimer_set(&ev, time_cb, NULL); event_base_set(base, &ev); event_add(&ev, &tv); event_base_dispatch(base); return; };
|
FTP协议入门
FTP工作方式分为主动方式PORT和被动方式PASV,主动方式可避免服务端防火墙干扰,被动方式可避免客户端防火墙干扰。
主动模式时客户端开启一个大于1024的随机端口,与服务器21号端口建立TCP控制连接。当用户需要传输数据时,客户端在控制通道中用PORT命令向服务器发送本地IP地址和端口号,服务器20号端口主动连接客户端发来的指定端口,在这条数据连接上进行文件下载上传。
被动模式时客户端开启一个大于1024的随机端口,与服务器21号端口建立TCP控制连接。当用户需要传输数据时,客户端向服务器发送PASV命令通知服务器采用被动传输方式。服务器收到PASV命令后开启一个大于1024的随机端口,并将IP地址和该端口号通过控制连接发送给客户端,客户端与服务器该端口建立TCP数据连接,进行文件下载上传。
FTP命令根据功能不同分为访问控制命令、传输参数命令和FTP服务命令,都是以网络虚拟终端NVT的ASCI文本形式发送,以ASCII回车或换行符结束。
常见控制命令如下:
控制命令 |
功能 |
USER username |
登录用户,username是登录用户名 |
CWD pathname |
改变工作路径,pathname是指定目录路径名 |
CDUP |
回到上一层目录 |
常用传输参数命令如下:
传输参数命令 |
功能 |
PORT h1,h2,h3,h4,p1,p2 |
主动传输方式,h1~h4为IP地址,p1*256+p2为端口号 |
PASV |
被动传输方式 |
常用服务命令如下:
服务命令 |
功能 |
LIST pathname |
请求服务器发送列表信息,如指定目录文件列表 |
RETR pathname |
请求服务器向客户端发送指定文件 |
STOR pathname |
客户端向服务器上传指定文件,已存在则替换,不存在则新建 |
PWD |
返回当前工作目录名 |
每条FTP指令至少产生一个FTP回应,一个FTP回应包含一个FTP应答码和一段文本说明,文本说明由服务器随便设定。FTP应答码由3位数字组成,第一位表示相应是成功/失败/不完全的,第二位表示该相应针对哪部分,第三位为附加信息。
常见FTP应答码第一位含义如下:
应答码第一位 |
含义 |
1 |
确定预备应答。操作目前为止正常但尚未完成。 |
2 |
确定完成应答。操作成功完成。 |
4 |
暂时拒绝完成应答。操作执行失败,未接受命令,可稍后继续发送命令。 |
5 |
永久拒绝完成应答。命令不被接受且不再重试。 |
常见FTP应答码第二位含义如下:
应答码第二位 |
含义 |
0 |
格式错误 |
2 |
控制或数据连接 |
3 |
认证和账户登录过程 |
5 |
文件系统状态 |
常用FTP应答码例子如下:
应答码例子 |
含义 |
200 |
指令成功 |
501 |
参数或变量语法错误 |
226 |
关闭数据连接,请求文件操作成功 |
150 |
文件状态良好,打开数据连接 |
FTP服务器原理
本小节用的这个项目https://github.com/BigCJL/libevent-Ftp_Server 。服务端启动后Windows客户端资源管理器中打开报错,不知道为啥,反正命令行ftp工具能连上,一些手写的Windows客户端也能连上,据说Filezilla等也能连上。
主程序Ftp_Server.cpp内容如下,创建监听事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| #include <iostream> #include "XThreadPool.h" #include "XFtpServerCMD.h" #include <event2/listener.h> #include <event2/event.h> #ifndef _WIN32 #include <signal.h> #include <string.h> #endif #include "XFtpFactory.h" using namespace std; #define SPORT 21 void listen_cb(struct evconnlistener* e, evutil_socket_t s, struct sockaddr* a, int socklen, void* arg) { cout << "listen_cb" << endl; XTask* task = XFtpFactory::Get()->CreateTask(); task->sock = s; XThreadPool::Get()->Dispatch(task); return; }; int main() { #ifdef _WIN32 WSADATA wsa; WSAStartup(MAKEWORD(2, 2), &wsa); #else if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) return 1; #endif XThreadPool::Get()->Init(10); std::cout << "test_thread_pool_server\n"; event_base* base = event_base_new(); if (base) std::cout << "event_base_new success!" << std::endl; sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(SPORT); evconnlistener* ev = evconnlistener_new_bind(base, listen_cb, base, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, 10, (sockaddr*)&sin, sizeof(sin)); if (base) event_base_dispatch(base); if (ev) evconnlistener_free(ev); if (base) event_base_free(base); #ifdef _WIN32 WSACleanup(); #endif return 0; };
|
XFtpFactory提供一个创建任务的函数CreateTask
,Dispatch
用于线程池中分配任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #pragma once #include "XTask.h" class XFtpFactory{ public: static XFtpFactory* Get() { static XFtpFactory f; return &f; }; XTask* CreateTask(); private: XFtpFactory(); };
#include "XFtpFactory.h" #include "XFtpServerCMD.h" #include "XFtpUSER.h" #include "XFtpLIST.h" #include "XFtpPORT.h" #include "XFtpRETR.h" #include "XFtpSTOR.h" XTask* XFtpFactory::CreateTask() { XFtpServerCMD* x = new XFtpServerCMD(); x->Reg("USER", new XFtpUSER()); XFtpLIST* list = new XFtpLIST(); x->Reg("PWD", list); x->Reg("LIST", list); x->Reg("CWD", list); x->Reg("CDUP", list); x->Reg("PORT", new XFtpPORT()); x->Reg("RETR", new XFtpRETR()); x->Reg("RETR", new XFtpSTOR()); return x; }; XFtpFactory::XFtpFactory() {};
|
XFtpServerCMD类是命令处理器,在XFtpFactory的CreatTask
函数中实例化了命令处理器并向其添加命令。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| #pragma once #include "XFtpTask.h" #include <map> class XFtpServerCMD : public XFtpTask{ public: virtual bool Init(); virtual void Read(struct bufferevent* bev); virtual void Event(struct bufferevent* bev, short what); void Reg(std::string s, XFtpTask* call); ~XFtpServerCMD(); private: std::map<std::string, XFtpTask*>calls; std::map<XFtpTask*, int>calls_del; };
#include "XFtpServerCMD.h" #include <event2/bufferevent.h> #include <event2/event.h> #include<iostream> #include<string.h> using namespace std; void XFtpServerCMD::Reg(std::string cmd, XFtpTask* call) { if (!call) { cout << "XFtpServerCMD::Reg call is null" << endl; return; }; if (cmd.empty()) { cout << "XFtpServerCMD::Reg cmd is null" << endl; return; }; if (calls.find(cmd) != calls.end()) { cout << cmd << " is already register" << endl; return; }; calls[cmd] = call; calls_del[call] = 0; return; }; void XFtpServerCMD::Read(struct bufferevent* bev) { char data[1024] = { 0 }; for (;;) { int len = bufferevent_read(bev, data, sizeof(data) - 1); if (len <= 0)break; data[len] = '\0'; cout << "Recv CMD:" << data << flush; string type = ""; for (int i = 0; i < len; i++) { if (data[i] == ' ' || data[i] == '\r') break; type += data[i]; }; cout << "type is [" << type << "]" << endl; if (calls.find(type) != calls.end()) { XFtpTask* t = calls[type]; t->ip = ip; t->port = port; t->base = base; t->cmdTask = this; t->Parse(type, data); if (type == "PORT") { ip = t->ip; port = t->port; }; } else { string msg = "200 OK\r\n"; bufferevent_write(bev, msg.c_str(), msg.size()); }; }; return; }; void XFtpServerCMD::Event(struct bufferevent* bev, short what) { if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR" << endl; delete this; }; return; }; bool XFtpServerCMD::Init() { cout << "XFtpServerCMD::Init" << endl; bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE); if (!bev) { delete this; return false; }; this->bev = bev; this->SetCallback(bev); timeval rt = { 60,0 }; bufferevent_set_timeouts(bev, &rt, 0); string msg = "220 thist is libevent Ftp_Server, Welcome!\r\n"; bufferevent_write(bev, msg.c_str(), msg.size()); return true; }; XFtpServerCMD::~XFtpServerCMD() { Close(); for (auto ptr = calls_del.begin(); ptr != calls_del.end(); ptr++) { ptr->first->Close(); delete ptr->first; }; };
|
XFtpUSER类用于实现USER命令,可根据需要实现具体登录认证,但这里不实现,默认都可以登录。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| #pragma once #include "XFtpTask.h" class XFtpUSER : public XFtpTask { public: virtual void Parse(std::string type, std::string msg); };
#include "XFtpUSER.h" #include<iostream> using namespace std; void XFtpUSER::Parse(std::string type, std::string msg) { cout << "XFtpUSER::Parse " << type << " " << msg << endl; ResCMD("230 Login successful \r\n"); return; };
|
XFtpPORT类用于实现PORT命令,解析IP地址和端口号:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #pragma once #include "XFtpTask.h" class XFtpPORT :public XFtpTask { public: void Parse(std::string type, std::string msg); };
#include "XFtpPORT.h" #include <vector> #include <iostream> using namespace std; void XFtpPORT::Parse(std::string type, std::string msg) { vector<string> vals; string tmp = ""; for (int i = 5; i < msg.size(); i++) { if (msg[i] == ',' || msg[i] == '\r') { vals.push_back(tmp); tmp = ""; continue; }; tmp += msg[i]; }; if (vals.size() != 6) { ResCMD("501 Syntax error in parameters or arguments."); return; }; ip = vals[0] + "." + vals[1] + "." + vals[2] + "." + vals[3]; port = atoi(vals[4].c_str()) * 256 + atoi(vals[5].c_str()); cout << "Port ip is " << ip << endl; cout << "PORT port is " << port << endl; ResCMD("200 PORT command successful.\r\n"); return; };
|
后续命令用XFtpLIST列表类实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| #pragma once #include "XFtpTask.h" class XFtpLIST : public XFtpTask { public: virtual void Parse(std::string type, std::string msg); virtual void Write(struct bufferevent* bev); virtual void Event(struct bufferevent* bev, short what); private: std::string GetListData(std::string path); };
#include "XFtpLIST.h" #include<iostream> #include<event2/bufferevent.h> #include<event.h> #ifdef _WIN32 #include<io.h> #endif using namespace std; void XFtpLIST::Write(struct bufferevent* bev) { ResCMD("226 Transfer complete\r\n"); Close(); return; }; void XFtpLIST::Event(struct bufferevent* bev, short what) { if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR" << endl; Close(); } else if (what & BEV_EVENT_CONNECTED) cout << "XFtpLIST BEV_EVENT_CONNECTED" << endl; return; }; void XFtpLIST::Parse(std::string type, std::string msg) { string resmsg = ""; if (type == "PWD") { resmsg = "257 \""; resmsg += cmdTask->curDir; resmsg += "\" is current dir.\r\n"; ResCMD(resmsg); } else if (type == "LIST") { ConnectPORT(); ResCMD("150 Here comes thre directory listing.\r\n"); string listdata = GetListData(cmdTask->rootDir + cmdTask->curDir); Send(listdata); } else if (type == "CWD") { int pos = msg.rfind(" ") + 1; string path = msg.substr(pos, msg.size() - pos - 2); if (path[0] == '/') cmdTask->curDir = path; else { if (cmdTask->curDir[cmdTask->curDir.size() - 1] != '/') cmdTask->curDir += '/'; cmdTask->curDir += path + "/"; }; ResCMD("250 Directory success changed.\r\n"); } else if (type == "CDUP") { string path = cmdTask->curDir;
if (path[path.size() - 1] == '/') path = path.substr(0, path.size() - 1); int pos = path.rfind("/"); path = path.substr(0, pos); cmdTask->curDir = path; ResCMD("250 Directory success changed.\r\n"); }; return; }; string XFtpLIST::GetListData(std::string path) { string data = ""; #ifdef _WIN32 _finddata_t file; path += "/*.*"; intptr_t dir = _findfirst(path.c_str(), &file); if (dir < 0) return data; do { string tmp = ""; if (file.attrib & _A_SUBDIR) { if (strcmp(file.name, ".") == 0 || strcmp(file.name, "..") == 0) continue; tmp = "drwxrwxrwx 1 root group"; } else tmp = "-rwxrwxrwx 1 root group"; char buf[1024]; sprintf(buf, "%u", file.size); tmp += buf; strftime(buf, sizeof(buf) - 1, "%b %d %H:%M ", localtime(&file.time_write)); tmp += buf; tmp += file.name; data += "\r\n"; data += tmp; } while (_findnext(dir, &file) == 0); #else string cmd = "ls -l "; cmd += path; cout << "Popen:" << cmd << endl; FILE* f = popen(cmd.c_str(), "r"); if (!f) return data; char buffer[1024] = { 0 }; for (;;) { int len = fread(buffer, 1, sizeof(buffer) - 1, f); if (len <= 0)break; buffer[len] = '\0'; data += buffer; }; pclose(f); #endif return data; };
|
XFtpRETR类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| #pragma once #include "XFtpTask.h" class XFtpRETR :public XFtpTask { public: virtual void Parse(std::string type, std::string msg); virtual void Write(struct bufferevent* bev); virtual void Event(struct bufferevent* bev, short what); private: char buf[1024]; };
#include "XFtpRETR.h" #include <event2/event.h> #include <event2/bufferevent.h> #include<iostream> using namespace std; void XFtpRETR::Write(struct bufferevent* bev) { if (!fp) return; int len = fread(buf, 1, sizeof(buf), fp); if (len <= 0) { ResCMD("226 Transfer complete\r\n"); Close(); return; }; cout << "[" << len << "]" << flush; Send(buf, len); return; }; void XFtpRETR::Event(struct bufferevent* bev, short what) { if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR" << endl; Close(); } else if (what & BEV_EVENT_CONNECTED) cout << "XFtpLIST BEV_EVENT_CONNECTED" << endl; return; }; void XFtpRETR::Parse(std::string type, std::string msg) { int pos = msg.rfind(" ") + 1; string filename = msg.substr(pos, msg.size() - pos - 2); string path = cmdTask->rootDir; path += cmdTask->curDir; path += "/"; path += filename; fp = fopen(path.c_str(), "rb"); if (fp) { ConnectPORT(); ResCMD("150 File OK\r\n"); bufferevent_trigger(bev, EV_WRITE, 0); } else ResCMD("450 file open failed!\r\n"); return; };
|
XFtpSTOR类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| #pragma once #include "XFtpTask.h" class XFtpSTOR :public XFtpTask { public: virtual void Parse(std::string type, std::string msg); virtual void Read(struct bufferevent* bev); virtual void Event(struct bufferevent* bev, short what); private: char buf[1024]; };
#include "XFtpSTOR.h" #include<iostream> #include<event2/event.h> #include<event2/bufferevent.h> using namespace std; void XFtpSTOR::Parse(std::string type, std::string msg) { int pos = msg.rfind(" ") + 1; string filename = msg.substr(pos, msg.size() - pos - 2); string path = cmdTask->rootDir; path += cmdTask->curDir; path += filename; fp = fopen(path.c_str(), "wb"); if (fp) { ConnectPORT(); ResCMD("125 File OK\r\n"); bufferevent_trigger(bev, EV_READ, 0); } else ResCMD("450 file open failed!\r\n"); return; }; void XFtpSTOR::Read(struct bufferevent* bev) { if (!fp) return; for (;;) { int len = bufferevent_read(bev, buf, sizeof(buf)); if (len <= 0) return; int size = fwrite(buf, 1, len, fp); cout << "<" << len << ":" << size << ">" << flush; }; return; }; void XFtpSTOR::Event(struct bufferevent* bev, short what) { if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR" << endl; Close(); ResCMD("226 Transfer complete\r\n"); } else if (what & BEV_EVENT_CONNECTED) cout << "XFtpLIST BEV_EVENT_CONNECTED" << endl; return; };
|
XFtpTask类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| #pragma once #include "XTask.h" #include <string> class XFtpTask : public XTask { public: std::string curDir = "/"; std::string rootDir = "."; std::string ip = ""; int port = 0; XFtpTask* cmdTask = 0; virtual void Parse(std::string type, std::string msg) {} void ResCMD(std::string msg); void Send(std::string data); void Send(const char* data, int datasize); void ConnectPORT(); void Close(); virtual void Read(struct bufferevent* bev) {} virtual void Write(struct bufferevent* bev) {} virtual void Event(struct bufferevent* bev, short what) {} void SetCallback(struct bufferevent* bev); bool Init() { return true; } protected: static void ReadCB(bufferevent* bev, void* arg); static void WriteCB(bufferevent* bev, void* arg); static void EventCB(struct bufferevent* bev, short what, void* arg); struct bufferevent* bev = 0; FILE* fp = 0; };
#include "XFtpTask.h" #include <event2/event.h> #include <event2/bufferevent.h> #include <iostream> #include <cstring> using namespace std; void XFtpTask::Send(std::string data) { Send(data.c_str(), data.size()); return; }; void XFtpTask::Send(const char* data, int datasize) { if (!bev) return; bufferevent_write(bev, data, datasize); return; }; void XFtpTask::Close() { if (bev) { bufferevent_free(bev); bev = 0; }; if (fp) { fclose(fp); fp = 0; }; return; }; void XFtpTask::ConnectPORT() { if (ip.empty() || port <= 0 || !base) { cout << "ConnectPORT failed ip or port is null" << endl; return; }; bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port); evutil_inet_pton(AF_INET, ip.c_str(), &sin.sin_addr.s_addr); SetCallback(bev); timeval rt = { 60,0 }; bufferevent_set_timeouts(bev, &rt, 0); bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin)); return; }; void XFtpTask::ResCMD(string msg) { if (!cmdTask || !cmdTask->bev) return; cout << "ResCMD:" << msg << endl; if (msg[msg.size() - 1] != '\n') msg += "\r\n"; bufferevent_write(cmdTask->bev, msg.c_str(), msg.size()); return; }; void XFtpTask::SetCallback(struct bufferevent* bev) { bufferevent_setcb(bev, ReadCB, WriteCB, EventCB, this); bufferevent_enable(bev, EV_READ | EV_WRITE); return; }; void XFtpTask::ReadCB(bufferevent* bev, void* arg) { XFtpTask* t = static_cast<XFtpTask*>(arg); t->Read(bev); return; }; void XFtpTask::WriteCB(bufferevent* bev, void* arg) { XFtpTask* t = static_cast<XFtpTask*>(arg); t->Write(bev); return; }; void XFtpTask::EventCB(struct bufferevent* bev, short what, void* arg) { XFtpTask* t = static_cast<XFtpTask*>(arg); t->Event(bev, what); return; };
|
XTask类如下:
1 2 3 4 5 6 7 8 9 10 11 12
| #pragma once class XTask { public: struct event_base* base = 0; int sock; int thread_id = 0; virtual bool Init() = 0; };
#include "XTask.h"
|
XThread类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| #pragma once #include <event2/event.h> #include <list> #include <mutex> #include "XTask.h" class XThread { public: void Start(); void Main(); bool Setup(); void Notify(evutil_socket_t fd, short which); void Activate(); void AddTask(XTask* t); XThread(); ~XThread(); int id = 0; private: int notify_send_fd = 0; struct event_base* base = 0; std::list<XTask*> tasks; std::mutex tasks_mutex; };
#include "XThread.h" #include "XTask.h" #include <thread> #include <iostream> #include <event2/event.h> #ifdef _WIN32 #else #include <unistd.h> #endif using namespace std; static void NotifyCallback(evutil_socket_t fd, short which, void* arg) { XThread* t = (XThread*)arg; t->Notify(fd, which); return; }; void XThread::Notify(evutil_socket_t fd, short which) { char buf[2] = { 0 }; #ifdef _WIN32 int re = recv(fd, buf, 1, 0); #else int re = read(fd, buf, 1); #endif if (re <= 0) return; cout << id << " thread " << buf << endl; XTask* task = nullptr; tasks_mutex.lock(); if (tasks.empty()) { tasks_mutex.unlock(); return; }; task = tasks.front(); tasks.pop_front(); tasks_mutex.unlock(); task->Init(); return; }; void XThread::AddTask(XTask* t) { if (!t) return; t->base = this->base; tasks_mutex.lock(); tasks.push_back(t); tasks_mutex.unlock(); return; }; void XThread::Activate() { #ifdef _WIN32 int re = send(this->notify_send_fd, "c", 1, 0); #else int re = write(this->notify_send_fd, "c", 1); #endif if (re <= 0) cerr << "XThread::Activate() failed!" << endl; return; }; void XThread::Start() { Setup(); thread th(&XThread::Main, this); th.detach(); return; }; bool XThread::Setup() { #ifdef _WIN32 evutil_socket_t fds[2]; if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0) { cout << "evutil_socketpair failed!" << endl; return false; }; evutil_make_socket_nonblocking(fds[0]); evutil_make_socket_nonblocking(fds[1]); #else int fds[2]; if (pipe(fds)) { cerr << "pipe failed!" << endl; return false; }; #endif notify_send_fd = fds[1]; event_config* ev_conf = event_config_new(); event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK); this->base = event_base_new_with_config(ev_conf); event_config_free(ev_conf); if (!base) { cerr << "event_base_new_with_config failed in thread!" << endl; return false; }; event* ev = event_new(base, fds[0], EV_READ | EV_PERSIST, NotifyCallback, this); event_add(ev, 0); return true; }; void XThread::Main() { cout << id << " XThread::Main() begin" << endl; event_base_dispatch(base); event_base_free(base); cout << id << " XThread::Main() end" << endl; return; }; XThread::XThread() {}; XThread::~XThread() {};
|
XThreadPool类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| #pragma once #include<vector> #include "XTask.h" class XThread; class XThreadPool { public: static XThreadPool* Get() { static XThreadPool p; return &p; }; void Init(int threadCount); void Dispatch(XTask* task); private: int threadCount = 0; int lastThread = -1; std::vector<XThread*> threads; XThreadPool() {}; };
#include "XThreadPool.h" #include "XThread.h" #include <thread> #include <iostream> using namespace std; void XThreadPool::Dispatch(XTask* task) { if (!task) return; int tid = (lastThread + 1) % threadCount; lastThread = tid; XThread* t = threads[tid]; t->AddTask(task); t->Activate(); return; }; void XThreadPool::Init(int threadCount) { this->threadCount = threadCount; this->lastThread = -1; for (int i = 0; i < threadCount; i++) { XThread* t = new XThread(); t->id = i + 1; std::cout << "Create thread " << t->id << endl; t->Start(); this->threads.push_back(t); this_thread::sleep_for(10ms); }; return; };
|
Makefile如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| GCC ?= g++ CCMODE = PROGRAM INCLUDES = -I/opt/libevent/include/ CFLAGS = -Wall $(MACRO) TARGET = ftpSrv SRCS := $(wildcard *.cpp) LIBS = -L /opt/libevent/lib/ -levent -lpthread ifeq ($(CCMODE),PROGRAM) $(TARGET): $(LINKS) $(SRCS) $(GCC) $(CFLAGS) $(INCLUDES) -o $(TARGET) $(SRCS) $(LIBS) @chmod +x $(TARGET) @echo make $(TARGET) ok. clean: rm -rf $(TARGET) endif clean: rm -f $(TARGET) .PHONY:install .PHONY:clean
|