C++后端开发入门-FTP服务器与Libevent初探

环境准备

安装Libevent:

bash
1
sudo apt install libevent-dev

安装后需要在Visual Studio 2022的“跨平台”下的远程标头IntelliSense管理器中将目标主机更新,并在项目属性链接器的“库依赖项”中添加“event”表示链接libevent.so,换作命令行为:

bash
1
gcc test.c -o test -I /usr/include -levent

Libevent入门

event_init初始化Libevent:

c++
1
2
3
#include <event2/event-config.h>
#include <event.h>
struct event_base *event_init(void);

event_set初始化event事件,设置回调函数和关注的事件:

c++
1
2
3
4
5
6
7
8
9
#define evutil_socket_t int
void event_set(
struct event *ev, //执行要初始化的event对象
evutil_socket_t fd, //该event绑定的句柄
short event, //该fd上关注的事件类型 如EV_READ EV_WRITE EV_SIGNAL等
void (*cb)(evutil_socket_t, short, void *), //回调函数 参数依次为fd event arg
void *arg //传递给回调函数的参数
);
#define evtimer_set(ev, cb, arg) event_set((ev), -1, 0, (cb), (arg))

event_base_set设置event要注册到哪个event_base实例上。Libevent不管理event事件集合,要应用程序自行管理。

c++
1
2
3
4
int event_base_set(
struct event_base *,
struct event *
);

event_add添加事件。对于定时事件,Libevent用一个小根堆维护,键为超时事件。对于信号和I/O事件,Libevent将其放到等待链表中,后者是个双向链表。

c++
1
2
3
4
int event_add(
struct event *ev,
const struct timeval *timeout //定时值
);

event_dispatch进入无限循环,等待就绪事件并执行回调函数:

c++
1
2
3
int event_base_dispatch(
struct event_base *
);

event_new创建事件对象:

c++
1
2
3
4
5
6
7
struct event *event_new(
struct event_base *, //关联的event_base
evutil_socket_t, //监控的文件描述符 超时事件设为-1
short, //事件类型标志
event_callback_fn, //事件触发时的回调函数
void * //传给回调函数的参数 如EV_READ监控读事件 EV_PERSIST持久化事件(触发后不自动删除)
);

event_config_new创建一个event_config对象,用event_config_set_flag设置event_base运行时特性,用event_config_free释放一个event_config对象:

c++
1
2
3
4
5
6
7
8
struct event_config *event_config_new(void);
int event_config_set_flag(
struct event_config *cfg, //event_config
int flag //标志 如EVENT_BASE_FLAG_NOLOCK禁用线程锁
);
void event_config_free(
struct event_config *cfg
);

event_base_new_with_config创建自定义配置event_base。用event_base_dispatch启动事件循环。用event_base_free释放event_base:

c++
1
2
3
4
5
6
7
8
9
struct event_base *event_base_new_with_config(
const struct event_config *
);
int event_base_dispatch(
struct event_base *
);
void event_base_free(
struct event_base *
);

evconnlistener_new_bind创建监听事件,并监听给定地址上的TCP连接。用evconnlistener_free释放一个监听器对象。

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
#include <event2/listener.h>
struct evconnlistener *evconnlistener_new_bind(
struct event_base *base, //Libevent上下文
evconnlistener_cb cb, //回调函数
void *ptr, //回调函数参数
unsigned flags, //监听器属性 如LEV_OPT_REUSEABLE端口复用 LEV_OPT_CLOSE_ON_FREE释放资源时关闭套接字
int backlog, //未连接队列长度 -1默认
const struct sockaddr *sa, //绑定的地址端口
int socklen //套接字地址结构字节长度
);
void evconnlistener_free(
struct evconnlistener *lev
);

bufferevent_socket_new创建一个基于套接字的bufferevent,将指定文件描述符封装为带有读写缓冲区的高级事件对象:

c++
1
2
3
4
5
6
#include <event2/bufferevent.h>
struct bufferevent *bufferevent_socket_new(
struct event_base *base, //关联的event_base
evutil_socket_t fd, //需监听的套接字文件描述符
int options //选项
);

bufferevent_set_timeouts设置bufferevent读写超时时间,超时后触发相应超时时间:

c++
1
2
3
4
5
int bufferevent_set_timeouts(
struct bufferevent *bufev, //bufferevent
const struct timeval *timeout_read, //读超时时间 NULL不设置
const struct timeval *timeout_write //写超时时间 NULL不设置
);

bufferevent_socket_new建立异步非阻塞TCP连接:

c++
1
2
3
4
5
int bufferevent_socket_connect(
struct bufferevent *, //bufferevent对象
const struct sockaddr *, //目标地址结构体
int //地址结构体长度
);

bufferevent_setcb设置bufferevent的回调函数:

c++
1
2
3
4
5
6
7
void bufferevent_setcb(
struct bufferevent *bufev, //bufferevent
bufferevent_data_cb readcb, //数据可读时调用的回调函数
bufferevent_data_cb writecb, //数据可写时调用的回调函数
bufferevent_event_cb eventcb, //发生错误时调用的回调函数
void *cbarg //回调函数上下文
);

分别用bufferevent_readbufferevent_write从bufferevent输入/输出缓冲区中读取/写入数据:

c++
1
2
3
4
5
6
7
8
9
10
size_t bufferevent_read(
struct bufferevent* bufev, //bufferevent
void* data, //读到该缓冲区
size_t size //最多size字节
); //成功返回实际读取字节数 失败-1
int bufferevent_write(
struct bufferevent* bufev,
const void* data,
size_t size
);

bufferevent_trigger可手动触发某bufferevent回调函数:

c++
1
2
3
4
5
void bufferevent_trigger(
struct bufferevent* bufev, //bufferevent
short iotype, //要触发的回调事件类型
int options //传递给回调函数的参数
);

evutil_inet_pton将点分十进制字符串转为二进制IP地址,是inet_pton的封装:

c++
1
2
3
4
5
int evutil_inet_pton(
int af, //地址族 AF_INET或AF_INET6等
const char *src, //字符串
void *dst //二进制地址 in_addr或in6_addr型
); //成功1 src不包含有效地址返回0 af不支持返回-1并设置errno

evutil_socketpair在本地创建一对全双工的用于进程间通信的套接字:

c++
1
2
3
4
5
6
int evutil_socketpair(
int d, //地址族
int type, //套接字类型
int protocol, //协议 默认0
evutil_socket_t sv[2] //返回的套接字对 分别读和写
);

evutil_make_socket_nonblocking将一个套接字设为非阻塞模式:

c++
1
2
3
int evutil_make_socket_nonblocking(
evutil_socket_t sock
);

示例:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <sys/types.h>
#include <event2/event-config.h>
#include <stdio.h>
#include <event.h>
struct event ev;
struct timeval tv;
void time_cb(int fd, short event, void* argc) {
printf("timer wakeup!\n");
event_add(&ev, &tv);
return;
};
int main() {
struct event_base* base = event_init();
tv.tv_sec = 10;
tv.tv_usec = 0;
evtimer_set(&ev, time_cb, NULL);
event_base_set(base, &ev);
event_add(&ev, &tv);
event_base_dispatch(base);
return;
};

FTP协议入门

FTP工作方式分为主动方式PORT和被动方式PASV,主动方式可避免服务端防火墙干扰,被动方式可避免客户端防火墙干扰。

主动模式时客户端开启一个大于1024的随机端口,与服务器21号端口建立TCP控制连接。当用户需要传输数据时,客户端在控制通道中用PORT命令向服务器发送本地IP地址和端口号,服务器20号端口主动连接客户端发来的指定端口,在这条数据连接上进行文件下载上传。

被动模式时客户端开启一个大于1024的随机端口,与服务器21号端口建立TCP控制连接。当用户需要传输数据时,客户端向服务器发送PASV命令通知服务器采用被动传输方式。服务器收到PASV命令后开启一个大于1024的随机端口,并将IP地址和该端口号通过控制连接发送给客户端,客户端与服务器该端口建立TCP数据连接,进行文件下载上传。

FTP命令根据功能不同分为访问控制命令、传输参数命令和FTP服务命令,都是以网络虚拟终端NVT的ASCI文本形式发送,以ASCII回车或换行符结束。

常见控制命令如下:

控制命令 功能
USER username 登录用户,username是登录用户名
CWD pathname 改变工作路径,pathname是指定目录路径名
CDUP 回到上一层目录

常用传输参数命令如下:

传输参数命令 功能
PORT h1,h2,h3,h4,p1,p2 主动传输方式,h1~h4为IP地址,p1*256+p2为端口号
PASV 被动传输方式

常用服务命令如下:

服务命令 功能
LIST pathname 请求服务器发送列表信息,如指定目录文件列表
RETR pathname 请求服务器向客户端发送指定文件
STOR pathname 客户端向服务器上传指定文件,已存在则替换,不存在则新建
PWD 返回当前工作目录名

每条FTP指令至少产生一个FTP回应,一个FTP回应包含一个FTP应答码和一段文本说明,文本说明由服务器随便设定。FTP应答码由3位数字组成,第一位表示相应是成功/失败/不完全的,第二位表示该相应针对哪部分,第三位为附加信息。

常见FTP应答码第一位含义如下:

应答码第一位 含义
1 确定预备应答。操作目前为止正常但尚未完成。
2 确定完成应答。操作成功完成。
4 暂时拒绝完成应答。操作执行失败,未接受命令,可稍后继续发送命令。
5 永久拒绝完成应答。命令不被接受且不再重试。

常见FTP应答码第二位含义如下:

应答码第二位 含义
0 格式错误
2 控制或数据连接
3 认证和账户登录过程
5 文件系统状态

常用FTP应答码例子如下:

应答码例子 含义
200 指令成功
501 参数或变量语法错误
226 关闭数据连接,请求文件操作成功
150 文件状态良好,打开数据连接

FTP服务器原理

本小节用的这个项目https://github.com/BigCJL/libevent-Ftp_Server 。服务端启动后Windows客户端资源管理器中打开报错,不知道为啥,反正命令行ftp工具能连上,一些手写的Windows客户端也能连上,据说Filezilla等也能连上。

主程序Ftp_Server.cpp内容如下,创建监听事件:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <iostream>
#include "XThreadPool.h"
#include "XFtpServerCMD.h"
#include <event2/listener.h>
#include <event2/event.h>
#ifndef _WIN32
#include <signal.h>
#include <string.h>
#endif
#include "XFtpFactory.h"
using namespace std;
#define SPORT 21
void listen_cb(struct evconnlistener* e, evutil_socket_t s, struct sockaddr* a, int socklen, void* arg) {
cout << "listen_cb" << endl;
XTask* task = XFtpFactory::Get()->CreateTask();
task->sock = s;
XThreadPool::Get()->Dispatch(task);
return;
};
int main() {
#ifdef _WIN32
//初始化socket库,这是windows特有的操作
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
#else
//忽略管道信号 发送数据给已关闭的socket
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
return 1;
#endif
XThreadPool::Get()->Init(10); //1. 初始化线程池
std::cout << "test_thread_pool_server\n";
event_base* base = event_base_new(); //创建libevent的上下文
if (base)
std::cout << "event_base_new success!" << std::endl;
sockaddr_in sin; //监听端口 socket,bind,listen 三步全部包含在内了
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
evconnlistener* ev = evconnlistener_new_bind(base, listen_cb, base, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, 10, (sockaddr*)&sin, sizeof(sin));
if (base) //事件分发处理
event_base_dispatch(base);
if (ev)
evconnlistener_free(ev);
if (base)
event_base_free(base);
#ifdef _WIN32
WSACleanup();
#endif
return 0;
};

XFtpFactory提供一个创建任务的函数CreateTaskDispatch用于线程池中分配任务:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//XFtpFactory.h
#pragma once
#include "XTask.h"
class XFtpFactory{
public:
static XFtpFactory* Get() {
static XFtpFactory f;
return &f;
};
XTask* CreateTask();
private:
XFtpFactory();
};

//XFtpFactory.cpp
#include "XFtpFactory.h"
#include "XFtpServerCMD.h"
#include "XFtpUSER.h"
#include "XFtpLIST.h"
#include "XFtpPORT.h"
#include "XFtpRETR.h"
#include "XFtpSTOR.h"
XTask* XFtpFactory::CreateTask() {
XFtpServerCMD* x = new XFtpServerCMD();
x->Reg("USER", new XFtpUSER()); //注册ftp消息处理对象
XFtpLIST* list = new XFtpLIST();
x->Reg("PWD", list);
x->Reg("LIST", list);
x->Reg("CWD", list);
x->Reg("CDUP", list);
x->Reg("PORT", new XFtpPORT());
x->Reg("RETR", new XFtpRETR());
x->Reg("RETR", new XFtpSTOR());
return x;
};
XFtpFactory::XFtpFactory() {};

XFtpServerCMD类是命令处理器,在XFtpFactory的CreatTask函数中实例化了命令处理器并向其添加命令。

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//XFtpServerCMD.h
#pragma once
#include "XFtpTask.h"
#include <map>
class XFtpServerCMD : public XFtpTask{
public:
virtual bool Init(); //初始化任务
virtual void Read(struct bufferevent* bev);
virtual void Event(struct bufferevent* bev, short what);
void Reg(std::string s, XFtpTask* call); //注册命令处理对象(开销要不要考虑?线程互斥要不要考虑?),注册时 还未分发到线程(Dispatch)
~XFtpServerCMD();
private:
std::map<std::string, XFtpTask*>calls; //注册的处理对象
std::map<XFtpTask*, int>calls_del; //用来做空间清理
};

//XFtpServerCMD.cpp
#include "XFtpServerCMD.h"
#include <event2/bufferevent.h>
#include <event2/event.h>
#include<iostream>
#include<string.h>
using namespace std;
void XFtpServerCMD::Reg(std::string cmd, XFtpTask* call) {
if (!call) {
cout << "XFtpServerCMD::Reg call is null" << endl;
return;
};
if (cmd.empty()) {
cout << "XFtpServerCMD::Reg cmd is null" << endl;
return;
};
if (calls.find(cmd) != calls.end()) { //已经注册的是否覆盖? ->不覆盖,提示错误
cout << cmd << " is already register" << endl;
return;
};
calls[cmd] = call;
calls_del[call] = 0; //用来做空间清理
return;
};
void XFtpServerCMD::Read(struct bufferevent* bev) { //子线程XThread event事件分发
char data[1024] = { 0 };
for (;;) {
int len = bufferevent_read(bev, data, sizeof(data) - 1);
if (len <= 0)break;
data[len] = '\0';
cout << "Recv CMD:" << data << flush;
string type = ""; //分发到处理对象 分析出类型 USER anonymous
for (int i = 0; i < len; i++) {
if (data[i] == ' ' || data[i] == '\r')
break;
type += data[i];
};
cout << "type is [" << type << "]" << endl;
if (calls.find(type) != calls.end()) {
XFtpTask* t = calls[type];
t->ip = ip;
t->port = port;
t->base = base;
t->cmdTask = this; //用来处理回复命令和目录
t->Parse(type, data);
if (type == "PORT") { //这样做代码耦合度变高,暂时按这个方案
ip = t->ip;
port = t->port;
};
}
else {
string msg = "200 OK\r\n"; //如果没找到消息命令的类型,回复一个OK
bufferevent_write(bev, msg.c_str(), msg.size());
};
};
return;
};
void XFtpServerCMD::Event(struct bufferevent* bev, short what) {
if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { //如果对方网络断掉或者机器死机,有可能收不到BEV_EVENT_EOF的数据
cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR" << endl;
delete this;
};
return;
};
bool XFtpServerCMD::Init() {//初始化任务 运行在子线程中
cout << "XFtpServerCMD::Init" << endl;
bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE); //监听socket bufferevent base socket base在子线程里的Setup初始化了
if (!bev) {
delete this;
return false;
};
this->bev = bev;
this->SetCallback(bev);
timeval rt = { 60,0 }; //添加超时
bufferevent_set_timeouts(bev, &rt, 0);
string msg = "220 thist is libevent Ftp_Server, Welcome!\r\n"; //发送欢迎消息 这里说明建立了一个连接
bufferevent_write(bev, msg.c_str(), msg.size());
return true;
};
XFtpServerCMD::~XFtpServerCMD() {
Close();
for (auto ptr = calls_del.begin(); ptr != calls_del.end(); ptr++) {
ptr->first->Close();
delete ptr->first;
};
};

XFtpUSER类用于实现USER命令,可根据需要实现具体登录认证,但这里不实现,默认都可以登录。

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//XFtpUSER.h
#pragma once
#include "XFtpTask.h"
class XFtpUSER : public XFtpTask {
public:
virtual void Parse(std::string type, std::string msg);
};

//XFtpUSER.cpp
#include "XFtpUSER.h"
#include<iostream>
using namespace std;
void XFtpUSER::Parse(std::string type, std::string msg) { //解析协议
cout << "XFtpUSER::Parse " << type << " " << msg << endl;
ResCMD("230 Login successful \r\n");
return;
};

XFtpPORT类用于实现PORT命令,解析IP地址和端口号:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//XFtpPORT.h
#pragma once
#include "XFtpTask.h"
class XFtpPORT :public XFtpTask {
public:
void Parse(std::string type, std::string msg); //解析协议
};

//XFtpPORT.cpp
#include "XFtpPORT.h"
#include <vector>
#include <iostream>
using namespace std;
void XFtpPORT::Parse(std::string type, std::string msg) {
//PORT 127,0,0,1,70,96\r\n
//PORT n1.n2.n3.n4,n5,n6\r\n
//port = n5*256 + n6
vector<string> vals; //只获取ip和端口,不连接 取出ip
string tmp = "";
for (int i = 5; i < msg.size(); i++) {
if (msg[i] == ',' || msg[i] == '\r') {
vals.push_back(tmp);
tmp = "";
continue;
};
tmp += msg[i];
};
if (vals.size() != 6) {
ResCMD("501 Syntax error in parameters or arguments."); //PORT格式有误
return;
};
ip = vals[0] + "." + vals[1] + "." + vals[2] + "." + vals[3];
port = atoi(vals[4].c_str()) * 256 + atoi(vals[5].c_str()); //port = n5*256 + n6
cout << "Port ip is " << ip << endl;
cout << "PORT port is " << port << endl;
ResCMD("200 PORT command successful.\r\n");
return;
};

后续命令用XFtpLIST列表类实现:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
//XFtpLIST.h
#pragma once
#include "XFtpTask.h"
class XFtpLIST : public XFtpTask {
public:
virtual void Parse(std::string type, std::string msg);
virtual void Write(struct bufferevent* bev);
virtual void Event(struct bufferevent* bev, short what);
private:
std::string GetListData(std::string path);
};

//XFtpLIST.cpp
#include "XFtpLIST.h"
#include<iostream>
#include<event2/bufferevent.h>
#include<event.h>
#ifdef _WIN32
#include<io.h> //用于遍历当前目录
#endif
using namespace std;
void XFtpLIST::Write(struct bufferevent* bev) {
ResCMD("226 Transfer complete\r\n"); //4 226 Transfer complete 发送完成
Close(); //5 关闭连接
return;
};
void XFtpLIST::Event(struct bufferevent* bev, short what) {
if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { //如果对方网络断掉或者机器死机,有可能收不到BEV_EVENT_EOF的数据
cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR" << endl;
Close();
}
else
if (what & BEV_EVENT_CONNECTED)
cout << "XFtpLIST BEV_EVENT_CONNECTED" << endl;
return;
};
void XFtpLIST::Parse(std::string type, std::string msg) { // 解析协议
string resmsg = "";
if (type == "PWD") {
resmsg = "257 \""; //257 "/" is current directory
resmsg += cmdTask->curDir;
resmsg += "\" is current dir.\r\n";
ResCMD(resmsg);
}
else
if (type == "LIST") {
//1 连接数据通道 2 150发送成功 3 发送目录数据通道 4 发送完成226 5 关闭连接
//命令通道回复消息,使用数据通道发送目录
//-rwxrwxrwx 1 root group 64463 Mar 14 09:53 101.jgp\r\n
ConnectPORT(); //1 连接数据通道
ResCMD("150 Here comes thre directory listing.\r\n"); //2 150
string listdata = GetListData(cmdTask->rootDir + cmdTask->curDir); //string listdata = "-rwxrwxrwx 1 root group 64463 Mar 14 09:53 101.jgp\r\n";
Send(listdata); // 3 数据通道发送
}
else
if (type == "CWD") { //切换目录
int pos = msg.rfind(" ") + 1; //取出命令中的路径 CWD test\r\n
string path = msg.substr(pos, msg.size() - pos - 2); //去掉结尾的\r\n
if (path[0] == '/') //绝对路径
cmdTask->curDir = path;
else {
if (cmdTask->curDir[cmdTask->curDir.size() - 1] != '/')
cmdTask->curDir += '/';
cmdTask->curDir += path + "/";
};
ResCMD("250 Directory success changed.\r\n"); // /test/
//cmdTask->curDir +=
}
else
if (type == "CDUP") { //回到上层路径
string path = cmdTask->curDir;

if (path[path.size() - 1] == '/') //统一去掉结尾的 /
path = path.substr(0, path.size() - 1);
int pos = path.rfind("/");
path = path.substr(0, pos);
cmdTask->curDir = path;
ResCMD("250 Directory success changed.\r\n");
};
return;
};
string XFtpLIST::GetListData(std::string path) {
string data = ""; //"-rwxrwxrwx 1 root group 64463 Mar 14 09:53 101.jgp\r\n";
#ifdef _WIN32
_finddata_t file; //存储文件信息
path += "/*.*"; //目录上下文
intptr_t dir = _findfirst(path.c_str(), &file);
if (dir < 0)
return data;
do {
string tmp = "";
if (file.attrib & _A_SUBDIR) {//是否是目录,去掉. ..
if (strcmp(file.name, ".") == 0 || strcmp(file.name, "..") == 0)
continue;
tmp = "drwxrwxrwx 1 root group";
}
else
tmp = "-rwxrwxrwx 1 root group";
char buf[1024];
sprintf(buf, "%u", file.size);
tmp += buf;
strftime(buf, sizeof(buf) - 1, "%b %d %H:%M ", localtime(&file.time_write)); //日期时间
tmp += buf;
tmp += file.name;
data += "\r\n";
data += tmp;
} while (_findnext(dir, &file) == 0);
#else
string cmd = "ls -l ";
cmd += path;
cout << "Popen:" << cmd << endl;
FILE* f = popen(cmd.c_str(), "r");
if (!f)
return data;
char buffer[1024] = { 0 };
for (;;) {
int len = fread(buffer, 1, sizeof(buffer) - 1, f);
if (len <= 0)break;
buffer[len] = '\0';
data += buffer;
};
pclose(f);
#endif
return data;
};

XFtpRETR类如下:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//XFtpRETR.h
#pragma once
#include "XFtpTask.h"
class XFtpRETR :public XFtpTask {
public:
virtual void Parse(std::string type, std::string msg);
virtual void Write(struct bufferevent* bev);
virtual void Event(struct bufferevent* bev, short what);
private:
char buf[1024];
};

//XFtpRETR.cpp
#include "XFtpRETR.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include<iostream>
using namespace std;
void XFtpRETR::Write(struct bufferevent* bev) {
if (!fp)
return;
int len = fread(buf, 1, sizeof(buf), fp);
if (len <= 0) {
ResCMD("226 Transfer complete\r\n");
Close();
return;
};
cout << "[" << len << "]" << flush;
Send(buf, len);
return;
};
void XFtpRETR::Event(struct bufferevent* bev, short what) {
if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) {//如果对方网络断掉或者机器死机,有可能收不到BEV_EVENT_EOF的数据
cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR" << endl;
Close();
}
else
if (what & BEV_EVENT_CONNECTED)
cout << "XFtpLIST BEV_EVENT_CONNECTED" << endl;
return;
};
void XFtpRETR::Parse(std::string type, std::string msg) { //解析协议
int pos = msg.rfind(" ") + 1; //文件名
string filename = msg.substr(pos, msg.size() - pos - 2);
string path = cmdTask->rootDir;
path += cmdTask->curDir;
path += "/";
path += filename;
fp = fopen(path.c_str(), "rb");
if (fp) {
ConnectPORT(); //连接数据通道
ResCMD("150 File OK\r\n"); //发送开始下载文件的指令
bufferevent_trigger(bev, EV_WRITE, 0); //触发写入事件
}
else
ResCMD("450 file open failed!\r\n");
return;
};

XFtpSTOR类如下:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//XFtpSTOR.h
#pragma once
#include "XFtpTask.h"
class XFtpSTOR :public XFtpTask { //上传文件
public:
virtual void Parse(std::string type, std::string msg); //解析协议
virtual void Read(struct bufferevent* bev);
virtual void Event(struct bufferevent* bev, short what);
private:
char buf[1024];
};

//XFtpSTOR.cpp
#include "XFtpSTOR.h"
#include<iostream>
#include<event2/event.h>
#include<event2/bufferevent.h>
using namespace std;
void XFtpSTOR::Parse(std::string type, std::string msg) { //解析协议
int pos = msg.rfind(" ") + 1; //文件名
string filename = msg.substr(pos, msg.size() - pos - 2);
string path = cmdTask->rootDir;
path += cmdTask->curDir;
path += filename;
fp = fopen(path.c_str(), "wb"); //写入二进制
if (fp) {
ConnectPORT(); //连接数据通道
ResCMD("125 File OK\r\n"); //发送开始接收文件的指令
bufferevent_trigger(bev, EV_READ, 0); //触发读事件
}
else
ResCMD("450 file open failed!\r\n");
return;
};
void XFtpSTOR::Read(struct bufferevent* bev) {
if (!fp)
return;
for (;;) {
int len = bufferevent_read(bev, buf, sizeof(buf));
if (len <= 0)
return;
int size = fwrite(buf, 1, len, fp);
cout << "<" << len << ":" << size << ">" << flush;
};
return;
};
void XFtpSTOR::Event(struct bufferevent* bev, short what) {
if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)) { //如果对方网络断掉或者机器死机,有可能收不到BEV_EVENT_EOF的数据
cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR" << endl;
Close();
ResCMD("226 Transfer complete\r\n");
}
else
if (what & BEV_EVENT_CONNECTED)
cout << "XFtpLIST BEV_EVENT_CONNECTED" << endl;
return;
};

XFtpTask类如下:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
//XFtpTask.h
#pragma once
#include "XTask.h"
#include <string>
class XFtpTask : public XTask {
public:
std::string curDir = "/";
std::string rootDir = ".";
std::string ip = ""; //PORT 数据通道的ip和端口
int port = 0;
XFtpTask* cmdTask = 0; //命令通道
virtual void Parse(std::string type, std::string msg) {} //解析协议
void ResCMD(std::string msg); //回复cmd消息
void Send(std::string data); //用来发送建立了连接的数据通道
void Send(const char* data, int datasize);
void ConnectPORT(); //连接数据通道
void Close();
virtual void Read(struct bufferevent* bev) {}
virtual void Write(struct bufferevent* bev) {}
virtual void Event(struct bufferevent* bev, short what) {}
void SetCallback(struct bufferevent* bev);
bool Init() { return true; }
protected:
static void ReadCB(bufferevent* bev, void* arg);
static void WriteCB(bufferevent* bev, void* arg);
static void EventCB(struct bufferevent* bev, short what, void* arg);
struct bufferevent* bev = 0; //命令bev
FILE* fp = 0;
};

//XFtpTask.cpp
#include "XFtpTask.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <cstring>
using namespace std;
void XFtpTask::Send(std::string data) {
Send(data.c_str(), data.size());
return;
};
void XFtpTask::Send(const char* data, int datasize) {
if (!bev)
return;
bufferevent_write(bev, data, datasize);
return;
};
void XFtpTask::Close() {
if (bev) { //防止二次释放 内存
bufferevent_free(bev);
bev = 0;
};
if (fp) {
fclose(fp);
fp = 0;
};
return;
};
void XFtpTask::ConnectPORT() { //连接数据通道
if (ip.empty() || port <= 0 || !base) {
cout << "ConnectPORT failed ip or port is null" << endl;
return;
};
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
evutil_inet_pton(AF_INET, ip.c_str(), &sin.sin_addr.s_addr);
SetCallback(bev); //设置回调和权限码
timeval rt = { 60,0 }; //添加超时
bufferevent_set_timeouts(bev, &rt, 0);
bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin));
return;
};
void XFtpTask::ResCMD(string msg) { //回复cmd消息
if (!cmdTask || !cmdTask->bev)
return;
cout << "ResCMD:" << msg << endl;
if (msg[msg.size() - 1] != '\n')
msg += "\r\n";
bufferevent_write(cmdTask->bev, msg.c_str(), msg.size());
return;
};
void XFtpTask::SetCallback(struct bufferevent* bev) {
bufferevent_setcb(bev, ReadCB, WriteCB, EventCB, this);
bufferevent_enable(bev, EV_READ | EV_WRITE);
return;
};
void XFtpTask::ReadCB(bufferevent* bev, void* arg) {
XFtpTask* t = static_cast<XFtpTask*>(arg);//cpp风格强制转换
t->Read(bev);
return;
};
void XFtpTask::WriteCB(bufferevent* bev, void* arg) {
XFtpTask* t = static_cast<XFtpTask*>(arg);
t->Write(bev);
return;
};
void XFtpTask::EventCB(struct bufferevent* bev, short what, void* arg) {
XFtpTask* t = static_cast<XFtpTask*>(arg);
t->Event(bev, what);
return;
};

XTask类如下:

c++
1
2
3
4
5
6
7
8
9
10
11
12
//XTask.h
#pragma once
class XTask {
public:
struct event_base* base = 0;
int sock;
int thread_id = 0;
virtual bool Init() = 0; //初始化任务
};

//XTask.cpp
#include "XTask.h"

XThread类如下:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
//XThread.h
#pragma once
#include <event2/event.h>
#include <list>
#include <mutex>
#include "XTask.h"
class XThread {
public:
void Start(); //启动线程
void Main(); //线程入口函数
bool Setup(); //安装线程 初始化event_base和管道监听事件用于激活线程
void Notify(evutil_socket_t fd, short which); //收到主线程发出的激活消息(线程池的分发)
void Activate(); //线程激活
void AddTask(XTask* t); //添加处理的任务 一个线程同时可以处理多个任务,共用一个event_base
XThread();
~XThread();
int id = 0; //线程编号
private:
int notify_send_fd = 0;
struct event_base* base = 0;
std::list<XTask*> tasks; //任务列表
std::mutex tasks_mutex; //线程安全 互斥
};

//XThread.cpp
#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#ifdef _WIN32
#else
#include <unistd.h>
#endif
using namespace std;
static void NotifyCallback(evutil_socket_t fd, short which, void* arg) { //激活线程任务的回调函数
XThread* t = (XThread*)arg;
t->Notify(fd, which);
return;
};
void XThread::Notify(evutil_socket_t fd, short which) {
char buf[2] = { 0 }; //水平触发!只要没有接收完全 会再次进来
#ifdef _WIN32
int re = recv(fd, buf, 1, 0);
#else
int re = read(fd, buf, 1);
#endif
if (re <= 0)
return;
cout << id << " thread " << buf << endl;
XTask* task = nullptr;
tasks_mutex.lock(); //获取任务 并初始化任务
if (tasks.empty()) {
tasks_mutex.unlock();
return;
};
task = tasks.front(); //先进先出
tasks.pop_front();
tasks_mutex.unlock();
task->Init();
return;
};
void XThread::AddTask(XTask* t) {
if (!t)
return;
t->base = this->base;
tasks_mutex.lock();
tasks.push_back(t);
tasks_mutex.unlock();
return;
};
void XThread::Activate() {//线程激活
#ifdef _WIN32
int re = send(this->notify_send_fd, "c", 1, 0);
#else
int re = write(this->notify_send_fd, "c", 1);
#endif
if (re <= 0)
cerr << "XThread::Activate() failed!" << endl;
return;
};
void XThread::Start() {
Setup();
thread th(&XThread::Main, this); //启动线程
th.detach(); //断开与主线程联系
return;
};
bool XThread::Setup() { //windows用配对的socket(模拟管道) linux用管道
#ifdef _WIN32
evutil_socket_t fds[2]; //创建一个socketpair 可以互相通信 fds[0] 读; fds[1] 写
if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0) {
cout << "evutil_socketpair failed!" << endl;
return false;
};
evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);
#else
int fds[2]; //创建的管道 不能用send recv读取 read write
if (pipe(fds)) {
cerr << "pipe failed!" << endl;
return false;
};
#endif
notify_send_fd = fds[1]; //读取绑定到event事件中,写入要保存
event_config* ev_conf = event_config_new(); //创建libevent上下文(无锁)
event_config_set_flag(ev_conf, EVENT_BASE_FLAG_NOLOCK);
this->base = event_base_new_with_config(ev_conf);
event_config_free(ev_conf);
if (!base) {
cerr << "event_base_new_with_config failed in thread!" << endl;
return false;
};
event* ev = event_new(base, fds[0], EV_READ | EV_PERSIST, NotifyCallback, this); //添加管道监听事件 用于激活线程执行任务
event_add(ev, 0);
return true;
};
void XThread::Main() { //线程入口函数
cout << id << " XThread::Main() begin" << endl;
event_base_dispatch(base);
event_base_free(base);
cout << id << " XThread::Main() end" << endl;
return;
};
XThread::XThread() {};
XThread::~XThread() {};

XThreadPool类如下:

c++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//XThreadPool.h
#pragma once
#include<vector>
#include "XTask.h"
class XThread;
class XThreadPool {
public:
static XThreadPool* Get() { //单件模式
static XThreadPool p;
return &p;
};
void Init(int threadCount); //初始化所有线程 并启动线程
void Dispatch(XTask* task); //分发线程
private:
int threadCount = 0; //线程数量
int lastThread = -1;
std::vector<XThread*> threads; //线程池线程
XThreadPool() {}; //小技巧 构造函数私有 避免多次实例化
};

//XThreadPool.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
using namespace std;
void XThreadPool::Dispatch(XTask* task) { //分发线程
if (!task) //轮询
return;
int tid = (lastThread + 1) % threadCount;
lastThread = tid;
XThread* t = threads[tid];
t->AddTask(task);
t->Activate(); //激活线程
return;
};
void XThreadPool::Init(int threadCount) { //初始化所有线程 并启动线程
this->threadCount = threadCount;
this->lastThread = -1;
for (int i = 0; i < threadCount; i++) {
XThread* t = new XThread();
t->id = i + 1;
std::cout << "Create thread " << t->id << endl;
t->Start();
this->threads.push_back(t);
this_thread::sleep_for(10ms);
};
return;
};

Makefile如下:

makefile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GCC ?= g++
CCMODE = PROGRAM
INCLUDES = -I/opt/libevent/include/
CFLAGS = -Wall $(MACRO)
TARGET = ftpSrv
SRCS := $(wildcard *.cpp)
LIBS = -L /opt/libevent/lib/ -levent -lpthread
ifeq ($(CCMODE),PROGRAM)
$(TARGET): $(LINKS) $(SRCS)
$(GCC) $(CFLAGS) $(INCLUDES) -o $(TARGET) $(SRCS) $(LIBS)
@chmod +x $(TARGET)
@echo make $(TARGET) ok.
clean:
rm -rf $(TARGET)
endif
clean:
rm -f $(TARGET)
.PHONY:install
.PHONY:clean