找回密码
 会员注册
查看: 58|回复: 0

浅谈协程

[复制链接]

2万

主题

0

回帖

6万

积分

超级版主

积分
64454
发表于 2024-9-21 03:37:20 | 显示全部楼层 |阅读模式
作者:kylinkzhang,腾讯CSIG后台开发工程师什么是协程我们可以简单的认为:协程就是用户态的线程,但是上下文切换的时机是靠调用方(写代码的开发人员)自身去控制的。同时,协程和用户态线程非常接近,用户态线程之间的切换不需要陷入内核,但部分操作系统中用户态线程的切换需要内核态线程的辅助。下面是一个简单的例子:void A() {   cout << 1 << " ";   cout << 2 << " ";   cout << 3 << " ";}void B() {   cout << "x" << " ";   cout << "y" << " ";   cout << "z" << " ";}int main(void) {  A();  B();}在单线程中,上述函数的输出为:1 2 3 x y z如果我们用 libco 库将上面程序改造一下:void A() {   cout << 1 << " ";   cout << 2 << " ";   co_yield_ct();  // 切出到主协程   cout << 3 << " ";}void B() {   cout << "x" << " ";   co_yield_ct();  // 切出到主协程   cout << "y" << " ";   cout << "z" << " ";}int main(void) {  ...  // 主协程  co_resume(A);  // 启动协程 A  co_resume(B);  // 启动协程 B  co_resume(A);  // 从协程 A 切出处继续执行  co_resume(B);  // 从协程 B 切出处继续执行}同样在单线程中,改造后的程序输出如下:1 2 x 3 y z可以看出,切出操作是由 co_yield_ct() 函数实现的,而协程的启动和恢复是由 co_resume 实现的。函数 A() 和 B() 并不是一个执行完才执行另一个,而是产生了 “交叉执行“ 的效果,这就是通过协程实现的!线程挺好的,我们为什么需要协程呢?因为有些时候我们在执行一些操作(尤其是IO操作)时,不希望去做“创建一个新的线程”这种重量级的操作来异步处理。而是希望:在当前线程执行中,暂时切换到其他任务中执行,同时在IO真正准备好了之后,再切换回来继续执行!相比于多开一个线程来操作,使用协程的好处:减少了线程的重复高频创建;尽量避免线程的阻塞;提升代码的可维护与可理解性(毕竟不需要考虑多线程那一套东西了);更多协程的好处:https://www.zhihu.com/question/20511233同时,下面是一些协程的特点:协程可以主动让出 CPU 时间片;(注意:不是当前线程让出 CPU 时间片,而是线程内的某个协程让出时间片供同线程内其他协程运行;)协程可以恢复 CPU 上下文;当另一个协程继续执行时,其需要恢复 CPU 上下文环境;协程有个管理者,管理者可以选择一个协程来运行,其他协程要么阻塞,要么ready,或者died;运行中的协程将占有当前线程的所有计算资源;协程天生有栈属性,而且是 lock free;不理解这些协程特点也不要紧,下文都会讲到。补充:线程上下文下图中展示了线程在运行过程 CPU 需要的一些信息(CPU Context,CPU 上下文),比如通用寄存器、栈信息(EBP/ESP)等,进程/线程切换时需要保存与恢复这些信息。而进程/内核态线程切换的时候需要与OS内核进行交互,保存/读取 CPU 上下文信息。线程时间消耗分析内核态(Kernel)的一些数据是共享的,读写时需要同步机制,所以操作一旦陷入内核态就会消耗更多的时间。进程需要与操作系统中所有其他进程进行资源争抢,且操作系统中资源的锁是全局的;线程之间的数据一般在进程内共享,所以线程间资源共享相比如进程而言要轻一些。虽然很多操作系统(比如 Linux)进程与线程区别不是非常明显,但线程还是比进程要轻。线程的切换(Context Switch)相比于其他操作而言并不是非常耗时,如下图所示(2018年):参考这篇 Linux线程相关文章,Linux 2.6 之后 Linux 多线程的性能提高了很多,大部分场景下线程切换耗时在 2us 左右;下面是 Linux 下线程切换耗时统计(2013 年):正常情况下线程可用的 CPU 时间片都在数十毫秒级别,而线程切换占总耗时的千分之几以内,协程的使用可以将这个损耗进一步降低(主要是去除了其他操作,比如 futex 等)。线程内存消耗分析不是所有编程语言或者系统都支持一次创建很多线程。例如,在 x32 系统中即使使用了虚内存空间,因为进程能访问的虚内存空间大概是 3GB,所以单进程最多创建 300 多条线程(假设系统为每条线程分配 10M 栈空间),太多线程甚至还伴随着由于线程切换而触发缺页中断的风险。如果我们创建很多线程(比如 x64 系统下创建 1 万个线程),不考虑优先级且假设 CPU 有 10 个核心,那么每个线程每秒有 1ms 的时间片,整个业务的耗时大概是: (n-1) * 1 + n * 0.001(n−1)∗1+n∗0.001秒;(n 是线程在处理业务的过程中被调度的次数);如果大量线程之间存在资源竞争,那么系统行为将难以预测。所以在有限的资源下创建大量线程是极其不合理的,服务线程的个数和 CPU 核心数应该在一个合理的比例内。操作系统线程调度可参考:https://help.perforce.com/sourcepro/current/HTML/index.html#page/SourcePro_Core/threadsug-ThreadPackage.22.118.html在默认情况下,Linux 系统给每条线程分配的栈空间最大是 6~8MB,这个大小是上限,也是虚内存空间,并不是每条线程真实的栈使用情况。线程真实栈内存使用会随着线程执行而变化,如果线程只使用了少量局部变量,那么真实线程栈可能只有几十个字节的大小;系统在维护线程时需要分配额外的空间,所以线程数的增加还是会提高内存资源的消耗。通过上面的分析我们可以知道:如果业务处理时间远小于 IO 耗时,线程切换非常频繁,那么使用协程是不错的选择;并且,协程的优势并不仅仅是减少线程之间切换,从编程的角度来看,协程的引入简化了异步编程;同时,协程为一些异步编程提供了无锁的解决方案,即:协程可以用同步编程的方式实现异步编程才能实现的功能。如何保存上下文很多地方把协程称为 Subroutine;Subroutine是什么?就是函数!上古时期的计算机科学家们早就给出了概念:Coroutine就是可以中断并恢复执行的Subroutine。因此从这个角度来看协程拥有调用栈并不是一个奇怪的事情。再来思考,Coroutine与Subroutine相比有什么区别?区别仅有一个就是:Coroutine可以中断并恢复,对应的操作就是 yield/resume。这样看来Subroutine不过是Coroutine的一个子集罢了,也就是说把协程当做一个特殊的函数调用:可以中断并恢复既然可以把 Coroutine 当做一个特殊的函数调用,那么如何像切换函数一样去切换Coroutine呢?难点在于:除了像函数一样切换出去,还要在某种条件满足的时候切换回来。通常的做法是:在协程内部存储自身的上下文,并在需要切换的时候把上下文切换;我们知道上下文其实本质上就是寄存器,所以保存上下文实际上就是把寄存器的值保存下来。相对应的,有下面几种方法:使用 setjmp/longjmp;使用汇编保存寄存器中的值,libco就使用了这种方法;使用 ucontext.h 这个封装好的库也可以帮我们完成上下文的相关工作。关于setjmp.h:https://zh.m.wikipedia.org/zh-hans/Setjmp.h需要注意的是:setjmp/longjmp 一般不能作为协程实现的底层机制,因为 setjmp/longjmp 对栈信息的支持有限。关于 ucontext.h:https://en.wikipedia.org/wiki/Setcontext下面分别来看 setjmp 和 ucontext 方法,至于使用汇编的方法,会在本文讲解有栈协程是讲述。使用setjmp/longjmp下面代码模拟了单线程并发执行两个 while(true){...} 函数:源代码:https://github.com/JasonkayZK/cpp-learn/tree/coroutine/setjmp_demosetjmp_demo/setjmp_demo.cc#include #include #include int max_iteration = 9;int iter;jmp_buf Main;jmp_buf ointPing;jmp_buf ointPong;void ing() {    if (setjmp(PointPing) == 0) longjmp(Main, 1); // 可以理解为重置,reset the world    while (1) {        printf("%3d : ing-", iter);        if (setjmp(PointPing) == 0) longjmp(PointPong, 1);    }}void ong() {    if (setjmp(PointPong) == 0) longjmp(Main, 1);    while (1) {        printf("Pong\n");        iter++;        if (iter > max_iteration) exit(0);        if (setjmp(PointPong) == 0) longjmp(PointPing, 1);    }}int main(int argc, char* argv[]) {    iter = 1;    if (setjmp(Main) == 0) ing();    if (setjmp(Main) == 0) ong();    longjmp(PointPing, 1);}首先,我们定义了三个保存调用栈的节点:jmp_bufMain;jmp_bufPointPing;jmp_bufPointPong;并在main函数中首先创建(启动)了两个函数:Ping、Pong,在使用longjmp(PointPing,1);之后,PointPing不再是0,从而启动了Ping协程。此后,函数Ping和函数Pong在while(1)中交替执行,而不再返回main函数中。最后,当iter>max_iteration时,调用exit(0)退出。通过命令g++-std=c++11setjmp_demo.cc-osetjmp_demo编译后执行./setjmp_demo,输出如下:1ing-Pong2ing-Pong3ing-Pong4ing-Pong5ing-Pong6ing-Pong7ing-Pong8ing-Pong9ing-Pong虽然上面实现了比较简单的函数切换,但是实际上我们无法通过setjmp.h库获取到真正的上下文信息。如果想要真正获取到上下文信息,可以使用ucontext.h库。使用ucontext下面关于ucontext的介绍源自:http://pubs.opengroup.org/onlinepubs/7908799/xsh/ucontext.h.html实际上,ucontextlib已经不推荐使用了,但依旧是不错的协程入门资料。其他底层协程库实现可以查看:Boost.Contexttbox协程库的对比可以参考:https://github.com/tboox/benchbox/wiki/switchlinux系统一般都存在ucontext这个C语言库,这个库主要用于:操控当前线程下的CPU上下文。和setjmp/longjmp不同,ucontext直接提供了设置函数运行时栈的方式(makecontext),避免不同函数栈空间的重叠。需要注意的是:ucontext只操作与当前线程相关的CPU上下文,所以下文中涉及ucontext的上下文均指当前线程的上下文;(一般CPU有多个核心,一个线程在某一时刻只能使用其中一个,所以ucontext只涉及一个与当前线程相关的CPU核心)ucontext.h头文件中定义了ucontext_t这个结构体,这个结构体中至少包含以下成员:ucontext_t *uc_link     // next contextsigset_t    uc_sigmask  // 阻塞信号阻塞stack_t     uc_stack    // 当前上下文所使用的栈mcontext_t  uc_mcontext // 实际保存 CPU 上下文的变量,这个变量与平台&机器相关,最好不要访问这个变量可移植的程序最好不要读取与修改ucontext_t中的uc_mcontext,因为不同平台下uc_mcontext的实现是不同的。同时,ucontext.h头文件中定义了四个函数,下面分别介绍:int  getcontext(ucontext_t *); // 获得当前 CPU 上下文int  setcontext(const ucontext_t *);// 重置当前 CPU 上下文void makecontext(ucontext_t *, (void *)(), int, ...); // 修改上下文信息,比如设置栈指针int  swapcontext(ucontext_t *, const ucontext_t *);下面分别来看。getcontext#include int getcontext(uconte t_t *ucp);getcontext`函数使用当前CPU上下文初始化ucp所指向的结构体,初始化的内容包括:CPU寄存器、信号mask和当前线程所使用的栈空间;返回值:getcontext成功返回0,失败返回-1。setcontext#include int setcontext(ucontext_t *ucp);和getcontext函数类似,setcontext函数用于:设置CPU寄存器、信号mask和当前线程所使用的栈空间。需要特别注意的是:如果函数setcontext执行成功,那么调用setcontext的函数将不会返回,因为当前CPU的上下文已经交给其他函数或者过程了,当前函数完全放弃了对CPU的“所有权”。getcontext和setcontext的应用:当信号处理函数需要执行的时候,当前线程的上下文需要保存起来,随后进入信号处理阶段;makecontext#include void makecontext(ucontext_t *ucp, (void *func)(), int argc, ...);makecontext修改由getcontext创建的上下文ucp;如果ucp指向的上下文由swapcontext或setcontext恢复,那么当前线程将执行传递给makecontext的函数func(...)。执行makecontext后需要为新上下文分配一个栈空间,如果不创建,那么新函数func执行时会使用旧上下文的栈,而这个栈可能已经不存在了。同时,argc必须和func中整型参数的个数相等。swapcontext#include int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);swapcontext将当前上下文信息保存到oucp中并使用ucp重置CPU上下文。返回值:成功则返回0;失败返回-1并置errno;如果ucp所指向的上下文没有足够的栈空间以执行余下的过程,swapcontext将返回-1。总结相比于setjml略微简单的功能,使用ucontext我们可以方便的获取当前调用函数的上下文,进而实现协程。协程的类别协程的实现不只有一种,很多活跃的语言如Python、Java、Golang等都是支持协程的。尽管这些协程可能名称不同,甚至用法也不同,但它们都可以被划分为两大类:有栈(stackful)协程,这类协程的实现类似于内核态线程的实现,不同协程间切换还是要切换对应的栈上下文,只是不用陷入内核而已;例如:goroutine、libco;无栈(stackless)协程,无栈协程的上下文都会放到公共内存中,在协程切换时使用状态机来切换,而不用切换对应的上下文(因为都已经在堆中了),因此相比有栈协程要轻量许多;例如:C++20、Rust、JavaScript中的协程。这里所谓的有栈、无栈:并不是说这个协程运行的时候有没有栈,而是说协程之间是否存在调用栈(CallbackStack);同时,根据协程之间是否有明显的调用关系,我们又可以把协程分为:非对称协程:协程之间有明显的调用关系;对称协程:协程之间无明显的调用关系。例如,协程A调用了协程B:如果只有B完成之后才能调用A,那么此时A/B是非对称协程;如果A/B被调用的概率相同,那么此时A/B是对称协程;下面我们分别来看。有栈协程开源库libco就是通过汇编语言实现的有栈协程库,我们来看一看libco中对于32位机器的上下文切换操作是如何完成的:通过分析代码看到,无论是co_yield_ct还是co_resume,在协程切出和恢复时,都调用了同一个函数co_swap,在这个函数中调用了coctx_swap来实现协程的切换,这一函数的原型是:void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap");两个参数都是coctx_t*指针类型,其中第一个参数表示要切出的协程,第二个参数表示切出后要进入的协程。coctx_swap函数便是用汇编实现的,我们这里只关注x86-64相关的部分,其代码如下:coctx_swap:leaq8(%rsp),%raxleaq112(%rdi),%rsppushq%raxpushq%rbxpushq%rcxpushq%rdxpushq-8(%rax)//retfuncaddrpushq%rsipushq%rdipushq%rbppushq%r8pushq%r9pushq%r12pushq%r13pushq%r14pushq%r15movq%rsi,%rsppopq%r15popq%r14popq%r13popq%r12popq%r9popq%r8popq%rbppopq%rdipopq%rsipopq%rax//retfuncaddrpopq%rdxpopq%rcxpopq%rbxpopq%rsppushq%raxxorl%eax,%eaxret可以看出,coctx_swap中并未像常规被调用函数一样创立新的栈帧。先看前两条语句:leaq8(%rsp),%raxleaq112(%rdi),%rspleaq用于把其第一个参数的值赋值给第二个寄存器参数,而第一条语句用来把8(%rsp)的本身的值存入到%rax中。注意:这里使用的并不是8(%rsp)指向的值,而是把8(%rsp)表示的地址赋值给了%rax,这一地址是父函数栈帧中除返回地址外栈帧顶的位置。在第二条语句leaq112(%rdi),%rsp中,%rdi存放的是coctx_swap第一个参数的值,这一参数是指向coctx_t类型的指针,表示当前要切出的协程,这一类型的定义如下:struct coctx_t {    void *regs[ 14 ];     size_t ss_size;    char *ss_sp;};因而112(%rdi)表示的就是第一个协程的coctx_t中regs[14]数组的下一个64位地址,而接下来的语句:pushq%raxpushq%rbxpushq%rcxpushq%rdxpushq-8(%rax)//retfuncaddrpushq%rsipushq%rdipushq%rbppushq%r8pushq%r9pushq%r12pushq%r13pushq%r14pushq%r15第一条语句pushq%rax用于把%rax的值放入到regs[13]中,resg[13]用来存储第一个协程的%rsp的值。这时%rax中的值是第一个协程coctx_swap父函数栈帧除返回地址外栈帧顶的地址。由于regs[]中有单独的元素存储返回地址,栈中再保存返回地址是无意义的,因而把父栈帧中除返回地址外的栈帧顶作为要保存的%rsp值是合理的;当协程恢复时,把保存的regs[13]的值赋值给%rsp即可恢复本协程coctx_swap父函数堆栈指针的位置。第一条语句之后的语句就是用pushq把各CPU寄存器的值依次从regs尾部向前压入。即通过调整%rsp把regs[14]当作堆栈,然后利用pushq把寄存器的值和返回地址存储到regs[14]整个数组中。并且,regs[14]数组中各元素与其要存储的寄存器对应关系如下://-------------// 64 bit//low | regs[0]: r15 |//    | regs[1]: r14 |//    | regs[2]: r13 |//    | regs[3]: r12 |//    | regs[4]: r9  |//    | regs[5]: r8  | //    | regs[6]: rbp |//    | regs[7]: rdi |//    | regs[8]: rsi |//    | regs[9]: ret |  //ret func addr, 对应 rax//    | regs[10]: rdx |//    | regs[11]: rcx | //    | regs[12]: rbx |//hig | regs[13]: rsp |接下来的汇编语句:movq%rsi,%rsppopq%r15popq%r14popq%r13popq%r12popq%r9popq%r8popq%rbppopq%rdipopq%rsipopq%rax//retfuncaddrpopq%rdxpopq%rcxpopq%rbxpopq%rsp这里用的方法还是通过改变%rsp的值,把某块内存当作栈来使用。第一句movq%rsi,%rsp就是让%rsp指向coctx_swap第二个参数,这一参数表示要进入的协程。而第二个参数也是coctx_t类型的指针,即执行完movq语句后,%rsp指向了第二个参数coctx_t中regs[0],而之后的pop语句就是用regs[0-13]中的值填充cpu的寄存器,这里需要注意的是popq会使得%rsp的值增加而不是减少,这一点保证了会从regs[0]到regs[13]依次弹出到cpu寄存器中。在执行完最后一句popq%rsp后,%rsp已经指向了新协程要恢复的栈指针(即新协程之前调用coctx_swap时父函数的栈帧顶指针),由于每个协程都有一个自己的栈空间,可以认为这一语句使得%rsp指向了要进入协程的栈空间。coctx_swap中最后三条语句如下:pushq%raxxorl%eax,%eaxretpushq%rax用来把%rax的值压入到新协程的栈中,这时%rax是要进入的目标协程的返回地址,即要恢复的执行点;然后用xorl把%rax低32位清0以实现地址对齐;最后ret语句用来弹出栈的内容,并跳转到弹出的内容表示的地址处,而弹出的内容正好是上面pushq%rax时压入的%rax的值,即之前保存的此协程的返回地址。即最后这三条语句实现了转移到新协程返回地址处执行,从而完成了两个协程的切换。可以看出,这里通过调整%rsp的值来恢复新协程的栈,并利用了ret语句来实现修改指令寄存器%rip的目的,通过修改%rip来实现程序运行逻辑跳转。注意:%rip的值不能直接修改,只能通过call或ret之类的指令来间接修改;整体上看来,协程的切换其实就是:cpu寄存器内容特别是%rip和%rsp的写入和恢复,因为cpu的寄存器决定了程序从哪里执行(%rip)和使用哪个地址作为堆栈(%rsp)。寄存器的写入和恢复如下图所示:执行完上图的流程,就将之前cpu寄存器的值保存到了协程A的regs[14]中,而将协程Bregs[14]的内容写入到了寄存器中,从而使执行逻辑跳转到了B协程regs[14]中保存的返回地址处开始执行,即实现了协程的切换(从A协程切换到了B协程执行)。详细关于libco的实现细节:https://www.zhihu.com/question/52193579https://zhuanlan.zhihu.com/p/27409164https://github.com/yyrdl/libco-code-study无栈协程无栈协程的本质就是一个状态机(statemachine),它可以理解为在另一个角度去看问题,即:同一协程协程的切换本质不过是指令指针寄存器的改变。首先,我们来看一个使用libco的协程的例子(当然libco是一个有栈协程):void* test(void* para){ co_enable_hook_sys(); int i = 0; poll(0, 0, 0. 1000); // 协程切换执行权,1000ms后返回 i++; poll(0, 0, 0. 1000); // 协程切换执行权,1000ms后返回 i--; return 0;}int main(){ stCoRoutine_t* routine; co_create(&routine, NULL, test, 0); // 创建一个协程 co_resume(routine);  co_eventloop( co_get_epoll_ct(),0,0 ); return 0;}这段代码实际的意义就是:主协程跑一个协程去执行test函数,在test中我们需要两次从协程中切换出去,这里对应了两个poll操作(hook机制),hook后的poll所做的事情就是把当前协程的CPU执行权切换到调用栈的上一层,并在超时或注册的fd就绪时返回(当然样例这里就只是超时了)。如果是无栈协程,实现相同逻辑的代码是怎么样的呢?其实就是翻译成类似于以下状态机的代码:class test_coroutine {    int i;    int __state = 0;    void MoveNext() {        switch(__state) {        case 0:            return frist();        case 1:            return second();        case 2:         return third();        }    }    void frist() {        i = 0;        __state = 1;    }    void second() {        i++;        _state = 2;    }    void third() {     i--;    }};我们可以看到:相比与有栈协程中的test函数,这里把整个协程抽象成一个类,以原本需要执行切换的语句处为界限,把函数划分为几个部分,并在某一个部分执行完以后进行状态转移,在下一次调用此函数的时候就会执行下一部分。这样的话我们就完全没有必要像有栈协程那样显式的执行上下文切换了,我们只需要一个简易的调度器来调度这些函数即可。在Rust中,async也是一个语法糖,实际上编译后就是实现了类似于上面的代码结构,感兴趣的可以去看《asyncbook》。从执行时栈的角度来看:其实所有的协程共用的都是一个栈,即系统栈,也就也不必我们自行去给协程分配栈,因为是函数调用,我们当然也不必去显示的保存寄存器的值。而且相比有栈协程把局部变量放在新开的空间上,无栈协程直接使用系统栈使得CPUcache局部性更好,同时也使得无栈协程的中断和函数返回几乎没有区别,这样也可以凸显出无栈协程的高效。对称协程与非对称协程前文中也简单提到了对称和非对称协程,这里也简单聊一下吧。其实对于“对称”这个名词,阐述的实际是:协程之间的关系。用大白话来说就是:对称协程就是说协程之间人人平等,没有谁调用谁一说,大家都是一样的,而非对称协程就是协程之间存在明显的调用关系。简单来说就是这样:对称协程SymmetricCoroutine:任何一个协程都是相互独立且平等的,调度权可以在任意协程之间转移;非对称协程AsymmetricCoroutine:协程出让调度权的目标只能是它的调用者,即协程之间存在调用和被调用关系;其实两者的实现我觉得其实差异不大,非对称协程其实就是拥有调用栈,而非对称协程则是大家都平等,不需要调用栈,只需要一个数据结构存储所有未执行完的协程即可。至于哪种更优?这个需要分情况:如果你使用协程的目的是为了优化一些IO密集型应用,那么协程切换出去的时候就是它等待事件到来的时候,此时你就算切换过去也没有什么意义,还不如等到事件到来的时候自动切换回去。其实上面说的是有一些问题,因为这个执行权的切换实际上是(调用者–被调用者)之间的切换,对称就是它们之间都是平等的,就是假如A协程执行了B,C协程,那么B协程可以切换回A,也可以切换回C;而非对称只能是B切换回A,A切换回C,C再切换回A,以此类推。这样看起来显然非对称协程相比之下更为符合我们的认知,因为对称协程目前我不知道如何选择一个合适的协程来获得CPU执行权,正如上面所说,此协程可能正在等待事件;当然如果调度算法足够优秀的话,对称协程也是可取的。关于协程的一些其他内容N:1&N:M协程我们知道,和线程绑定的协程只有在对应线程运行的时候才有被执行的可能,如果对应线程中的某一个协程完全占有了当前线程,那么当前线程中的其他所有协程都不会被执行。同时,协程的所有信息都保存在上下文(Contex)对象中,将不同上下文分发给不同的线程就可以实现协程的跨线程执行,如此,协程被阻塞的概率将减小。因此,借用BRPC中对N:M协程的介绍,来解释下什么是N:M协程。我们常说的协程通常指的是N:1线程库,即所有的协程运行于一个系统线程中,计算能力和各类eventloop库等价;由于不跨线程,协程之间的切换不需要系统调用,可以非常快(100ns-200ns),受cache一致性的影响也小;但代价是协程无法高效地利用多核,代码必须非阻塞,否则所有的协程都被卡住……bthread是一个M:N线程库,一个bthread被卡住不会影响其他bthread。其中的关键技术有两点:workstealing调度;butex;前者让bthread更快地被调度到更多的核心上,后者让bthread和pthread可以相互等待和唤醒,这两点协程都不需要;更多brpc的线程见:https://github.com/apache/incubator-brpc/blob/master/docs/cn/threading_overview.md这么看来貌似bthread自己实现了golang的goroutine?表面看起来的却如此:两者都实现了M:N用户态线程。但是事实上,golang中的goroutine的实现要更为复杂一些:bthread的设计比较接近go1.0版本:OS线程不会动态增加,在有大量的阻塞性syscall下,会有影响。而go1.1之后的设计就是动态增减OS线程,而且提供了LockOSThread,可以让goroutine和OS线程1:1。关于这个问题,见:https://www.zhihu.com/question/65549422协程的组成通过上面的描述,N:M模式下的协程其实就是可用户确定调度顺序的用户态线程,与系统级线程对照可以将协程框架分为以下几个模块:协程上下文:对应操作系统中的PCB/TCB(Process/ThreadControlBlock);保存协程上下文的容器:对应操作系统中保存PCB/TCB的容器,一般是一个列表(在实际实现时,协程上下文容器可以使用一个也可以使用多个,比如:普通协程队列、定时的协程优先队列等);协程的执行器:协程的调度器,对应操作系统中的进程/线程调度器;执行协程的worker线程,对应实际线程/进程所使用的CPU核心;协程的调度协程的调度与OS线程调度十分相似,如下图协程调度示例所示:协程相关工具系统级线程有锁(mutex)、条件变量(condition)等工具,协程也有对应的工具;比如:libgo提供了协程之间使用的锁Co_mutex/Co_rwmutex。不同协程框架对工具的支持程度不同,实现方式也不尽相同;对此问题,本文不做深入介绍。系统级线程和协程处于不同的系统层级,所以两者的同步工具不完全通用,如果在协程中使用了线程的锁(例如:std::mutex),则整个线程将会被阻塞,当前线程将不会再调度与执行其他协程。最简单的例子:如果在一个协程中使用了sleep,那么这个线程下的所有协程全部都会被阻塞。****在使用协程时,这种方法是非常低效的。协程&线程的对比协程对CPU/IO的影响协程的目的在于剔除线程的阻塞,尽可能提高CPU的利用率。很多服务在处理业务时需要请求第三方服务,向第三方服务发起RPC调用;RPC调用的网络耗时一般耗时在毫秒级别,RPC服务的处理耗时也可能在毫秒级别,如果当前服务使用同步调用,即RPC返回后才进行后续逻辑,那么一条线程每秒处理的业务数量是可以估算的。假设每次业务处理花费在RPC调用上的耗时是20ms,那么一条线程一秒最多处理50次请求。如果在等待RPC返回时当前线程没有被系统调度转换为Ready状态,那当前CPU核心就会空转,浪费了CPU资源!通过增加线程数量提高系统吞吐量的效果非常有限,而且创建大量线程也会造成其他问题。协程虽然不一定能减少一次业务请求的耗时,但一定可以提升系统的吞吐量:当前业务只有一次第三方RPC的调用,那么协程不会减少业务处理的耗时,但可以提升QPS;当前业务需要多个第三方RPC调用,同时创建多个协程可以让多个RPC调用一起执行,则当前业务的RPC耗时由耗时最长的RPC调用决定。C++20标准中的协程虽然C++20标准中引入了协程,但是C++20只引入了协程需要的底层支持,所以直接使用相对比较难,不过很多库已经提供了封装,比如:ASIO;cppcoro;需要说明的是:C++20协程的性能还是非常高的,等C++23提供简化后的lib,我们就可以非常方便地使用协程了。就目前而言,编译协程相关代码需要g++10或者更高版本(clang++12对协程支持有限):可以通过下面的命令安装:Mac:brewinstallgcc@10;Ubuntu:aptinstallgcc-10/aptinstallg++-10;下面我写了一个使用C++20标准中协程的例子:cpp20_demo/cpp_20_demo.cc#include #include struct HelloCoroutine {    struct HelloPromise {        HelloCoroutine get_return_object() {            return std::coroutine_handle::from_promise(*this);        }        std::suspend_never initial_suspend() { return {}; }        std::suspend_always final_suspend() noexcept { return {}; }        void unhandled_exception() {}    };    using promise_type = HelloPromise;    HelloCoroutine(std::coroutine_handle h) : handle(h) {}    std::coroutine_handle handle;};HelloCoroutine hello() {    std::cout << "Hello " << std::endl;    co_await std::suspend_always{};    std::cout << "world!" << std::endl;}int main() {    HelloCoroutine coro = hello();    std::cout << "calling resume" << std::endl;    coro.handle.resume();    std::cout << "destroy" << std::endl;    coro.handle.destroy();    return 0;}编译执行后输出:Hello calling resumeworld!destroy由于篇幅有限,这里不再详述C++20标准中的协程使用了;如果想更深入的学习,可以参考:《C++20 - The Complete Guide》——Nicolai M. Josuttis,作者官网值得一看;如何编写 C++ 20 协程(Coroutines)协程入门https://cloud.tencent.com/developer/article/1882417https://en.cppreference.com/w/cpp/language/coroutineshttps://www.scs.stanford.edu/~dm/blog/c++-coroutines.html动手实现协程上面文章的内容基本上已经把整个协程介绍的七七八八了。看了这么多内容,你是不是心动想要自己动手写一个协程库了呢?那么,跟随下面的内容,一起使用C++实现协程吧。是的,有栈协程、无栈协程都会实现一遍。基于汇编实现的有栈协程首先我们来使用汇编来实现一个有栈协程,这里参考的是微信开源的 libco;源代码:https://github.com/JasonkayZK/cpp-learn/tree/coroutine/stack_co协程环境本例中实现的协程不支持跨线程,而是每个线程分配一个环境,来维护该线程下运行中的协程之间的层次关系,代码如下:stack_co/environment.h#ifndef COROUTINE_ENVIRONMENT_H#define COROUTINE_ENVIRONMENT_H#include "coroutine.h"#include #include #include #include namespace stack_co {    class Coroutine;    class Environment {        friend class Coroutine;    public:        // Thread-local instance        static Environment &instance();        // Factory method        template        std::shared_ptr create_coroutine(Entry &entry, Args &...arguments);        // No copy constructor        Environment(const Environment &) = delete;        // No Assignment Operator        Environment &operator=(const Environment &) = delete;        // Get current coroutine in the stack        Coroutine *current();    private:        // No explicit constructor        Environment();        void push(std::shared_ptr coroutine);        void pop();    private:        // Coroutine calling stack        std::array, 1024> _c_stack;        // Top of the coroutine calling stack        size_t _c_stack_top;        // Main coroutine(root)        std::shared_ptr _main;    };    // A default factory method    template    inline std::shared_ptr Environment::create_coroutine(Entry &entry, Args &...arguments) {        return std::make_shared(                this, std::forward(entry), std::forward<< 0;        constexpr static Bitmask IDLE = 1 << 1;        constexpr static Bitmask RUNNING = 1 << 2;        constexpr static Bitmask EXIT = 1 << 3;        Bitmask operator&(Bitmask mask) const { return flag & mask; }        Bitmask operator|(Bitmask mask) const { return flag | mask; }        Bitmask operator^(Bitmask mask) const { return flag ^ mask; }        void operator&=(Bitmask mask) { flag &= mask; }        void operator|=(Bitmask mask) { flag |= mask; }        void operator^=(Bitmask mask) { flag ^= mask; }        Bitmask flag;    };} // namespace stack_co#endif //COROUTINE_STATUS_H协程相关的状态主要包括了下面几类:MAIN:仅作为协程入口调用栈的标记;IDLE:空闲状态;RUNNING:执行中;EXIT:线程退出。并重载了一些运算符。协程实例协程的实例主要是用于支持接口 resume 和 yield;代码如下:stack_co/coroutine.h#ifndef COROUTINE_COROUTINE_H#define COROUTINE_COROUTINE_H#include "status.h"#include "context.h"#include << 17;        constexpr static size_t rdi = 7;        constexpr static size_t rsi = 8;        constexpr static size_t ret = 9;        constexpr static size_t rsp = 13;    public:        void prepare(callback ret, word rdi);        void switch_from(context *previous);        bool test();    private:        word get_stack_pointer();        void fill_registers(word sp, callback ret, word rdi, ...);    private:        /**         * we must ensure that registers are at the top of the memory layout.         *         * so the context must have no virtual method, and len at least 14!         */        word _registers[14];        char _stack[stack_size];    };} // namespace stack_co#endif //coroutine_context_h对应的><< "running code in a "              << (stack_co::test() ? "coroutine" : "thread")              << std::endl;}void print1() {    std::cout << 1 << std::endl;    stack_co::this_coroutine::yield();    std::cout << 2 << std::endl;}void print2(int i, stack_co::Coroutine *co1) {    std::cout << i << std::endl;    co1-><< "bye" << std::endl;}int main() {    auto &env = stack_co:pen();    auto co1 = env.create_coroutine(print1);    auto co2 = env.create_coroutine(print2, 3, co1.get());    co1-><< 32);        auto *s = (Schedule *) ptr;        int id = s->
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2024-12-28 08:01 , Processed in 1.143800 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表