C++后端开发入门-环境配置与多线程编程 环境配置 下载并安装Ubuntu虚拟机,网卡改为桥接模式,启用root账户并设置密码。接下来装SSH:
1 sudo apt install openssh-server,net-tools
然后修改/etc/ssh/sshd_config,将下列属性的注释去掉并修改值:
1 2 3 LoginGraceTime 2m PermitRootLogin yes StrictModes yes
关防火墙并重启SSH:
1 2 3 4 5 6 sudo ufw disable sudo ufw status sudo service ssh restart sudo systemctl enable ssh sudo systemctl status ssh sudo reboot
接下来用MobaXTerm或XShell连上即可。
在Visual Studio 2022中添加“使用C++进行Linux和嵌入式开发”,新建“空项目(Linux)”。选择“工具”->“选项”->“跨平台”->“连接管理器”。添加一个SSH连接并下载远程标头。
随便写个程序,要用开始调试,在“调试”->“Linux控制台”中能看到输出。
或者也可以在Visual Studio Code中下个“Remote - SSH”插件即可。
POSIX多线程 线程创建 用pthread_create创建线程,在pthread.h中定义。
1 2 3 4 5 6 int pthread_create ( pthread_t * pid, const pthread_attr_t * attr, void * (*start_routine)(void *), void * arg ) ;
用阻塞函数pthread_join等待子线程结束,让主线程挂起直到子线程都退出,并让子线程所占资源释放。
1 2 3 4 int pthread_join ( pthread_t pid, void ** value_ptr ) ;
例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #include <pthread.h> #include <stdio.h> void * thfunc (void * arg) { int * pn = (int *)(arg); int n = *pn; printf ("in thfunc:n=%d\n" , n); return (void *)0 ; }; int main (int argc, char * argv[]) { pthread_t tidp; int ret, n = 110 ; ret = pthread_create (&tidp, NULL , thfunc, &n); if (ret) { printf ("pthread_create failed:%d\n" , ret); return -1 ; }; pthread_join (tidp, NULL ); printf ("in main:thread is created\n" ); return 0 ; };
还可以传结构体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <pthread.h> #include <stdio.h> typedef struct { int n; const char * str; }MYSTRUCT; void * thfunc (void * arg) { MYSTRUCT* p = (MYSTRUCT*)arg; printf ("in thfunc:n=%d,str=%s\n" , p->n, p->str); return (void *)0 ; }; int main (int argc, char * argv[]) { pthread_t tidp; int ret; MYSTRUCT mystruct; mystruct.n = 110 ; mystruct.str = "hello world" ; ret = pthread_create (&tidp, NULL , thfunc, (void *)&mystruct); if (ret) { printf ("pthread_create failed:%d\n" , ret); return -1 ; }; pthread_join (tidp, NULL ); printf ("in main:thread is created\n" ); return 0 ; };
线程属性包括:分离状态、调度策略与参数、作用域、栈尺寸、栈地址、优先级等,定义如下:
1 2 3 4 5 typedef unsigned long int pthread_t ;union pthread_attr_t { char __size[__SIZEOF_PTHREAD_ATTR_T]; long int __align; };
要获取线程属性,需先用pthread_getattr_np来获取属性结构体值,再用相应函数获得某属性具体值。
1 2 3 4 5 6 #define _GNU_SOURCE #include <pthread.h> int pthread_getattr_np ( pthread_t thread, pthread_attr_t *attr ) ;
获取的属性结构体不再需要时用pthread_attr_destroy销毁。
1 2 3 int pthread_attr_destroy ( pthread_attr_t * attr ) ;
用pthread_create创建线程时,属性结构体指针若用NULL,则创建的线程有默认属性,即非分离、大小为1MB堆栈、与父进程有同样级别的优先级、分时调度策略。若创建非默认属性线程,可创建线程前用pthread_attr_init来初始化一个线程属性结构体并设置相应属性,接着传给pthread_create,注意这里属性结构体用完也要用pthread_attr_destroy释放。
1 2 3 int pthread_attr_init ( pthread_attr_t *attr ) ;
分离状态决定一个线程以什么样方式终止。一种为分离状态的,用PTHREAD_CREATE_DETACHED表示;另一种为非分离状态的,或称可连接的,用PTHREAD_CREATE_JOINABLE表示。
一个可连接的线程可被其他线程回收资源和取消,且不会主动释放资源(如堆栈空间和线程描述符等,总计8KB多),必须等其他线程来回收其资源,因此要在主线程用阻塞函数pthead_join,当其返回时所等待的线程资源也就被释放了。若父进程不退出且不调用pthread_join,则可连接线程资源一直不释放,变成僵尸线程,僵尸线程越来越多后新线程将无资源可用。若不用pthread_join,父进程先于子线程退出,那么子线程将被init进程收养,变为其父进程,并调用wait系列函数为其回收资源,不会泄露资源。为了避免内存泄露,可连接的线程在终止时,要么设为可分离的,要么用pthread_join回收资源。一个线程不能被多个线程等待,否则第一个收到信号的线程成功返回,其余线程得到错误代码ESRCH。
可分离的线程运行结束后,资源立刻被系统回收。可用pthread_detach将线程转换为可分离线程。也可用pthread_attr_setdetachstate在创建线程时设为为可分离的。可分离的线程没有类似pthread_join的函数,但可在主线程用pthread_exit,此时只终止主线程,进程资源会为由主线程创建的其他线程保持打开的状态,直到其他线程都终止。
1 2 3 4 int pthread_attr_setdetachstate ( pthread_attr_t *attr, int detachstate ) ;
例如创建一个可分离线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #include <iostream> #include <pthread.h> #include <unistd.h> using namespace std;void * thfunc (void * arg) { cout << ("sub thread is running\n" ); return NULL ; }; int main (int argc, char * argv[]) { pthread_t thread_id; pthread_attr_t thread_attr; struct sched_param thread_param; size_t stack_size; int res; res = pthread_attr_init (&thread_attr); if (res) cout << "pthread_attr_init failed:" << res << endl; res = pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED); if (res) cout << "pthread_attr_setdetachstate failed:" << res << endl; res = pthread_create (&thread_id, &thread_attr, thfunc, NULL ); if (res) cout << "pthread_create failed:" << res << endl; cout << "main thread will exit\n" << endl; sleep (1 ); return 0 ; };
还可用pthread_detach将一个可连接的线程转换为可分离的线程。如果一个线程被其他线程连接了,则该函数不会产生作用,且该线程继续处于可连接状态。若一个线程成功进行pthread_detach后,则无法被连接。
1 2 3 int pthread_detach ( pthread_t thread ) ;
用pthread_attr_getdetachstate获取分离状态。
1 2 3 4 int pthread_attr_getdetachstate ( pthread_attr_t *attr, int *detachstate ) ;
例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> static void * thread_start (void * arg) { int i, s; pthread_attr_t gattr; s = pthread_getattr_np (pthread_self (), &gattr); if (s != 0 ) printf ("pthread_getattr_np failed\n" ); s = pthread_attr_getdetachstate (&gattr, &i); if (s) printf ("pthread_attr_getdetachstate failed" ); printf ("Detach state = %s\n" , (i == PTHREAD_CREATE_DETACHED) ? "PTHREAD_CREATE_DETACHED" : (i == PTHREAD_CREATE_JOINABLE) ? "PTHREAD_CREATE_JOINABLE" : "???" ); pthread_detach (pthread_self ()); s = pthread_getattr_np (pthread_self (), &gattr); if (s != 0 ) printf ("pthread_getattr_np failed\n" ); s = pthread_attr_getdetachstate (&gattr, &i); if (s) printf (" pthread_attr_getdetachstate failed" ); printf ("after pthread_detach,\nDetach state = %s\n" , (i == PTHREAD_CREATE_DETACHED) ? "PTHREAD_CREATE_DETACHED" : (i == PTHREAD_CREATE_JOINABLE) ? "PTHREAD_CREATE_JOINABLE" : "???" ); pthread_attr_destroy (&gattr); return NULL ; }; int main (int argc, char * argv[]) { pthread_t thread_id; int s; s = pthread_create (&thread_id, NULL , &thread_start, NULL ); if (s != 0 ) { printf ("pthread_create failed\n" ); return 0 ; }; pthread_exit (NULL ); };
用pthread_attr_getstacksize获取栈尺寸:
1 2 3 4 int pthread_attr_getstacksize ( pthread_attr_t *attr, size_t *stacksize ) ;
例如获取默认栈尺寸和最小尺寸:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <limits.h> static void * thread_start (void * arg) { int i, res; size_t stack_size; pthread_attr_t gattr; res = pthread_getattr_np (pthread_self (), &gattr); if (res) printf ("pthread_getattr_np failed\n" ); res = pthread_attr_getstacksize (&gattr, &stack_size); if (res) printf ("pthread_getattr_np failed\n" ); printf ("Default stack size is %u byte; minimum is %u byte\n" , stack_size, PTHREAD_STACK_MIN); pthread_attr_destroy (&gattr); return NULL ; }; int main (int argc, char * argv[]) { pthread_t thread_id; int s; s = pthread_create (&thread_id, NULL , &thread_start, NULL ); if (s != 0 ) { printf ("pthread_create failed\n" ); return 0 ; }; pthread_join (thread_id, NULL ); };
某个线程一定有一种调度策略来调度它,管理多个线程如何占用CPU就是线程调度,不同操作系统的调度方法/策略不同。实时指操作系统对中断的响应实时性非常高,非实时相反。VxWorks属于实时操作系统RTOS,Windows和Linux属于非实时操作系统(或称分时操作系统TSOS)。响应实时的表现是抢占,通过优先级控制,优先级高的任务最先占用CPU。Linux线程有实时和分时之分,调度策略有分时调度策略、先来先服务调度策略、实时的分时调度策略,后两者只用于实时线程。
对于分时调度策略(或称轮转策略,用SCHED_OTHER表示),不支持优先级,为每个线程分配一段运行时间,称为时间片。优先级返回0。
对于先来先服务调度策略(用SCHED_FIFO表示),支持优先级抢占。CPU让一个先来的线程执行完再调度下一个线程,顺序按照创建线程的先后。线程一旦占用CPU则一直运行,直到有更高优先级任务到达或自己放弃CPU。若有和正在运行的线程具有相同优先级的线程已经就绪,则必须等待正在运行的线程主动放弃后才可以运行这个就绪的线程。优先级范围为1~99。
对于实时的分时调度策略(或称时间片轮转/轮询调度策略,用SCHED_RR表示),支持优先级抢占。CPU分配给每个线程一个特定时间片,当线程时间片用完,系统将重新分配时间片,并将线程置于实时线程就绪队列尾部,保证所有具有相同优先级的线程能被公平调度。优先级范围同上。
用sched_get_priority_min和sched_get_priority_min分别获得某调度策略的最低和最高优先级。
1 2 3 4 5 6 7 #include <sched.h> int sched_get_priority_max ( int policy ) ;int sched_get_priority_min ( int policy ) ;
例如获取各调度策略下的最低/高优先级:
1 2 3 4 5 6 7 8 9 #include <stdio.h> #include <unistd.h> #include <sched.h> int main () { printf ("Valid priority range for SCHED_OTHER: %d - %d\n" , sched_get_priority_min (SCHED_OTHER), sched_get_priority_max (SCHED_OTHER)); printf ("Valid priority range for SCHED_FIFO: %d - %d\n" , sched_get_priority_min (SCHED_FIFO), sched_get_priority_max (SCHED_FIFO)); printf ("Valid priority range for SCHED_RR: %d - %d\n" , sched_get_priority_min (SCHED_RR), sched_get_priority_max (SCHED_RR)); return 0 ; };
线程主动退出可用pthread_exit或直接函数返回。
1 2 3 void pthread_exit ( void *retval ) ;
例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include <pthread.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <errno.h> #define PTHREAD_NUM 2 void * thrfunc1 (void * arg) { static int count = 1 ; pthread_exit ((void *)(&count)); }; void * thrfunc2 (void * arg) { static int count = 2 ; return (void *)(&count); }; int main (int argc, char * argv[]) { pthread_t pid[PTHREAD_NUM]; int retPid; int * pRet1; int * pRet2; if ((retPid = pthread_create (&pid[0 ], NULL , thrfunc1, NULL )) != 0 ) { perror ("create pid first failed" ); return -1 ; }; if ((retPid = pthread_create (&pid[1 ], NULL , thrfunc2, NULL )) != 0 ) { perror ("create pid second failed" ); return -1 ; }; if (pid[0 ] != 0 ) { pthread_join (pid[0 ], (void **)&pRet1); printf ("get thread 0 exitcode: %d\n" , *pRet1); }; if (pid[1 ] != 0 ) { pthread_join (pid[1 ], (void **)&pRet2); printf ("get thread 1 exitcode: %d\n" , *pRet2); }; return 0 ; };
线程被动退出有两种方法。一种可在同进程的另一个线程中用pthread_kill向要结束的进程发送信号,目标线程收到信号后再退出。此时接收信号的线程必须先用signal注册该信号的处理函数。若线程代码内不对某信号做处理,则按照信号默认形为影响整个进程。例如没有为SIGQUIT信号实现处理函数则默认整个进程退出,所以若sig参数不为0,一定要实现线程信号处理函数,否则影响整个进程。
1 2 3 4 int pthread_kill ( pthread_t thread, int sig ) ;
例如向线程发送请求结束信号:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #include <iostream> #include <pthread.h> #include <signal.h> #include <unistd.h> using namespace std;static void on_signal_term (int sig) { cout << "sub thread will exit" << endl; pthread_exit (NULL ); }; void * thfunc (void * arg) { signal (SIGQUIT, on_signal_term); int tm = 50 ; while (true ) { cout << "thrfunc--left:" << tm << " s--" << endl; sleep (1 ); tm--; }; return (void *)0 ; }; int main (int argc, char * argv[]) { pthread_t pid; int res; res = pthread_create (&pid, NULL , thfunc, NULL ); sleep (5 ); pthread_kill (pid, SIGQUIT); pthread_join (pid, NULL ); cout << "sub thread has completed,main thread will exit\n" ; return 0 ; };
此外还可用pthread_kill探测线程是否还存活:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #include <iostream> #include <pthread.h> #include <signal.h> #include <unistd.h> #include "errno.h" using namespace std;void * thfunc (void * arg) { int tm = 50 ; while (1 ) { cout << "thrfunc--left:" << tm << " s--" << endl; sleep (1 ); tm--; }; return (void *)0 ; }; int main (int argc, char * argv[]) { pthread_t pid; int res; res = pthread_create (&pid, NULL , thfunc, NULL ); sleep (5 ); int kill_rc = pthread_kill (pid, 0 ); if (kill_rc == ESRCH) cout << "the specified thread did not exists or already quit\n" ; else if (kill_rc == EINVAL) cout << "signal is invalid\n" ; else cout << "the specified thread is alive\n" ; return 0 ; };
另一种在同进程的另一个线程中用pthread_cancel发送取消请求,请求取消目标线程的执行。不过就算发送成功也不一定意味着线程停止运行了,只有被取消的线程下次调用系统函数、C库函数或pthread_testcancel函数等时,才会真正结束线程。这种在线程执行过程中,检测是否有未响应取消信号的地方叫做取消点。被取消线程成功停止运行将返回常数PTHREAD_CANCELED,即-1。
1 2 3 int pthread_cancel ( pthread_t thread ) ;
可用pthread_testcancel让内核检测是否需要取消当前线程。
1 void pthread_testcancel (void ) ;
例如发送取消请求并成功取消线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> void * thfunc (void * arg) { int i = 1 ; printf ("thread start-------- \n" ); while (1 ) { i++; pthread_testcancel (); }; return (void *)0 ; }; int main () { void * ret = NULL ; int iret = 0 ; pthread_t tid; pthread_create (&tid, NULL , thfunc, NULL ); sleep (1 ); pthread_cancel (tid); pthread_join (tid, &ret); if (ret == PTHREAD_CANCELED) printf ("thread has stopped,and exit code: %d\n" , ret); else printf ("some error occured" ); return 0 ; };
例如当子线程在被阻塞到accept等函数时被动退出,被锁独占的资源将永远无法被释放。此时可用pthread_cleanup_push和pthread_cleanup_pop让线程退出时做一些清理工作,前者把一个函数压入清理函数栈,后者弹出栈顶清理函数并根据参数决定是否执行清理函数。
被pthread_cleanup_push压栈的清理函数在三种情况下执行:线程主动结束;调用pthread_cleanup_pop且参数非零;线程被其他线程取消。
1 2 3 4 void pthread_cleanup_push ( void (*routine)(void *), void *arg ) ;
pthread_cleanup_pop的参数execute为0时不执行清理函数,非0时执行。pthread_cleanup_push和pthread_cleanup_pop必须成对出现在同一函数中,即使不执行,否则编译不通过。
1 2 3 void pthread_cleanup_pop ( int execute ) ;
例如取消线程时引发清理函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> void mycleanfunc (void * arg) { printf ("mycleanfunc:%d\n" , *((int *)arg)); return ; }; void * thfunc (void * arg) { int i = 1 ; printf ("thread start-------- \n" ); pthread_cleanup_push (mycleanfunc, &i); while (1 ) { i++; printf ("i=%d\n" , i); pthread_testcancel (); }; printf ("this line will not run\n" ); pthread_cleanup_pop (0 ); return (void *)0 ; }; int main (void ) { void * ret = NULL ; int iret = 0 ; pthread_t tid; pthread_create (&tid, NULL , thfunc, NULL ); sleep (1 ); pthread_cancel (tid); pthread_join (tid, &ret); if (ret == PTHREAD_CANCELED) printf ("thread has stopped,and exit code: %d\n" , ret); else printf ("some error occured" ); return 0 ; };
C++11线程类 默认构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 #include <stdio.h> #include <stdlib.h> #include <chrono> #include <iostream> #include <thread> void thfunc (int n) { std::cout << "thfunc:" << n << std::endl; return ; }; int main (int argc, const char * argv[]) { std::thread threads[5 ]; std::cout << "create 5 threads...\n" ; for (int i = 0 ; i < 5 ; i++) threads[i] = std::thread (thfunc, i + 1 ); for (auto & t : threads) t.join (); std::cout << "All threads joined.\n" ; return EXIT_SUCCESS; };
用初始化构造函数传结构体参数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #include <iostream> #include <thread> using namespace std;typedef struct { int n; const char * str; }MYSTRUCT; void thfunc (void * arg) { MYSTRUCT* p = (MYSTRUCT*)arg; cout << "in thfunc:n=" << p->n << ",str=" << p->str << endl; return ; }; int main (int argc, char * argv[]) { MYSTRUCT mystruct; mystruct.n = 110 ; mystruct.str = "hello world" ; thread t (thfunc, &mystruct) ; t.join (); return 0 ; };
C++11创建的线程也是可连接的线程,这里可用detach进行分离,C++11线程类可以和POSIX结合使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #include <iostream> #include <thread> using namespace std;void thfunc (int n, int m, int * k, char s[]) { cout << "in thfunc:n=" << n << ",m=" << m << ",k=" << *k << "\nstr=" << s << endl; *k = 5000 ; return ; }; int main (int argc, char * argv[]) { int n = 110 , m = 200 , k = 5 ; char str[] = "hello world" ; thread t (thfunc, n, m, &k, str) ; t.detach (); cout << "k=" << k << endl; pthread_exit (NULL ); cout << "this line will not run" << endl; return 0 ; };
移动构造函数如下。其中move调用成功后,参数不代表任何thread对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 #include <iostream> #include <thread> using namespace std;void fun (int & n) { cout << "fun: " << n << "\n" ; n += 20 ; this_thread::sleep_for (chrono::milliseconds (10 )); return ; }; int main (void ) { int n = 0 ; cout << "n=" << n << '\n' ; n = 10 ; thread t1 (fun, ref(n)) ; thread t2 (move(t1)) ; t2.join (); cout << "n=" << n << '\n' ; return 0 ; };
例如获取线程ID:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <iostream> #include <thread> using namespace std;thread::id main_thread_id = this_thread::get_id (); void is_main_thread () { if (main_thread_id == this_thread::get_id ()) std::cout << "This is the main thread.\n" ; else std::cout << "This is not the main thread.\n" ; return ; }; int main () { is_main_thread (); thread th (is_main_thread) ; th.join (); return 0 ; };
this_thread命名空间引用当前线程,其中调用yield函数的线程放弃执行并让出自己的CPU时间片,回到就绪态,使其他线程有机会运行。例如线程赛跑排名次:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include <iostream> #include <thread> #include <atomic> using namespace std;atomic<bool > ready (false ) ;void thfunc (int id) { while (!ready) this_thread::yield (); for (volatile int i = 0 ; i < 1000000 ; ++i); cout << id << "," ; return ; }; int main () { thread threads[10 ]; cout << "race of 10 threads that count to 1 million:\n" ; for (int i = 0 ; i < 10 ; ++i) threads[i] = thread (thfunc, i); ready = true ; for (auto & th : threads) th.join (); return 0 ; };
可用sleep_until或sleep_for阻塞线程,暂停执行一段时间。sleep_until表示函数阻塞线程到sleep_time时间点,到了该时间点后再继续执行。
1 2 template < class Clock, class Duration >void sleep_until (const std::chrono::time_point<Clock, Duration>& sleep_time) ;
例如挂起线程到下一个整分时间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <iostream> #include <thread> #include <chrono> #include <ctime> #include <time.h> #include <stddef.h> using namespace std;using std::chrono::system_clock;void getNowTime () { timespec time; struct tm nowTime; clock_gettime (CLOCK_REALTIME, &time); localtime_r (&time.tv_sec, &nowTime); char current[1024 ]; printf ("%04d-%02d-%02d %02d:%02d:%02d\n" , nowTime.tm_year + 1900 , nowTime.tm_mon + 1 , nowTime.tm_mday, nowTime.tm_hour, nowTime.tm_min, nowTime.tm_sec); return ; }; int main () { std::time_t tt = system_clock::to_time_t (system_clock::now ()); struct std ::tm* ptm = std::localtime (&tt); getNowTime (); cout << "Waiting for the next minute to begin...\n" ; ++ptm->tm_min; ptm->tm_sec = 0 ; this_thread::sleep_until (system_clock::from_time_t (mktime (ptm))); getNowTime (); return 0 ; };
sleep_for挂起线程一段时间sleep_duration,这段时间内暂停执行。
1 2 template < class Rep, class Period >void sleep_for (const std::chrono::duration<Rep, Period>& sleep_duration) ;
例如暂停线程5秒:
1 2 3 4 5 6 7 8 9 10 11 12 #include <iostream> #include <thread> #include <chrono> int main () { std::cout << "countdown:\n" ; for (int i = 5 ; i > 0 ; --i) { std::cout << i << std::endl; std::this_thread::sleep_for (std::chrono::seconds (1 )); }; std::cout << "Lift off!\n" ; return 0 ; };
线程同步 一次仅允许一个线程使用的共享资源叫做临界资源,个线程应互斥对其访问。每个线程中访问临界资源的代码称为临界区(或称临界段),每次只准许一个线程进入临界区。
开始时初始化一个互斥锁,进入临界区前把互斥锁加锁,退出临界区时把互斥锁解锁,最后不用互斥锁时将它销毁。
POSIX中用pthread_mutex_init初始化互斥锁,称为动态方式,需要销毁。其中restrict关键字只用于限定指针,告知编译器所有修改该指针所指向内容的操作全都基于该指针,不存在其他修改操作的途径,帮助编译器更好的代码优化。
1 2 3 4 int pthread_mutex_init ( pthread_mutex_t * restrict mutex, const pthread_mutexattr_t * restrict attr ) ;
使用方法:
1 2 3 4 5 6 pthread_mutex_t * pmutex = (pthread_mutex_t *)malloc (sizeof (pthread_mutex_t ));pthread_mutex_init (pmutex, NULL );pthread_mutex_t mutex;pthread_mutex_init (&mutex, NULL );
还可以用PTHREAD_MUTEX_INITIALIZER宏初始化互斥锁,称为常量初始化方式,不需要销毁。使用方法:
1 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
为互斥锁上锁可用pthread_mutex_lock或pthread_mutex_trylock。用pthread_mutex_lock时若互斥锁已被其他线程上锁,则调用该函数的线程将被阻塞,即让出CPU,避免忙等现象,必须与pthread_mutex_unlock成对使用。
1 2 3 int pthread_mutex_lock ( pthread_mutex_t *mutex ) ;
用pthread_mutex_trylock时若互斥锁已被其他线程上锁,则不阻塞,立即返回EBUSY。
1 2 3 int pthread_mutex_trylock ( pthread_mutex_t *mutex ) ;
用pthread_mutex_unlock对已上锁互斥锁解锁。
1 2 3 int pthread_mutex_unlock ( pthread_mutex_t *mutex ) ;
用pthread_mutex_destroy销毁互斥锁。
1 2 3 int pthread_mutex_destroy ( pthread_mutex_t *mutex ) ;
例如多线程累加:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 #include <stdio.h> #include <unistd.h> #include <pthread.h> #include <sys/time.h> #include <string.h> #include <cstdlib> int gcn = 0 ;pthread_mutex_t mutex;void * thread_1 (void * arg) { int j; for (j = 0 ; j < 10000000 ; j++) { pthread_mutex_lock (&mutex); gcn++; pthread_mutex_unlock (&mutex); }; pthread_exit ((void *)0 ); return NULL ; }; void * thread_2 (void * arg) { int j; for (j = 0 ; j < 10000000 ; j++) { pthread_mutex_lock (&mutex); gcn++; pthread_mutex_unlock (&mutex); }; pthread_exit ((void *)0 ); return NULL ; }; int main (void ) { int j, err; pthread_t th1, th2; pthread_mutex_init (&mutex, NULL ); for (j = 0 ; j < 10 ; j++) { err = pthread_create (&th1, NULL , thread_1, (void *)0 ); if (err != 0 ) { printf ("create new thread error:%s\n" , strerror (err)); exit (0 ); }; err = pthread_create (&th2, NULL , thread_2, (void *)0 ); if (err != 0 ) { printf ("create new thread error:%s\n" , strerror (err)); exit (0 ); }; err = pthread_join (th1, NULL ); if (err != 0 ) { printf ("wait thread done error:%s\n" , strerror (err)); exit (1 ); }; err = pthread_join (th2, NULL ); if (err != 0 ) { printf ("wait thread done error:%s\n" , strerror (err)); exit (1 ); }; printf ("gcn=%d\n" , gcn); gcn = 0 ; }; pthread_mutex_destroy (&mutex); return 0 ; };
在读写操作问题中,对资源的访问有两种情况。一种访问方式是独占的,称为写操作;另一种访问方式是可以共享的,可有多个线程同时去访问某资源,称为读操作。
读写锁是多线程同步的另一种机制。若一个线程用读锁锁住了临界区,其他线程也可用读锁来进入临界区。但若这时再进行写锁加锁就会发生阻塞,后面若继续有读锁请求,则也都被阻塞。若一个线程用写锁锁住临界区,则其他线程无论读锁还是写锁都会发生阻塞。
读写锁比互斥锁具有更高的适用性和并行性,但读写锁系统开销更大。
读写锁用pthread_rwlock_init初始化,过程与互斥锁同理:
1 2 3 4 5 6 7 8 9 10 11 12 13 int pthread_rwlock_init ( pthread_rwlock_t * restrict rwlock, const pthread_rwlockattr_t * restrict attr ) ;pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;pthread_rwlock_t * prwlock = (pthread_rwlock_t *)malloc (sizeof (pthread_rwlock_t ));pthread_rwlock_init (prwlock, NULL );pthread_rwlock_t rwlock;pthread_rwlock_init (&rwlock, NULL );
读写锁的上锁分为读模式和写模式的上锁,上锁、解锁与销毁函数的用法与互斥锁同理:
1 2 3 4 5 6 7 8 9 10 int pthread_rwlock_rdlock (pthread_rwlock_t * rwlock) ;int pthread_rwlock_tryrdlock (pthread_rwlock_t * rwlock) ;int pthread_rwlock_wrlock (pthread_rwlock_t * rwlock) ;int pthread_rwlock_trywrlock (pthread_rwlock_t * rwlock) ;int pthread_rwlock_unlock (pthread_rwlock_t * rwlock) ;int pthread_rwlock_tryrdlock (pthread_rwlock_t * rwlock) ;int pthread_rwlock_timedrdlock (pthread_rwlock_t * restrict rwlock, const struct timespec* restrict abstime) ;int pthread_rwlock_timedwrlock (pthread_rwlock_t * restrict rwlock, const struct timespec* restrict abstime) ;
条件变量是一种同步机制,用于让一个线程因等待“条件变量的条件”而挂起,另一个线程在条件成立后向挂起的线程发送条件成立的信号。为了防止多个线程同时请求pthread_cond_wait而新城竞争条件,条件变量必须和一个互斥锁联合使用。
生产者消费者问题(或称有界缓冲区问题)是一个经典问题。两个线程共享一个公共的固定大小的缓冲区,其中一个是生产者,用于将数据放入缓冲区;另一个是消费者,用于从缓冲区取出数据。若当缓冲区已满,而生产者还想向其中放入一个新的数据项,应该让生产者休眠,等消费者从缓冲区取走一个或多个数据后再唤醒它。若当缓冲区已空,而消费者还想取数据,应该让消费者休眠,等生产者放入一个或多个数据时再唤醒它。
条件变量用pthread_cond_init初始化,过程与互斥锁同理:
1 2 3 4 5 6 7 8 9 10 11 12 13 int pthread_cond_init ( pthread_cond_t * cv, const pthread_condattr_t * cattr ) ;pthread_cond_t cond = PTHREAD_COND_INITIALIZER;pthread_cond_t * pcond = (pthread_cond_t *)malloc (sizeof (pthread_cond_t ));pthread_cond_init (pcond, NULL );pthread_cond_t cond;pthread_cond_init (&cond, NULL );
用pthread_cond_wait或pthread_cond_timedwait等待条件变量,并将线程阻塞在一个条件变量上。当条件不满足,pthread_cond_wait先将互斥锁解锁,再被条件变量阻塞,这是原子操作,不被打断。被阻塞的线程可以后通过其他线程唤醒。若唤醒后条件依然不满足,则线程将继续阻塞在这里,等被下一次唤醒。当函数等到条件变量时,先对互斥锁上锁,再唤醒本线程,这也是原子操作。只有当信号到来时互斥锁已解锁,该函数才会返回。
1 2 3 4 int pthread_cond_wait ( pthread_cond_t * restrict cond, pthread_mutex_t * restrict mutex ) ;
阻塞在条件变量上的线程被唤醒可能不是因为条件满足,而是因为虚假唤醒,所以pthread_cond_wait返回只代表共享数据可能被改变,必须要重新判断。
pthread_cond_timedwait是计时等待条件变量。当等待的时间超过timespec,则返回ETIME。timespec中的秒和纳秒数是从1970年1月1日00:00:00开始即使,到现在经历的时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 struct timespec {#ifdef __USE_TIME_BITS64 __time64_t tv_sec; #else __time_t tv_sec; #endif #if __WORDSIZE == 64 || (defined __SYSCALL_WORDSIZE && __SYSCALL_WORDSIZE == 64) || (__TIMESIZE == 32 && !defined __USE_TIME_BITS64) __syscall_slong_t tv_nsec; #else # if __BYTE_ORDER == __BIG_ENDIAN int : 32 ; long int tv_nsec; # else long int tv_nsec; int : 32 ; # endif #endif }; int pthread_cond_timedwait ( pthread_cond_t * restrict cond, pthread_mutex_t * restrict mutex, const struct timespec* restrict abstime ) ;
用pthread_cond_signal唤醒一个等待条件变量的线程,用pthread_cond_broadcast唤醒所有等待该条件变量的线程。
1 2 3 4 5 6 int pthread_cond_broadcast ( pthread_cond_t * cond ) ; int pthread_cond_signal ( pthread_cond_t * cond ) ;
用pthread_cond_destroy销毁条件变量:
1 2 3 int pthread_cond_destroy ( pthread_cond_t *cond ) ;
例如找出1到20中能整除3的整数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t cond = PTHREAD_COND_INITIALIZER;int i = 1 ;void * thread1 (void *) ;void * thread2 (void *) ;int main (void ) { pthread_t t_a, t_b; pthread_create (&t_a, NULL , thread2, NULL ); pthread_create (&t_b, NULL , thread1, NULL ); pthread_join (t_b, NULL ); pthread_mutex_destroy (&mutex); pthread_cond_destroy (&cond); return 0 ; }; void * thread1 (void * junk) { for (int current = 1 ; current <= 20 ; current++) { pthread_mutex_lock (&mutex); i = current; if (i % 3 == 0 ) pthread_cond_signal (&cond); else printf ("thead1:%d\n" , i); pthread_mutex_unlock (&mutex); sleep (1 ); }; i++; pthread_mutex_lock (&mutex); pthread_cond_signal (&cond); pthread_mutex_unlock (&mutex); return NULL ; }; void * thread2 (void * junk) { pthread_mutex_lock (&mutex); while (1 ) { while (i % 3 != 0 && i <= 20 ) pthread_cond_wait (&cond, &mutex); if (i > 20 ) break ; printf ("------------thread2:%d\n" , i); i++; pthread_mutex_unlock (&mutex); sleep (1 ); pthread_mutex_lock (&mutex); }; pthread_mutex_unlock (&mutex); return NULL ; };
C++线程同步 使用基本互斥锁实现多线程统计计数器到100000:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include <iostream> #include <thread> #include <mutex> volatile int counter (0 ) ; std::mutex mtx; void thrfunc (void ) { for (int i = 0 ; i < 10000 ; ++i) { mtx.lock (); ++counter; mtx.unlock (); }; return ; }; int main (int argc, const char * argv[]) { std::thread threads[10 ]; for (int i = 0 ; i < 10 ; ++i) threads[i] = std::thread (thrfunc); for (auto & th : threads) th.join (); std::cout << "count to " << counter << " successfully \n" ; return 0 ; };
其中volatile关键字表示该变量可能被编译器未知的因素修改,例如操作系统、硬件或其他线程等,暗示编译器对访问该变量的代码不再优化。在这多线程情境下,当两个线程可能改变某个变量,需要用volatile声明,防止优化编译器把变量从内存装入寄存器中。
定时互斥锁在基本互斥锁的基础上多了个定时功能。其中try_lock尝试锁住互斥锁,若当前互斥锁被其他线程锁住,则返回false且不会被阻塞掉。上面例子用非阻塞上锁版本重写为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #include <iostream> #include <thread> #include <mutex> volatile int counter (0 ) ; std::mutex mtx; void thrfunc (void ) { for (int i = 0 ; i < 10000 ; ++i) if (mtx.try_lock ()) { ++counter; mtx.unlock (); } else { std::cout << "try_lock false\n" ; i--; }; return ; }; int main (int argc, const char * argv[]) { std::thread threads[10 ]; for (int i = 0 ; i < 10 ; ++i) threads[i] = std::thread (thrfunc); for (auto & th : threads) th.join (); std::cout << "count to " << counter << " successfully \n" ; return 0 ; };
线程池 线程池就是有一堆创建好的进程,初始处于空闲等待状态,当有新任务需处理时,从这堆线程里取一个空闲线程来处理该任务,任务处理完毕后再把线程放回池中,供后面任务继续使用。当池中线程全部处于忙碌状态,没有可用空闲等待进程,此时需选择创建一个新线程并置入池中,或通知任务当前线程池所有线程都在忙,需等待片刻重试。
线程池好处在于线程复用,某个线程处理完一个任务后,可继续处理下一个任务,不用销毁后再创建,可避免无谓的开销,适用于连续产生大量并发任务的场合。
thread_pool.h内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 #ifndef __THREAD_POOL_H #define __THREAD_POOL_H #include <vector> #include <string> #include <pthread.h> using namespace std;class CTask { protected : string m_strTaskName; void * m_ptrData; public : CTask () = default ; CTask (string& taskName) : m_strTaskName (taskName), m_ptrData (NULL ) {}; virtual int Run () = 0 ; void setData (void * data) ; virtual ~CTask () {}; }; class CThreadPool { private : static vector<CTask*> m_vecTaskList; static bool shutdown; int m_iThreadNum; pthread_t * pthread_id; static pthread_mutex_t m_pthreadMutex; static pthread_cond_t m_pthreadCond; protected : static void * ThreadFunc (void * threadData) ; static int MoveToIdle (pthread_t tid) ; static int MoveToBusy (pthread_t tid) ; int Create () ; public : CThreadPool (int threadNum); int AddTask (CTask* task) ; int StopAll () ; int getTaskSize () ; }; void CTask::setData (void * data) { m_ptrData = data; return ; }; vector<CTask*> CThreadPool::m_vecTaskList; bool CThreadPool::shutdown = false ;pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;CThreadPool::CThreadPool (int threadNum) { this ->m_iThreadNum = threadNum; printf ("I will create %d threads.\n" , threadNum); Create (); return ; }; void * CThreadPool::ThreadFunc (void * threadData) { pthread_t tid = pthread_self (); while (1 ) { pthread_mutex_lock (&m_pthreadMutex); while (m_vecTaskList.size () == 0 && !shutdown) pthread_cond_wait (&m_pthreadCond, &m_pthreadMutex); if (shutdown) { pthread_mutex_unlock (&m_pthreadMutex); printf ("[tid: %lu]\texit\n" , pthread_self ()); pthread_exit (NULL ); }; printf ("[tid: %lu]\trun: " , tid); vector<CTask*>::iterator iter = m_vecTaskList.begin (); CTask* task = *iter; if (iter != m_vecTaskList.end ()) { task = *iter; m_vecTaskList.erase (iter); }; pthread_mutex_unlock (&m_pthreadMutex); task->Run (); printf ("[tid: %lu]\tidle\n" , tid); }; return (void *)0 ; }; int CThreadPool::AddTask (CTask* task) { pthread_mutex_lock (&m_pthreadMutex); m_vecTaskList.push_back (task); pthread_mutex_unlock (&m_pthreadMutex); pthread_cond_signal (&m_pthreadCond); return 0 ; }; int CThreadPool::Create () { pthread_id = new pthread_t [m_iThreadNum]; for (int i = 0 ; i < m_iThreadNum; i++) pthread_create (&pthread_id[i], NULL , ThreadFunc, NULL ); return 0 ; }; int CThreadPool::StopAll () { if (shutdown) return -1 ; printf ("Now I will end all threads!\n\n" ); shutdown = true ; pthread_cond_broadcast (&m_pthreadCond); for (int i = 0 ; i < m_iThreadNum; i++) pthread_join (pthread_id[i], NULL ); delete [] pthread_id; pthread_id = NULL ; pthread_mutex_destroy (&m_pthreadMutex); pthread_cond_destroy (&m_pthreadCond); return 0 ; }; int CThreadPool::getTaskSize () { return m_vecTaskList.size (); }; #endif
test.cpp:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <iostream> #include <thread> #include <mutex> #include "thread_pool.h" #include <cstdio> #include <cstdlib> #include <unistd.h> volatile int counter (0 ) ; std::mutex mtx; class CMyTask :public CTask {public : CMyTask () = default ; int Run () { printf ("%s\n" , (char *)m_ptrData); int x = rand () % 4 + 1 ; sleep (x); return 0 ; }; ~CMyTask () {}; }; int main (void ) { CMyTask taskObj; char szTmp[] = "hello!" ; taskObj.setData ((void *)szTmp); CThreadPool threadpool (5 ) ; for (int i = 0 ; i < 10 ; i++) threadpool.AddTask (&taskObj); while (1 ) { printf ("There are still %d tasks need to handle\n" , threadpool.getTaskSize ()); if (threadpool.getTaskSize () == 0 ) if (threadpool.StopAll () == -1 ) { printf ("Thread pool clear, exit.\n" ); exit (0 ); }; sleep (2 ); printf ("2 seconds later..\n" ); }; return 0 ; };
接下来用C++11实现线程池。创建XThread类代表一个线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 #pragma once #include <list> #include <mutex> class XTask ;struct event_base ;class XThread {public : void Start () ; void Main () ; void Notify () ; void Activate (int arg) ; void AddTack (XTask*) ; XThread (); ~XThread (); int id = 0 ; private : event_base* base = 0 ; std::list<XTask*> tasks; std::mutex tasks_mutex; }; #include <thread> #include <iostream> using namespace std;#include <unistd.h> #include "testUtil.h" #include "XThread.h" #include "XTask.h" void XThread::Start (void ) { testout (id << " thread At Start()" ); thread th (&XThread::Main, this ) ; th.detach (); return ; }; void XThread::Main (void ) { cout << id << " thread::Main() begin" << endl; cout << id << " thread::Main() end" << endl; return ; }; void XThread::Activate (int arg) { testout (id << " thread At Activate()" ); XTask* t = NULL ; tasks_mutex.lock (); if (tasks.empty ()) { tasks_mutex.unlock (); return ; }; t = tasks.front (); tasks.pop_front (); tasks_mutex.unlock (); t->Init (arg); return ; }; void XThread::AddTack (XTask* t) { if (!t) return ; t->base = this ->base; tasks_mutex.lock (); tasks.push_back (t); tasks_mutex.unlock (); return ; }; XThread::XThread () {}; XThread::~XThread () {};
然后实现XThreadPool线程池类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 #pragma once #include <vector> class XThread ;class XTask ;class XThreadPool {public : static XThreadPool* Get (void ) { static XThreadPool p; return &p; }; void Init (int threadCount) ; void Dispatch (XTask*, int arg) ; private : int threadCount; int lastThread = -1 ; std::vector<XThread*> threads; XThreadPool (void ) {}; }; #include "XThreadPool.h" #include "XThread.h" #include "XTask.h" #include <thread> #include <iostream> using namespace std;#include "testUtil.h" void XThreadPool::Dispatch (XTask* task, int arg) { testout ("main thread At XThreadPoll::dispathch()" ); if (!task) return ; int tid = (lastThread + 1 ) % threadCount; lastThread = tid; XThread* t = threads[tid]; t->AddTack (task); t->Activate (arg); return ; }; void XThreadPool::Init (int threadCount) { testout ("main thread At XThreadPoll::Init()" ); this ->threadCount = threadCount; this ->lastThread = -1 ; for (int i = 0 ; i < threadCount; i++) { cout << "Create thread" << i << endl; XThread* t = new XThread (); t->id = i; t->Start (); threads.push_back (t); this_thread::sleep_for (chrono::milliseconds (10 )); }; return ; };
然后实现XTask任务类:
1 2 3 4 5 6 7 8 #pragma once class XTask {public : struct event_base * base = 0 ; int thread_id = 0 ; virtual bool Init (int arg) = 0 ; };
然后实现每个自定义任务类的具体内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include "XTask.h" #include <string> using namespace std;class CMyTask :public XTask {public : bool Init (int arg) ; }; #include "mytask.h" bool CMyTask::Init (int arg) { long long i = 0 , c = 0 ; while (c < 10000000 ) { while (i < 1000000000 ) i++; c++; }; printf ("%d---------%lld--------\n" , arg, c); return false ; };
运行主程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 #include "XThreadPool.h" #include "XThread.h" #include "mytask.h" #define XThreadPoolGet XThreadPool::Get() int main (void ) { int i; XThreadPoolGet->Init (10 ); CMyTask task[10 ]; for (i = 0 ; i < 10 ; i++) XThreadPoolGet->Dispatch (&task[i], i); return 0 ; };