C++后端开发入门-环境配置与多线程编程

环境配置

下载并安装Ubuntu虚拟机,网卡改为桥接模式,启用root账户并设置密码。接下来装SSH:

1
sudo apt install openssh-server,net-tools

然后修改/etc/ssh/sshd_config,将下列属性的注释去掉并修改值:

1
2
3
LoginGraceTime 2m
PermitRootLogin yes
StrictModes yes

关防火墙并重启SSH:

1
2
3
4
5
6
sudo ufw disable
sudo ufw status
sudo service ssh restart
sudo systemctl enable ssh
sudo systemctl status ssh
sudo reboot

接下来用MobaXTerm或XShell连上即可。

在Visual Studio 2022中添加“使用C++进行Linux和嵌入式开发”,新建“空项目(Linux)”。选择“工具”->“选项”->“跨平台”->“连接管理器”。添加一个SSH连接并下载远程标头。

随便写个程序,要用开始调试,在“调试”->“Linux控制台”中能看到输出。

或者也可以在Visual Studio Code中下个“Remote - SSH”插件即可。

POSIX多线程

线程创建

pthread_create创建线程,在pthread.h中定义。

1
2
3
4
5
6
int pthread_create(
pthread_t* pid, //创建成功后线程ID
const pthread_attr_t* attr, //指向线程属性结构 NULL则为默认属性
void* (*start_routine)(void*), //指向线程函数地址
void* arg //传给线程函数的参数
); //成功返回0

用阻塞函数pthread_join等待子线程结束,让主线程挂起直到子线程都退出,并让子线程所占资源释放。

1
2
3
4
int pthread_join(
pthread_t pid, //所等待线程ID
void** value_ptr //通常设NULL 否则该函数复制线程退出值到一个内存趋于 让该指针指向该内存区域
); //成功返回0 否则返回错误码

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <pthread.h>
#include <stdio.h>
void* thfunc(void* arg) {
int* pn = (int*)(arg); //获取参数的地址
int n = *pn;
printf("in thfunc:n=%d\n", n);
return (void*)0;
};
int main(int argc, char* argv[]) {
pthread_t tidp;
int ret, n = 110;
ret = pthread_create(&tidp, NULL, thfunc, &n);//创建线程并传递n的地址
if (ret) {
printf("pthread_create failed:%d\n", ret);
return -1;
};
pthread_join(tidp, NULL); //等待子线程结束
printf("in main:thread is created\n");
return 0;
};

还可以传结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <pthread.h>
#include <stdio.h>
typedef struct { //定义结构体的类型
int n;
const char* str;
}MYSTRUCT;
void* thfunc(void* arg) {
MYSTRUCT* p = (MYSTRUCT*)arg;
printf("in thfunc:n=%d,str=%s\n", p->n, p->str); //打印结构体的内容
return (void*)0;
};
int main(int argc, char* argv[]) {
pthread_t tidp;
int ret;
MYSTRUCT mystruct; //定义结构体
mystruct.n = 110; //初始化结构体
mystruct.str = "hello world";
ret = pthread_create(&tidp, NULL, thfunc, (void*)&mystruct);//创建线程并传递结构体地址
if (ret) {
printf("pthread_create failed:%d\n", ret);
return -1;
};
pthread_join(tidp, NULL); //等待子线程结束
printf("in main:thread is created\n");
return 0;
};

线程属性包括:分离状态、调度策略与参数、作用域、栈尺寸、栈地址、优先级等,定义如下:

1
2
3
4
5
typedef unsigned long int pthread_t;
union pthread_attr_t {
char __size[__SIZEOF_PTHREAD_ATTR_T]; //属性值
long int __align;
};

要获取线程属性,需先用pthread_getattr_np来获取属性结构体值,再用相应函数获得某属性具体值。

1
2
3
4
5
6
#define _GNU_SOURCE
#include <pthread.h>
int pthread_getattr_np(
pthread_t thread, //线程ID
pthread_attr_t *attr //返回线程属性结构体内容
); //成功0 否则错误码

获取的属性结构体不再需要时用pthread_attr_destroy销毁。

1
2
3
int pthread_attr_destroy(
pthread_attr_t * attr
);

pthread_create创建线程时,属性结构体指针若用NULL,则创建的线程有默认属性,即非分离、大小为1MB堆栈、与父进程有同样级别的优先级、分时调度策略。若创建非默认属性线程,可创建线程前用pthread_attr_init来初始化一个线程属性结构体并设置相应属性,接着传给pthread_create,注意这里属性结构体用完也要用pthread_attr_destroy释放。

1
2
3
int pthread_attr_init(
pthread_attr_t *attr //指向属性结构体
); //成功0 否则错误码

分离状态决定一个线程以什么样方式终止。一种为分离状态的,用PTHREAD_CREATE_DETACHED表示;另一种为非分离状态的,或称可连接的,用PTHREAD_CREATE_JOINABLE表示。

一个可连接的线程可被其他线程回收资源和取消,且不会主动释放资源(如堆栈空间和线程描述符等,总计8KB多),必须等其他线程来回收其资源,因此要在主线程用阻塞函数pthead_join,当其返回时所等待的线程资源也就被释放了。若父进程不退出且不调用pthread_join,则可连接线程资源一直不释放,变成僵尸线程,僵尸线程越来越多后新线程将无资源可用。若不用pthread_join,父进程先于子线程退出,那么子线程将被init进程收养,变为其父进程,并调用wait系列函数为其回收资源,不会泄露资源。为了避免内存泄露,可连接的线程在终止时,要么设为可分离的,要么用pthread_join回收资源。一个线程不能被多个线程等待,否则第一个收到信号的线程成功返回,其余线程得到错误代码ESRCH。

可分离的线程运行结束后,资源立刻被系统回收。可用pthread_detach将线程转换为可分离线程。也可用pthread_attr_setdetachstate在创建线程时设为为可分离的。可分离的线程没有类似pthread_join的函数,但可在主线程用pthread_exit,此时只终止主线程,进程资源会为由主线程创建的其他线程保持打开的状态,直到其他线程都终止。

1
2
3
4
int pthread_attr_setdetachstate(
pthread_attr_t *attr, //要设置的属性结构体
int detachstate //PTHREAD_CREATE_DETACHED或PTHREAD_CREATE_JOINABLE
); //成功0 否则错误码

例如创建一个可分离线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <pthread.h>
#include <unistd.h> //sleep
using namespace std;
void* thfunc(void* arg) {
cout << ("sub thread is running\n");
return NULL;
};
int main(int argc, char* argv[]) {
pthread_t thread_id;
pthread_attr_t thread_attr;
struct sched_param thread_param;
size_t stack_size;
int res;
res = pthread_attr_init(&thread_attr);
if (res)
cout << "pthread_attr_init failed:" << res << endl;
res = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED);
if (res)
cout << "pthread_attr_setdetachstate failed:" << res << endl;
res = pthread_create(&thread_id, &thread_attr, thfunc, NULL);
if (res)
cout << "pthread_create failed:" << res << endl;
cout << "main thread will exit\n" << endl;
sleep(1);
return 0;
};

还可用pthread_detach将一个可连接的线程转换为可分离的线程。如果一个线程被其他线程连接了,则该函数不会产生作用,且该线程继续处于可连接状态。若一个线程成功进行pthread_detach后,则无法被连接。

1
2
3
int pthread_detach(
pthread_t thread //线程ID
); //成功0 EINVAL目标线程不是可连接的线程 ESRCH该ID线程没找到

pthread_attr_getdetachstate获取分离状态。

1
2
3
4
int pthread_attr_getdetachstate(
pthread_attr_t *attr, //属性结构体指针
int *detachstate //返回分离状态
); //成功0 否则错误码

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
static void* thread_start(void* arg) {
int i, s;
pthread_attr_t gattr;
s = pthread_getattr_np(pthread_self(), &gattr);
if (s != 0)
printf("pthread_getattr_np failed\n");
s = pthread_attr_getdetachstate(&gattr, &i);
if (s)
printf("pthread_attr_getdetachstate failed");
printf("Detach state = %s\n", (i == PTHREAD_CREATE_DETACHED) ? "PTHREAD_CREATE_DETACHED" : (i == PTHREAD_CREATE_JOINABLE) ? "PTHREAD_CREATE_JOINABLE" : "???");
pthread_detach(pthread_self());
s = pthread_getattr_np(pthread_self(), &gattr);
if (s != 0)
printf("pthread_getattr_np failed\n");
s = pthread_attr_getdetachstate(&gattr, &i);
if (s)
printf(" pthread_attr_getdetachstate failed");
printf("after pthread_detach,\nDetach state = %s\n", (i == PTHREAD_CREATE_DETACHED) ? "PTHREAD_CREATE_DETACHED" : (i == PTHREAD_CREATE_JOINABLE) ? "PTHREAD_CREATE_JOINABLE" : "???");
pthread_attr_destroy(&gattr);
return NULL;
};
int main(int argc, char* argv[]) {
pthread_t thread_id;
int s;
s = pthread_create(&thread_id, NULL, &thread_start, NULL);
if (s != 0) {
printf("pthread_create failed\n");
return 0;
};
pthread_exit(NULL);
};

pthread_attr_getstacksize获取栈尺寸:

1
2
3
4
int pthread_attr_getstacksize(
pthread_attr_t *attr, //属性结构体
size_t *stacksize //栈尺寸 单位字节
); //成功0 否则错误码

例如获取默认栈尺寸和最小尺寸:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <limits.h>
static void* thread_start(void* arg) {
int i, res;
size_t stack_size;
pthread_attr_t gattr;
res = pthread_getattr_np(pthread_self(), &gattr);
if (res)
printf("pthread_getattr_np failed\n");
res = pthread_attr_getstacksize(&gattr, &stack_size);
if (res)
printf("pthread_getattr_np failed\n");
printf("Default stack size is %u byte; minimum is %u byte\n", stack_size, PTHREAD_STACK_MIN);
pthread_attr_destroy(&gattr);
return NULL;
};
int main(int argc, char* argv[]){
pthread_t thread_id;
int s;
s = pthread_create(&thread_id, NULL, &thread_start, NULL);
if (s != 0) {
printf("pthread_create failed\n");
return 0;
};
pthread_join(thread_id, NULL);
};

某个线程一定有一种调度策略来调度它,管理多个线程如何占用CPU就是线程调度,不同操作系统的调度方法/策略不同。实时指操作系统对中断的响应实时性非常高,非实时相反。VxWorks属于实时操作系统RTOS,Windows和Linux属于非实时操作系统(或称分时操作系统TSOS)。响应实时的表现是抢占,通过优先级控制,优先级高的任务最先占用CPU。Linux线程有实时和分时之分,调度策略有分时调度策略、先来先服务调度策略、实时的分时调度策略,后两者只用于实时线程。

对于分时调度策略(或称轮转策略,用SCHED_OTHER表示),不支持优先级,为每个线程分配一段运行时间,称为时间片。优先级返回0。

对于先来先服务调度策略(用SCHED_FIFO表示),支持优先级抢占。CPU让一个先来的线程执行完再调度下一个线程,顺序按照创建线程的先后。线程一旦占用CPU则一直运行,直到有更高优先级任务到达或自己放弃CPU。若有和正在运行的线程具有相同优先级的线程已经就绪,则必须等待正在运行的线程主动放弃后才可以运行这个就绪的线程。优先级范围为1~99。

对于实时的分时调度策略(或称时间片轮转/轮询调度策略,用SCHED_RR表示),支持优先级抢占。CPU分配给每个线程一个特定时间片,当线程时间片用完,系统将重新分配时间片,并将线程置于实时线程就绪队列尾部,保证所有具有相同优先级的线程能被公平调度。优先级范围同上。

sched_get_priority_minsched_get_priority_min分别获得某调度策略的最低和最高优先级。

1
2
3
4
5
6
7
#include <sched.h>
int sched_get_priority_max(
int policy //调度策略 SCHED_FIFO SCHED_RR SCHED_OTHER
);
int sched_get_priority_min(
int policy
);

例如获取各调度策略下的最低/高优先级:

1
2
3
4
5
6
7
8
9
#include <stdio.h>
#include <unistd.h>
#include <sched.h>
int main() {
printf("Valid priority range for SCHED_OTHER: %d - %d\n", sched_get_priority_min(SCHED_OTHER), sched_get_priority_max(SCHED_OTHER));
printf("Valid priority range for SCHED_FIFO: %d - %d\n", sched_get_priority_min(SCHED_FIFO), sched_get_priority_max(SCHED_FIFO));
printf("Valid priority range for SCHED_RR: %d - %d\n", sched_get_priority_min(SCHED_RR), sched_get_priority_max(SCHED_RR));
return 0;
};

线程主动退出可用pthread_exit或直接函数返回。

1
2
3
void pthread_exit(
void *retval //返回给主线程的值
);

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#define PTHREAD_NUM 2
void* thrfunc1(void* arg) {
static int count = 1;
pthread_exit((void*)(&count));
};
void* thrfunc2(void* arg) {
static int count = 2;
return (void*)(&count);
};
int main(int argc, char* argv[]) {
pthread_t pid[PTHREAD_NUM];
int retPid;
int* pRet1;
int* pRet2;
if ((retPid = pthread_create(&pid[0], NULL, thrfunc1, NULL)) != 0) {
perror("create pid first failed");
return -1;
};
if ((retPid = pthread_create(&pid[1], NULL, thrfunc2, NULL)) != 0) {
perror("create pid second failed");
return -1;
};
if (pid[0] != 0) {
pthread_join(pid[0], (void**)&pRet1);
printf("get thread 0 exitcode: %d\n", *pRet1);
};
if (pid[1] != 0) {
pthread_join(pid[1], (void**)&pRet2);
printf("get thread 1 exitcode: %d\n", *pRet2);
};
return 0;
};

线程被动退出有两种方法。一种可在同进程的另一个线程中用pthread_kill向要结束的进程发送信号,目标线程收到信号后再退出。此时接收信号的线程必须先用signal注册该信号的处理函数。若线程代码内不对某信号做处理,则按照信号默认形为影响整个进程。例如没有为SIGQUIT信号实现处理函数则默认整个进程退出,所以若sig参数不为0,一定要实现线程信号处理函数,否则影响整个进程。

1
2
3
4
int pthread_kill(
pthread_t thread, //线程ID
int sig //信号 通常大于0 等于0用来探测线程是否存在
); //成功0 ESRCH线程不存在 EINVAL信号不合法

例如向线程发送请求结束信号:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <pthread.h>
#include <signal.h>
#include <unistd.h> //sleep
using namespace std;
static void on_signal_term(int sig) {
cout << "sub thread will exit" << endl;
pthread_exit(NULL);
};
void* thfunc(void* arg) {
signal(SIGQUIT, on_signal_term);
int tm = 50;
while (true) {
cout << "thrfunc--left:" << tm << " s--" << endl;
sleep(1);
tm--;
};
return (void*)0;
};
int main(int argc, char* argv[]) {
pthread_t pid;
int res;
res = pthread_create(&pid, NULL, thfunc, NULL);
sleep(5); //让出CPU 5秒 让子线程执行
pthread_kill(pid, SIGQUIT);
pthread_join(pid, NULL);
cout << "sub thread has completed,main thread will exit\n";
return 0;
};

此外还可用pthread_kill探测线程是否还存活:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <pthread.h>
#include <signal.h>
#include <unistd.h> //sleep
#include "errno.h"
using namespace std;
void* thfunc(void* arg) {
int tm = 50;
while (1) {
cout << "thrfunc--left:" << tm << " s--" << endl;
sleep(1);
tm--;
};
return (void*)0;
};
int main(int argc, char* argv[]) {
pthread_t pid;
int res;
res = pthread_create(&pid, NULL, thfunc, NULL);
sleep(5);
int kill_rc = pthread_kill(pid, 0); //仍会退出进程
if (kill_rc == ESRCH)
cout << "the specified thread did not exists or already quit\n";
else if (kill_rc == EINVAL)
cout << "signal is invalid\n";
else
cout << "the specified thread is alive\n";
return 0;
};

另一种在同进程的另一个线程中用pthread_cancel发送取消请求,请求取消目标线程的执行。不过就算发送成功也不一定意味着线程停止运行了,只有被取消的线程下次调用系统函数、C库函数或pthread_testcancel函数等时,才会真正结束线程。这种在线程执行过程中,检测是否有未响应取消信号的地方叫做取消点。被取消线程成功停止运行将返回常数PTHREAD_CANCELED,即-1。

1
2
3
int pthread_cancel(
pthread_t thread //线程ID
); //发送请求成功0 否则错误码

可用pthread_testcancel让内核检测是否需要取消当前线程。

1
void pthread_testcancel(void);

例如发送取消请求并成功取消线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include<stdio.h>
#include<stdlib.h>
#include <pthread.h>
#include <unistd.h> //sleep
void* thfunc(void* arg) {
int i = 1;
printf("thread start-------- \n");
while (1) {
i++;
pthread_testcancel();
};
return (void*)0;
};
int main() {
void* ret = NULL;
int iret = 0;
pthread_t tid;
pthread_create(&tid, NULL, thfunc, NULL);
sleep(1);
pthread_cancel(tid);
pthread_join(tid, &ret);
if (ret == PTHREAD_CANCELED)
printf("thread has stopped,and exit code: %d\n", ret);
else
printf("some error occured");
return 0;
};

例如当子线程在被阻塞到accept等函数时被动退出,被锁独占的资源将永远无法被释放。此时可用pthread_cleanup_pushpthread_cleanup_pop让线程退出时做一些清理工作,前者把一个函数压入清理函数栈,后者弹出栈顶清理函数并根据参数决定是否执行清理函数。

pthread_cleanup_push压栈的清理函数在三种情况下执行:线程主动结束;调用pthread_cleanup_pop且参数非零;线程被其他线程取消。

1
2
3
4
void pthread_cleanup_push(
void (*routine)(void*),
void *arg
);

pthread_cleanup_pop的参数execute为0时不执行清理函数,非0时执行。pthread_cleanup_pushpthread_cleanup_pop必须成对出现在同一函数中,即使不执行,否则编译不通过。

1
2
3
void pthread_cleanup_pop(
int execute
);

例如取消线程时引发清理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h> //sleep
void mycleanfunc(void* arg) { //清理函数
printf("mycleanfunc:%d\n", *((int*)arg));
return;
};
void* thfunc(void* arg) {
int i = 1;
printf("thread start-------- \n");
pthread_cleanup_push(mycleanfunc, &i); //把清理函数压栈
while (1) {
i++;
printf("i=%d\n", i);
pthread_testcancel();
};
printf("this line will not run\n");
pthread_cleanup_pop(0);
return (void*)0;
};
int main(void) {
void* ret = NULL;
int iret = 0;
pthread_t tid;
pthread_create(&tid, NULL, thfunc, NULL); //创建线程
sleep(1);
pthread_cancel(tid); //发送取消线程的请求
pthread_join(tid, &ret); //等待线程结束
if (ret == PTHREAD_CANCELED) //判断是否成功取消线程
printf("thread has stopped,and exit code: %d\n", ret); //打印下返回值,应该是-1
else
printf("some error occured");
return 0;
};

C++11线程类

默认构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <stdio.h>
#include <stdlib.h>
#include <chrono> // std::chrono::seconds
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::sleep_for
void thfunc(int n) {
std::cout << "thfunc:" << n << std::endl;
return;
};
int main(int argc, const char* argv[]) {
std::thread threads[5];
std::cout << "create 5 threads...\n";
for (int i = 0; i < 5; i++)
threads[i] = std::thread(thfunc, i + 1);
for (auto& t : threads)
t.join();
std::cout << "All threads joined.\n";
return EXIT_SUCCESS;
};

用初始化构造函数传结构体参数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>
using namespace std;
typedef struct { //定义结构体的类型
int n;
const char* str;
}MYSTRUCT;
void thfunc(void* arg) { //线程函数
MYSTRUCT* p = (MYSTRUCT*)arg;
cout << "in thfunc:n=" << p->n << ",str=" << p->str << endl; //打印结构体的内容
return;
};
int main(int argc, char* argv[]) {
MYSTRUCT mystruct; //定义结构体
mystruct.n = 110; //初始化结构体
mystruct.str = "hello world";
thread t(thfunc, &mystruct); //定义线程对象t,并把线程函数指针和线程函数参数传入
t.join(); //等待线程对象t结束
return 0;
};

C++11创建的线程也是可连接的线程,这里可用detach进行分离,C++11线程类可以和POSIX结合使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <thread>
using namespace std;
void thfunc(int n, int m, int* k, char s[]) { //线程函数
cout << "in thfunc:n=" << n << ",m=" << m << ",k=" << *k << "\nstr=" << s << endl;
*k = 5000;
return;
};
int main(int argc, char* argv[]) {
int n = 110, m = 200, k = 5;
char str[] = "hello world";
thread t(thfunc, n, m, &k, str); //定义线程对象
t.detach(); //分离线程
cout << "k=" << k << endl; //这里输出3
pthread_exit(NULL); //main线程结束,但进程并不会结束,下面一句不会执行
cout << "this line will not run" << endl; //这一句不会执行
return 0;
};

移动构造函数如下。其中move调用成功后,参数不代表任何thread对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>
using namespace std;
void fun(int& n) { //线程函数
cout << "fun: " << n << "\n";
n += 20;
this_thread::sleep_for(chrono::milliseconds(10)); //等待10毫秒
return;
};
int main(void) {
int n = 0;
cout << "n=" << n << '\n';
n = 10;
thread t1(fun, ref(n)); //ref(n)是取n的引用
thread t2(move(t1)); //t2执行fun t1不是thread对象 不执行
t2.join(); //等待t2执行完毕
cout << "n=" << n << '\n';
return 0;
};

例如获取线程ID:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>       // std::cout
#include <thread> // std::thread, std::thread::id, std::this_thread::get_id
using namespace std;
thread::id main_thread_id = this_thread::get_id(); //获取主线程id
void is_main_thread() {
if (main_thread_id == this_thread::get_id()) //判断是否和主线程id相同
std::cout << "This is the main thread.\n";
else
std::cout << "This is not the main thread.\n";
return;
};
int main() {
is_main_thread(); // is_main_thread作为main线程的普通函数调用
thread th(is_main_thread); // is_main_thread作为线程函数使用
th.join(); //等待th结束
return 0;
};

this_thread命名空间引用当前线程,其中调用yield函数的线程放弃执行并让出自己的CPU时间片,回到就绪态,使其他线程有机会运行。例如线程赛跑排名次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>       // std::cout
#include <thread> // std::thread, std::this_thread::yield
#include <atomic> // std::atomic
using namespace std;
atomic<bool> ready(false);
void thfunc(int id) {
while (!ready) //一直等待,知道main线程中重置全局变量ready
this_thread::yield();
for (volatile int i = 0; i < 1000000; ++i);
cout << id << ",";//输出的是排名,先完成先打印
return;
};
int main() {
thread threads[10];
cout << "race of 10 threads that count to 1 million:\n";
for (int i = 0; i < 10; ++i)
threads[i] = thread(thfunc, i);
ready = true;
for (auto& th : threads) th.join();
return 0;
};

可用sleep_untilsleep_for阻塞线程,暂停执行一段时间。sleep_until表示函数阻塞线程到sleep_time时间点,到了该时间点后再继续执行。

1
2
template< class Clock, class Duration >
void sleep_until(const std::chrono::time_point<Clock, Duration>& sleep_time);

例如挂起线程到下一个整分时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>       // std::cout
#include <thread> // std::this_thread::sleep_until
#include <chrono> // std::chrono::system_clock
#include <ctime> // std::time_t, std::tm, std::localtime, std::mktime
#include <time.h>
#include <stddef.h>
using namespace std;
using std::chrono::system_clock;
void getNowTime() {
timespec time;
struct tm nowTime;
clock_gettime(CLOCK_REALTIME, &time); //获取相对于1970到现在的秒数
localtime_r(&time.tv_sec, &nowTime);
char current[1024];
printf("%04d-%02d-%02d %02d:%02d:%02d\n", nowTime.tm_year + 1900, nowTime.tm_mon + 1, nowTime.tm_mday, nowTime.tm_hour, nowTime.tm_min, nowTime.tm_sec);
return;
};
int main() {
std::time_t tt = system_clock::to_time_t(system_clock::now());
struct std::tm* ptm = std::localtime(&tt);
getNowTime();
cout << "Waiting for the next minute to begin...\n";
++ptm->tm_min; //加一分钟
ptm->tm_sec = 0;
this_thread::sleep_until(system_clock::from_time_t(mktime(ptm)));
getNowTime();
return 0;
};

sleep_for挂起线程一段时间sleep_duration,这段时间内暂停执行。

1
2
template< class Rep, class Period >
void sleep_for(const std::chrono::duration<Rep, Period>& sleep_duration);

例如暂停线程5秒:

1
2
3
4
5
6
7
8
9
10
11
12
#include <iostream>       // std::cout, std::endl
#include <thread> // std::this_thread::sleep_for
#include <chrono> // std::chrono::seconds
int main() {
std::cout << "countdown:\n";
for (int i = 5; i > 0; --i) {
std::cout << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
};
std::cout << "Lift off!\n";
return 0;
};

线程同步

一次仅允许一个线程使用的共享资源叫做临界资源,个线程应互斥对其访问。每个线程中访问临界资源的代码称为临界区(或称临界段),每次只准许一个线程进入临界区。

开始时初始化一个互斥锁,进入临界区前把互斥锁加锁,退出临界区时把互斥锁解锁,最后不用互斥锁时将它销毁。

POSIX中用pthread_mutex_init初始化互斥锁,称为动态方式,需要销毁。其中restrict关键字只用于限定指针,告知编译器所有修改该指针所指向内容的操作全都基于该指针,不存在其他修改操作的途径,帮助编译器更好的代码优化。

1
2
3
4
int pthread_mutex_init(
pthread_mutex_t* restrict mutex, //互斥锁
const pthread_mutexattr_t* restrict attr //互斥锁属性 默认NULL
); //成功0 否则错误码

使用方法:

1
2
3
4
5
6
//法一
pthread_mutex_t* pmutex = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(pmutex, NULL);
//法二
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);

还可以用PTHREAD_MUTEX_INITIALIZER宏初始化互斥锁,称为常量初始化方式,不需要销毁。使用方法:

1
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //不能用malloc

为互斥锁上锁可用pthread_mutex_lockpthread_mutex_trylock。用pthread_mutex_lock时若互斥锁已被其他线程上锁,则调用该函数的线程将被阻塞,即让出CPU,避免忙等现象,必须与pthread_mutex_unlock成对使用。

1
2
3
int pthread_mutex_lock(
pthread_mutex_t *mutex //初始化后的互斥锁
); //成功0 否则错误码

pthread_mutex_trylock时若互斥锁已被其他线程上锁,则不阻塞,立即返回EBUSY。

1
2
3
int pthread_mutex_trylock( //同上
pthread_mutex_t *mutex
);

pthread_mutex_unlock对已上锁互斥锁解锁。

1
2
3
int pthread_mutex_unlock( //同上
pthread_mutex_t *mutex
);

pthread_mutex_destroy销毁互斥锁。

1
2
3
int pthread_mutex_destroy( //同上
pthread_mutex_t *mutex
);

例如多线程累加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <string.h>
#include <cstdlib>
int gcn = 0;
pthread_mutex_t mutex;
void* thread_1(void* arg) {
int j;
for (j = 0; j < 10000000; j++) {
pthread_mutex_lock(&mutex);
gcn++;
pthread_mutex_unlock(&mutex);
};
pthread_exit((void*)0);
return NULL;
};
void* thread_2(void* arg) {
int j;
for (j = 0; j < 10000000; j++) {
pthread_mutex_lock(&mutex);
gcn++;
pthread_mutex_unlock(&mutex); //解锁
};
pthread_exit((void*)0);
return NULL;
};
int main(void) {
int j, err;
pthread_t th1, th2;
pthread_mutex_init(&mutex, NULL); //初始化互斥锁
for (j = 0; j < 10; j++) {
err = pthread_create(&th1, NULL, thread_1, (void*)0);
if (err != 0) {
printf("create new thread error:%s\n", strerror(err));
exit(0);
};
err = pthread_create(&th2, NULL, thread_2, (void*)0);
if (err != 0) {
printf("create new thread error:%s\n", strerror(err));
exit(0);
};
err = pthread_join(th1, NULL);
if (err != 0) {
printf("wait thread done error:%s\n", strerror(err));
exit(1);
};
err = pthread_join(th2, NULL);
if (err != 0) {
printf("wait thread done error:%s\n", strerror(err));
exit(1);
};
printf("gcn=%d\n", gcn);
gcn = 0;
};
pthread_mutex_destroy(&mutex); //销毁互斥锁
return 0;
};

在读写操作问题中,对资源的访问有两种情况。一种访问方式是独占的,称为写操作;另一种访问方式是可以共享的,可有多个线程同时去访问某资源,称为读操作。

读写锁是多线程同步的另一种机制。若一个线程用读锁锁住了临界区,其他线程也可用读锁来进入临界区。但若这时再进行写锁加锁就会发生阻塞,后面若继续有读锁请求,则也都被阻塞。若一个线程用写锁锁住临界区,则其他线程无论读锁还是写锁都会发生阻塞。

读写锁比互斥锁具有更高的适用性和并行性,但读写锁系统开销更大。

读写锁用pthread_rwlock_init初始化,过程与互斥锁同理:

1
2
3
4
5
6
7
8
9
10
11
12
13
int pthread_rwlock_init(
pthread_rwlock_t* restrict rwlock,
const pthread_rwlockattr_t* restrict attr
);

//法一
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
//法二
pthread_rwlock_t* prwlock = (pthread_rwlock_t*)malloc(sizeof(pthread_rwlock_t));
pthread_rwlock_init(prwlock, NULL);
//法三
pthread_rwlock_t rwlock;
pthread_rwlock_init(&rwlock, NULL);

读写锁的上锁分为读模式和写模式的上锁,上锁、解锁与销毁函数的用法与互斥锁同理:

1
2
3
4
5
6
7
8
9
10
int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock);

//以下上锁函数可设定在规定时间内等待读写锁 等不到就返回ETIMEDOUT
int pthread_rwlock_timedrdlock(pthread_rwlock_t* restrict rwlock, const struct timespec* restrict abstime);
int pthread_rwlock_timedwrlock(pthread_rwlock_t* restrict rwlock, const struct timespec* restrict abstime);

条件变量是一种同步机制,用于让一个线程因等待“条件变量的条件”而挂起,另一个线程在条件成立后向挂起的线程发送条件成立的信号。为了防止多个线程同时请求pthread_cond_wait而新城竞争条件,条件变量必须和一个互斥锁联合使用。

生产者消费者问题(或称有界缓冲区问题)是一个经典问题。两个线程共享一个公共的固定大小的缓冲区,其中一个是生产者,用于将数据放入缓冲区;另一个是消费者,用于从缓冲区取出数据。若当缓冲区已满,而生产者还想向其中放入一个新的数据项,应该让生产者休眠,等消费者从缓冲区取走一个或多个数据后再唤醒它。若当缓冲区已空,而消费者还想取数据,应该让消费者休眠,等生产者放入一个或多个数据时再唤醒它。

条件变量用pthread_cond_init初始化,过程与互斥锁同理:

1
2
3
4
5
6
7
8
9
10
11
12
13
int pthread_cond_init(
pthread_cond_t* cv,
const pthread_condattr_t* cattr
);

//法一
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
//法二
pthread_cond_t* pcond = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
pthread_cond_init(pcond, NULL);
//法三
pthread_cond_t cond;
pthread_cond_init(&cond, NULL);

pthread_cond_waitpthread_cond_timedwait等待条件变量,并将线程阻塞在一个条件变量上。当条件不满足,pthread_cond_wait先将互斥锁解锁,再被条件变量阻塞,这是原子操作,不被打断。被阻塞的线程可以后通过其他线程唤醒。若唤醒后条件依然不满足,则线程将继续阻塞在这里,等被下一次唤醒。当函数等到条件变量时,先对互斥锁上锁,再唤醒本线程,这也是原子操作。只有当信号到来时互斥锁已解锁,该函数才会返回。

1
2
3
4
int pthread_cond_wait(
pthread_cond_t* restrict cond, //条件变量
pthread_mutex_t* restrict mutex //互斥锁
); //成功0 否则错误码

阻塞在条件变量上的线程被唤醒可能不是因为条件满足,而是因为虚假唤醒,所以pthread_cond_wait返回只代表共享数据可能被改变,必须要重新判断。

pthread_cond_timedwait是计时等待条件变量。当等待的时间超过timespec,则返回ETIME。timespec中的秒和纳秒数是从1970年1月1日00:00:00开始即使,到现在经历的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* POSIX.1b structure for a time value.  This is like a `struct timeval' but has nanoseconds instead of microseconds.  */
struct timespec{
#ifdef __USE_TIME_BITS64
__time64_t tv_sec; /* Seconds. */
#else
__time_t tv_sec; /* Seconds. */
#endif
#if __WORDSIZE == 64 || (defined __SYSCALL_WORDSIZE && __SYSCALL_WORDSIZE == 64) || (__TIMESIZE == 32 && !defined __USE_TIME_BITS64)
__syscall_slong_t tv_nsec; /* Nanoseconds. */
#else
# if __BYTE_ORDER == __BIG_ENDIAN
int : 32; /* Padding. */
long int tv_nsec; /* Nanoseconds. */
# else
long int tv_nsec; /* Nanoseconds. */
int : 32; /* Padding. */
# endif
#endif
};
int pthread_cond_timedwait(
pthread_cond_t* restrict cond,
pthread_mutex_t* restrict mutex,
const struct timespec* restrict abstime //等待时间
); //成功0 否则错误码

pthread_cond_signal唤醒一个等待条件变量的线程,用pthread_cond_broadcast唤醒所有等待该条件变量的线程。

1
2
3
4
5
6
int pthread_cond_broadcast(
pthread_cond_t* cond //条件变量
); //成功0 否则错误码
int pthread_cond_signal(
pthread_cond_t* cond
);

pthread_cond_destroy销毁条件变量:

1
2
3
int pthread_cond_destroy(
pthread_cond_t *cond
);

例如找出1到20中能整除3的整数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int i = 1;
void* thread1(void*);
void* thread2(void*);
int main(void) {
pthread_t t_a, t_b;
pthread_create(&t_a, NULL, thread2, NULL);
pthread_create(&t_b, NULL, thread1, NULL);
pthread_join(t_b, NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
return 0;
};
void* thread1(void* junk) {
for (int current = 1; current <= 20; current++) {
pthread_mutex_lock(&mutex); //锁住互斥量
i = current;
if (i % 3 == 0)
pthread_cond_signal(&cond); //唤醒等待条件变量cond的线程 发送信号
else
printf("thead1:%d\n", i); //打印不能整除
pthread_mutex_unlock(&mutex); //解锁互斥量
sleep(1);
};
i++; //循环结束后,发送最后一次信号唤醒 thread2 退出
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
return NULL;
};
void* thread2(void* junk) {
pthread_mutex_lock(&mutex);
while (1) {
while (i % 3 != 0 && i <= 20) //循环检查条件,避免虚假唤醒
pthread_cond_wait(&cond, &mutex); //等待条件变量
if (i > 20)
break; //确保退出条件
printf("------------thread2:%d\n", i); //打印能整除3的i
i++; //处理后将 i 递增 避免重复触发
pthread_mutex_unlock(&mutex);
sleep(1);
pthread_mutex_lock(&mutex);
};
pthread_mutex_unlock(&mutex);
return NULL;
};

C++线程同步

使用基本互斥锁实现多线程统计计数器到100000:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>       // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
volatile int counter(0); // non-atomic counter
std::mutex mtx; // locks access to counter
void thrfunc(void) {
for (int i = 0; i < 10000; ++i) {
mtx.lock();
++counter;
mtx.unlock();
};
return;
};
int main(int argc, const char* argv[]) {
std::thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(thrfunc);
for (auto& th : threads) th.join();
std::cout << "count to " << counter << " successfully \n";
return 0;
};

其中volatile关键字表示该变量可能被编译器未知的因素修改,例如操作系统、硬件或其他线程等,暗示编译器对访问该变量的代码不再优化。在这多线程情境下,当两个线程可能改变某个变量,需要用volatile声明,防止优化编译器把变量从内存装入寄存器中。

定时互斥锁在基本互斥锁的基础上多了个定时功能。其中try_lock尝试锁住互斥锁,若当前互斥锁被其他线程锁住,则返回false且不会被阻塞掉。上面例子用非阻塞上锁版本重写为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>       // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
volatile int counter(0); //定义一个全局变量,当作计数器用于累加
std::mutex mtx; // 用于保护counter的互斥锁
void thrfunc(void) {
for (int i = 0; i < 10000; ++i)
if (mtx.try_lock()) {// 互斥锁上锁
++counter; //计数器累加
mtx.unlock(); //互斥锁解锁
}
else{
std::cout << "try_lock false\n";
i--;
};
return;
};
int main(int argc, const char* argv[]) {
std::thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(thrfunc); //启动10个线程
for (auto& th : threads) th.join(); //等待10个线程结束
std::cout << "count to " << counter << " successfully \n";
return 0;
};

线程池

线程池就是有一堆创建好的进程,初始处于空闲等待状态,当有新任务需处理时,从这堆线程里取一个空闲线程来处理该任务,任务处理完毕后再把线程放回池中,供后面任务继续使用。当池中线程全部处于忙碌状态,没有可用空闲等待进程,此时需选择创建一个新线程并置入池中,或通知任务当前线程池所有线程都在忙,需等待片刻重试。

线程池好处在于线程复用,某个线程处理完一个任务后,可继续处理下一个任务,不用销毁后再创建,可避免无谓的开销,适用于连续产生大量并发任务的场合。

thread_pool.h内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
#include <vector>
#include <string>
#include <pthread.h>
using namespace std;
class CTask { /*执行任务的类:设置任务数据并执行*/
protected:
string m_strTaskName; //任务的名称
void* m_ptrData; //要执行的任务的具体数据
public:
CTask() = default;
CTask(string& taskName) : m_strTaskName(taskName), m_ptrData(NULL) {};
virtual int Run() = 0;
void setData(void* data); //设置任务数据
virtual ~CTask() {};
};
class CThreadPool { /*线程池管理类*/
private:
static vector<CTask*> m_vecTaskList; //任务列表
static bool shutdown; //线程退出标志
int m_iThreadNum; //线程池中启动的线程数
pthread_t* pthread_id;
static pthread_mutex_t m_pthreadMutex; //线程同步锁
static pthread_cond_t m_pthreadCond; //线程同步条件变量
protected:
static void* ThreadFunc(void* threadData); //新线程的线程回调函数
static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入空闲线程中
static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去
int Create(); //创建线程池中的线程
public:
CThreadPool(int threadNum);
int AddTask(CTask* task); //把任务添加到任务队列中
int StopAll(); //使线程池中的所有线程退出
int getTaskSize(); //获取当前任务队列中的任务数
};
void CTask::setData(void* data) {
m_ptrData = data;
return;
};
vector<CTask*> CThreadPool::m_vecTaskList; //静态成员初始化
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
CThreadPool::CThreadPool(int threadNum) { //线程管理类构造函数
this->m_iThreadNum = threadNum;
printf("I will create %d threads.\n", threadNum);
Create();
return;
};
void* CThreadPool::ThreadFunc(void* threadData) { //线程回调函数
pthread_t tid = pthread_self();
while (1) {
pthread_mutex_lock(&m_pthreadMutex);
while (m_vecTaskList.size() == 0 && !shutdown) //如果队列为空,等待新任务进入任务队列
pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
if (shutdown) { //关闭线程
pthread_mutex_unlock(&m_pthreadMutex);
printf("[tid: %lu]\texit\n", pthread_self());
pthread_exit(NULL);
};
printf("[tid: %lu]\trun: ", tid);
vector<CTask*>::iterator iter = m_vecTaskList.begin();
CTask* task = *iter; //取出一个任务并处理之
if (iter != m_vecTaskList.end()) {
task = *iter;
m_vecTaskList.erase(iter);
};
pthread_mutex_unlock(&m_pthreadMutex);
task->Run(); //执行任务
printf("[tid: %lu]\tidle\n", tid);
};
return (void*)0;
};
int CThreadPool::AddTask(CTask* task) { //往任务队列里添加任务并发出线程同步信号
pthread_mutex_lock(&m_pthreadMutex);
m_vecTaskList.push_back(task);
pthread_mutex_unlock(&m_pthreadMutex);
pthread_cond_signal(&m_pthreadCond);
return 0;
};
int CThreadPool::Create() { //创建线程
pthread_id = new pthread_t[m_iThreadNum];
for (int i = 0; i < m_iThreadNum; i++)
pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
return 0;
};
int CThreadPool::StopAll() { //停止所有线程
if (shutdown) //避免重复调用
return -1;
printf("Now I will end all threads!\n\n");
shutdown = true; //唤醒所有等待进程,线程池也要销毁了
pthread_cond_broadcast(&m_pthreadCond);
for (int i = 0; i < m_iThreadNum; i++) //清除僵尸
pthread_join(pthread_id[i], NULL);
delete[] pthread_id;
pthread_id = NULL;
pthread_mutex_destroy(&m_pthreadMutex); //销毁互斥量和条件变量
pthread_cond_destroy(&m_pthreadCond);
return 0;
};
int CThreadPool::getTaskSize() {//获取当前队列中的任务数
return m_vecTaskList.size();
};
#endif

test.cpp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <iostream>       // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex
#include "thread_pool.h"
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
volatile int counter(0); //定义一个全局变量,当作计数器用于累加
std::mutex mtx; // 用于保护counter的互斥锁
class CMyTask :public CTask {
public:
CMyTask() = default;
int Run() {
printf("%s\n", (char*)m_ptrData);
int x = rand() % 4 + 1;
sleep(x);
return 0;
};
~CMyTask() {};
};
int main(void) {
CMyTask taskObj;
char szTmp[] = "hello!";
taskObj.setData((void*)szTmp);
CThreadPool threadpool(5);
for (int i = 0; i < 10; i++)
threadpool.AddTask(&taskObj);
while (1) {
printf("There are still %d tasks need to handle\n", threadpool.getTaskSize());
if (threadpool.getTaskSize() == 0)
if (threadpool.StopAll() == -1) {
printf("Thread pool clear, exit.\n");
exit(0);
};
sleep(2);
printf("2 seconds later..\n");
};
return 0;
};

接下来用C++11实现线程池。创建XThread类代表一个线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//XThread.h
#pragma once
#include <list>
#include <mutex>
class XTask;
struct event_base;
class XThread {
public:
void Start();// 启动线程
void Main();// 线程入口函数
void Notify();// 收到主线程发出的激活消息(线程池任务分发)
void Activate(int arg);// 线程激活
void AddTack(XTask*);// 添加任务, 一个线程可以同时处理多个任务,共用一个event_base
XThread();
~XThread();
int id = 0;// 线程编号
private:
event_base* base = 0;
std::list<XTask*> tasks;
std::mutex tasks_mutex;
};

//XThread.cpp
#include <thread>
#include <iostream>
using namespace std;
#include <unistd.h>
#include "testUtil.h"
#include "XThread.h"
#include "XTask.h"
void XThread::Start(void) {
testout(id << " thread At Start()");
thread th(&XThread::Main, this);
th.detach();
return;
};
void XThread::Main(void) {
cout << id << " thread::Main() begin" << endl;
cout << id << " thread::Main() end" << endl;
return;
};
void XThread::Activate(int arg) {
testout(id << " thread At Activate()");
XTask* t = NULL;
tasks_mutex.lock();
if (tasks.empty()) {
tasks_mutex.unlock();
return;
};
t = tasks.front();
tasks.pop_front();
tasks_mutex.unlock();
t->Init(arg);
return;
};
void XThread::AddTack(XTask* t) {
if (!t)
return;
t->base = this->base;
tasks_mutex.lock();
tasks.push_back(t);
tasks_mutex.unlock();
return;
};
XThread::XThread() {};
XThread::~XThread() {};

然后实现XThreadPool线程池类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//XThreadPool.h
#pragma once
#include <vector>
class XThread;
class XTask;
class XThreadPool{
public:
static XThreadPool* Get(void) { // 单例模式
static XThreadPool p;
return &p;
};
void Init(int threadCount); // 初始化所有线程
void Dispatch(XTask*, int arg); // 分发线程
private:
int threadCount;
int lastThread = -1;
std::vector<XThread*> threads;//所有线程的向量
XThreadPool(void) {};
};

//XThreadPool.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
using namespace std;
#include "testUtil.h"
void XThreadPool::Dispatch(XTask* task, int arg) { //分配任务到线程池
testout("main thread At XThreadPoll::dispathch()");
if (!task)
return;
int tid = (lastThread + 1) % threadCount;
lastThread = tid;
XThread* t = threads[tid]; //得到最新线程的指针
t->AddTack(task); // 添加任务
t->Activate(arg); // 激活线程
return;
};
void XThreadPool::Init(int threadCount) { //初始化线程池
testout("main thread At XThreadPoll::Init()");
this->threadCount = threadCount;
this->lastThread = -1;
for (int i = 0; i < threadCount; i++) {
cout << "Create thread" << i << endl;
XThread* t = new XThread();
t->id = i;
t->Start();
threads.push_back(t);
this_thread::sleep_for(chrono::milliseconds(10));
};
return;
};

然后实现XTask任务类:

1
2
3
4
5
6
7
8
//XTask.h
#pragma once
class XTask{
public:
struct event_base* base = 0; // 一客户端一个base
int thread_id = 0; // 线程池id
virtual bool Init(int arg) = 0;// 初始化任务 具体如何初始化,就要根据具体的任务而重载
};

然后实现每个自定义任务类的具体内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//mytask.h
#include "XTask.h"
#include <string>
using namespace std;
class CMyTask :public XTask {
public:
bool Init(int arg);
};

//mytask.cpp
#include "mytask.h"
bool CMyTask::Init(int arg) {
long long i = 0, c = 0;
while (c < 10000000) {
while (i < 1000000000)
i++;
c++;
};
printf("%d---------%lld--------\n", arg, c);
return false;
};

运行主程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//编译:g++ -Wall -o threadPool main.cpp mytask.cpp XThread.cpp XThreadPool.cpp -lpthread 
//main.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include "mytask.h"
#define XThreadPoolGet XThreadPool::Get()
int main(void) {
int i;
XThreadPoolGet->Init(10);//初始化线程池
CMyTask task[10];
for (i = 0; i < 10; i++)
XThreadPoolGet->Dispatch(&task[i], i);//分配任务
return 0;
};