找回密码
 会员注册
查看: 19|回复: 0

PHP协程多任务调度实践

[复制链接]

2万

主题

0

回帖

6万

积分

超级版主

积分
68169
发表于 2024-10-9 17:20:32 | 显示全部楼层 |阅读模式
PHP协程多任务调度实践 PHP协程多任务调度实践 卢阳、郭润和 贝壳产品技术 贝壳产品技术 “贝壳产品技术公众号”作为贝壳官方产品技术号,致力打造贝壳产品、技术干货分享平台,面向互联网/O2O开发/产品从业者,每周推送优质产品技术文章、技术沙龙活动及招聘信息等。欢迎大家关注我们。 242篇内容 2021年03月19日 14:10 1、背景随着贝壳的业务扩大和用户数量的增长,对我们现有的PHP生态架构体系的并发能力提出了更高的要求。和大多互联网公司一样,我们使用是微服务架构,面临的大多是IO密集型问题,通过多种平台、中台和服务聚合出结构化的信息。设想用户打开App如何在较短的时间内展示出大量相关的信息,这背后的会涉及到3层以上的调用链路和几十个微服务的访问。如何解决这种大量的IO密集型请求的效率,对于下游依赖的微服务RPC并发性能的提升,对于数据库MySQL能不能做并发访问,这一些问题成为本文阐述的切入点。PHP语言默认不支持多线程,确切需要使用多线程可以安装pthread扩展,同时会引入线程安全等问题。当然也可以使用多进程模拟多线程来处理密集型IO场景,但这种代价过高。于是参考golang的轻量级的协程,其比较适合处理这种场景,为了解决这种场景,能否用对PHP协程的友好封装,从而给出一个让开发者易于使用的composer包?该包应该封装了PHP协程的复杂调用和切换过程,让开发者轻松的实现RPC,MySQL并发访问等IO密集型的并发能力。2、解决方案PHP5.5加入了对迭代生成器和协程的支持,协程拥有很强大的功能但是非常复杂,这么多年过去了,大家对于协程的关注还是很少。本文想探索一种PHP版本的协程处理多任务组件。该篇文章是基于鸟哥的博客--《在PHP中使用协程实现多任务调度》(https://www.laruence.com/2015/05/28/3038.html),对实际项目中的一些高并发的场景进行一些探索性的尝试。鸟哥的这篇文章中核心输出了一个Task Scheduler 协程模型,基于这个模型可以组合出多种协程模式,去解决我们的两个场景:· 场景一:并发调用RPC· 场景二:并发调用MySQL解决RPC场景是为了解决大部分微服务对多个下游调用的情况,解决MySQL的场景是为了解决一些接口数据需要对多张表中的数据做聚合的情况。然而,这里不讨论对Redis的并发场景,Redis的访问应该是一个特别快的操作且可以pipeline等方式来优化IO的读写操作。3、具体实现3.1 Task Scheduler 模型3.1.1 Task Scheduler 模型介绍在鸟哥的这篇文章中,使用PHP Generator 和 yield关键字实现了一个Task类和一个Scheduler类,本文实现了一个Executor类,来封装调用Scheduler的入口,Scheduler类中存储了一个队列负责存储Task类的对象,一旦发现Task执行完成则可以出该队列,否则继续放回到队列末尾。如此,可以用协程来做IO密集型的并发请求批量发出而不必同步等待结果,而是在下一轮协程执行到的时候去获取结果。Generator类介绍:生成器提供了一种更容易的方法来实现简单的对象迭代,相比较定义类实现 Iterator 接口的方式,性能开销和复杂性大大降低。Generator::current:返回当前产生的值Generator::next:生成器继续执行Generator::send:向生成器中传入一个值yield关键字:实现任务调度器的关键是yield关键字,它最简单的调用形式看起来像一个return申明,不同之处在于普通return会返回值并终止函数的执行,而yield会返回一个值给循环调用此生成器的代码并且只是暂停执行生成器函数。除此之外,yield还有以下能力:· yield提供了一种任务自身中断的能力,可将控制权交给任务调度器;· yield类似于return的声明,也是将子任务的执行结果返回给了任务调度器;· yield通过迭代器的send()方法,可实现调度器像任务发送数据;基于以上三个能力我们就可以实现一个任务调度器:· 通过自身中断结束子任务运行;· 通过yield和迭代器的send()方法实现任务和调度器之间的通信;3.1.2 Task Scheduler 模型实现图:Task Scheduler 模型结构由上图可以看出,在调度器Scheduler中,通过一个队列TaskQueue来作为任务的容器,通过run()方法来循环调度任务。具体流程如下:1)判断taskQueue是否为空,为空退出,不为空执行2;2)调用任务的run方法,执行该任务;3)判断任务是否结束,已结束,从任务集合中删除该任务。未结束将该任务加入到TaskQueue队尾。继续执行1、2。代码如下:publicfunctionrun(){while(!$this->taskQueue->isEmpty()){$task=$this->taskQueue->dequeue();$task->run();if($task->isFinished()){unset($this->taskMap[$task->getTaskId()]);}else{$this->schedule($task);}}}//下面是Task的run()方法:publicfunctionrun(){if($this->beforeFirstYield){$this->beforeFirstYield=false;return$this->coroutine->current();}else{$retval=$this->coroutine->send($this->sendValue);$this->sendValue=null;return$retval;}}该方法通过迭代器的current()方法和send()方法来启动任务。最后,还需要一个执行器,负责初始化调度器并启动调度器。代码如下:publicfunctionrun($funcList){if(empty($funcList)){return[];}$scheduler=newScheduler;foreach($funcListas$func){$scheduler->newTask($func());}$scheduler->run($result);}funcList为任务函数闭包列表。该函数做三件事:首先,创建调度器;其次,填充任务到TaskQueue;最后,启动调度器;上面描述的任务(Task)、调度器(Scheduler)、执行器(Executor)组成了多任务调度模型;3.1.3 Task Scheduler 模型使用下面我们就来实际看一个例子funcList=[];for($i=0;$irun($funcList);该例子很简单,就是创建一个执行器,并将实现填充的匿名函数数组作为参数调用run()方法。匿名函数做的事情非常简单:子任务一:生成0~100的随机数并赋值给$a;子任务二:生成0~100的随机数并赋值给$b;子任务三:将$a、$b 做求和操作;可以由代码中看出以上三个步骤之间都由关键字yield隔开了。现在我们看一下执行结果:图:Task Scheduler 模型子任务执行结果于匿名函数的实现中有yield关键字,当任务启动后,任务将以yield作为分界线拆解子任务,并执行该任务。按照顺序执行10个任务中的任务一、任务二、任务三。以上就是任务调度的全部内容和调用例子示例。3.2 RPC的并发实现3.2.1 设计思路有了Task Scheduler 模型,可以来调度项目中的RPC请求,让请求异步发出,然后统一接收接收结果,如此实现了RPC的并发。3.2.2 具体实现首先,项目中的RPC请求是基于GuzzleHttp client,可以完成同步和异步的请求。然而,GuzzleHttp client 的请求时基于promise,可以看下promise 的三种状态:pending/reslove/reject。pending就是未决,resolve可以理解为成功,reject可以理解为拒绝。图:promise的三种状态在请求发出的时候处于promise处于pending状态,如果未完成则一直处于pending状态,如果已完成则变为resolve状态,如果是完成有错误或异常则转为reject状态。在理解了promise的状态后,我们看下现有系统如何使用promise。如下图:在HttpClient类中的get和post方法中封装了promise 并且将request 和wait连续执行。图:现有系统RPC类图于是可以把query的请求调用栈从最外层一直到最里层拆分为request 和 wait两步。用Task Scheduler 把这两步用协程的方式调用起来,实现非阻塞批量请求,并依次收集返回结果。改造后实现类图如下:图:Task Scheduler模型改造后的RPC类图其中TsHttpClient继承于HttpClient,并实现了以下几个方法:getRequestAsync:GET 请求构建getWait:GET 发送请求并获取结果postRequestAsync:POST请求构建postWait:POST发送请求并获取结果TsCorutineService继承于FundamentalService,要实现并发RPC主要的代码为://执行方法数组$funcList=[];foreach($requestListas$key=>$reqItem){$funcList[]=function()use($key,$reqItem,&$tsRet){$uri=$reqItem['uri'];$query=$reqItem['query'];$data=$reqItem['data'];$httpMethod=$reqItem['httpMethod'];yieldlist($tmpRequest,$options)=$reqItem['serviceObj']->queryRequestAsync($uri,$query,$data,$httpMethod);yield$result=$reqItem['serviceObj']->queryWait($uri,$query,$data,$httpMethod,$tmpRequest,$options);$tsRet[$key]=$result;};}$exec=newExecutor();$exec->run($funcList);上面的代码出现两个yield,和任务调度中的示例代码一样,通过yeild将RPC调用分割为两个子任务,这样通过使用多任务调度模型,就可以实现并发RPC的调用了。为何需要这么麻烦的去用继承和覆盖原有方法来实现这种协程的并发RPC方式呢?原因是对现有系统改造最小,之前使用同步的方式无需改动。需要使用该流程请求只需TsCoroutineService,并且使用改造后的TsHttpClient,就可实现。而不影响,图:《现有系统RPC类图》中的流程。控制了这种新的RPC方式的风险。3.2.3 实验及结果分析下面我们对以上实现的并发RPC调用和GuzzleHttp的调用进行一个性能对比试验。实验设计:调用测试接口,测试接口中有多个服务(8个)的接口调用,对比响应时间;压测上述接口,对比压测结果;测试结果:使用postman单次调用接口响应时间使用ab压测结果:图:压测结果—使用协程并发图:压测结果—-使用HttpMultiClient结果分析:使用并发RPC方式比使用HttpMultiClient稍慢一些,究其原因为,并发RPC方式调用将一个请求分为两步:1.发送请求;2.获取结果,执行完一个批量中的所有请求的的第一步后,将顺序执行第二步,这时,如果第二步请求的服务还未返回,将阻塞直到该请求返回为止;这样如果遇到一个等待时间较长的请求,这些等待时长都将叠加在后边的每一个请求上;而HttpMultiClient底层使用GuzzleHttp的Promise批量请求,调用requestAsync时,不会直接发送请求,而是加入队列中,当调用send方法时,才会向服务发起请求,而后轮询的检查是否有返回,当其中一个服务有返回就不再等待(多路复用I/O),这样就避免了使用多任务调度器来并发的等待问题。具体图示如下:图:RPC的并发和HttpMultiClient批量请过程求分析基于以上分析,我们可以对RPC的并发方式做进一步的性能提升,可以改造promise的wait方法,可以输出处于pending状态,然后继续进入Task Scheduler的任务的容器中,从而尝试下一个任务的状态;如果当前任务处于resolve状态则可以直接处理无需再放入Task Scheduler的任务的容器中。如此可以进一步提升处理的效率。这里不做实现,留给读者尝试。3.2.4 小结在说明了Task Scheduler模型后,紧接着介绍了如何使用Task Scheduler来实现RPC的并发请求。比对了现有系统中同步的RPC方式和如何使用协程模型改造后的方式,描述了是改造的过程和说明协程使用的原理。最后,与HttpMultiClient并发方式比对了并发的效率,和分析产生效率差异的原因。3.3 Mysql异步并发访问在使用这个协程模型实现了RPC的并发访问后,尝试使用其来解决mysql的并发访问问题。3.3.1 设计思路思路一:mysqli实现的异步并发(以下简称:mysqli方式)。Mysql服务端本身就是一个多线程的服务,mysqli支持异步访问的方式,可以通过Task Scheduler模型来实现这种异步查询,从而达到并发访问mysql的效果。思路二:通过并发RPC方式请求自身服务,调用多组查库接口来实现(以下简称:请求自身服务方式)。3.3.2 mysqli方式具体实现MySQL多线程介绍:MySQL数据库对于每个查询请求都是单独启动一个线程进行处理。如果MySQL服务器启动线程过多,必然会造成线程切换引起系统负载过高。如果在MySQL数据库负载不高的情况下,使用异步查询还是不错的选择。图:同步vs异步访问DB的性能比较上图的异步模式即为我们需要实现的并发访问MySQL的模式。无需赘述,同步访问需要等待完成后才能进行下一个访问,异步则可并发方式发出,随后统一接收返回。为了实现异步的访问,首先需要定义db类:classdb{static$links;private$obj;functiongetConn(){$host='127.0.0.1';$user='root';$password='';$database='test';$port=3306;$this->obj=newmysqli($host,$user,$password,$database,$port);self:links[spl_object_hash($this->obj)]=$this->obj;returnself:links[spl_object_hash($this->obj)];}functionasync_query($sql){$link=$this->getConn();$link->query($sql,MYSQLI_ASYNC);return$link;}functionfetch(){for($i=1;$iobj===$obj){$sql_result=$obj->reap_async_query();return$sql_result;}}}}}这里使用了mysqli扩展,需要安装或打开这个extension。mysqli扩展封装的功能强大,提供了一套操作数据库的函数,方便后续做异步的请求和获取结果。query方法,在第二个参数必须设置为MYSQL_ASYNC,此处标示开启异步模式。mysqli_poll 用来做轮询连接,在php 5.3.0+及之后的版本开始支持。Read数组要检测是否存在可以读取的结果集的连接的数组。用reap_async_query来获取异步查询的结果。其次,定义需要处理的function:functionf1(){$db=newdb();$obj=$db->async_query('selectsleep(1)');echo"f1async_query\n";yield$obj;$row=$db->fetch();var_dump('f1resultisnull');yield$row;}functionf2(){$db=newdb();$obj=$db->async_query('select*fromuserwhereid=2');echo"f2async_query\n";yield$obj;$row=$db->fetch();var_dump('f2resultis',$row);yield$row;}functionf3(){$db=newdb();$obj=$db->async_query('selectsleep(1)');echo"f3async_query\n";yield$obj;$row=$db->fetch();var_dump('f3resultisnull');yield$row;}在定义function时,把async_query 和fetch使用yield关键字,使得协程在处理了function1的请求后接着去处理function2的请求,而不是等待fetch的结果。接着,将function放入到Scheduler的task中:$scheduler=newScheduler;$scheduler->newTask(f1());$scheduler->newTask(f2());$scheduler->newTask(f3());$scheduler->run();最后,在这种模式下即可发出异步的mysql请求。结果验证:结果如下图所示,最终耗时为1.078s,那么f1 和f3 都做了sleep 1s,如果是同步的访问整体结果应该在2s以上,所以达到了并发访问的效果。图:mysqli方式访问结果3.3.3 请求自身服务方式具体实现:实现方式描述:首先,在接收到一个外部请求后拆分成多个需要对DB请求;然后,向自身域名下的基于Task Scheduler 的RPC模型对的负责解析并发DB请求的Api发出请求;随后,负责解析DB请求的Api解析查询语句,向DB发出请求;最后,收集并发请求负责解析DB请求的Api返回数据的结果,得到并发查询结果;图:请求自身服务方式流程示意通过并发RPC方式,封装自身查库接口,查询示例如下:{for($i=1;$i'func'.$i,'serviceObj'=>$myselfObj,'uri'=>'api/v1/test/get_data_by_myself',//该接口为查询表中数据的接口.'query'=>['where'=>[//'min'=>($rand-1)*5000,//'max'=>$rand*5000,'id'=>'1',],'table'=>$tableName],'data'=>[],'httpMethod'=>BaseService::REQUEST_METHOD_GET,];}$tsRet=$myselfObj->queryWithts($requestList);return$this->success($tsRet);}使用同步线性的查询数据[郭1][MOU2]库,每个查询耗费时间在0.3-0.7秒之间,接口总的响应时间为:5.15秒,可知同步线性方式查询数据库耗时是一个累加的结果。具体响应时间如下图所示:图:使用同步线性调用查询数据库接口响应时长使用协程并发的方式查询数据库,每个查询耗费时间在0.3-0.7秒之间,但是接口总响应时间在1.5秒以内,说明并发查询数据库,耗时并不是一个累加的结果,且耗时优于同步线性查询数据库。图:使用协程并发调用查询数据库接口响应时长优劣分析优势:通过并发调用Api的方式,这样在链路上完整的记录了trace信息,不脱离当前的监控运维体系。劣势:在耗时上增加了请求自身的IO和加载框架时间开销的延时,同时会对服务器IO数和CPU占用增加。比如,需要查询3次数据库,则增加了3次请求自身的IO,和3次加载框架的CPU消耗。3.3.3 小结这两种方式都可实现对mysql的并发访问,以上说明了两种方式的原理和各自的利弊。在使用的时候可以依据场景来选择方案:Mysql异步并发访问:比较适合Mysql负载不高,单个SQL请求耗时较短,但是请求较多的场景。请求自身服务方式:比较比较适用于,单个SQL请求耗时较长,服务器IO和CPU存量较为宽裕的情况。3.4 Task调度以上在使用并发去做RPC或MySQL查询的时候,我们假定任务是独立的,没有上下游依赖关系的。如果一旦有上下游依赖关系,且是一个网状的关系,我们如何整理哪些RPC或MySQL查询是可以在一次并发中完成的呢?3.4.1 设计思路可以将任务之间的依赖抽象成有向图,首先判断是否存在环状依赖,如果有环状依赖这个是不可解的。其次:此时这个图成了一个DAG(有向无环图),使用拓扑排序的方式,逐步解决无依赖的任务和找出相互独立的任务,如此下去;最后,在每一步中,相互独立的任务即可在一次并发请求中完成。3.4.2 具体解决方案图:Task依赖关系图以上图为例,Task1到Task5,箭头表示被依赖的Task指向依赖的Task。上图中为Task2依赖Task1。Task1的入度为0。入度是图论算法中重要的概念之一。它通常指有向图中某点作为图中边的终点的次数之和。判断是否存在环。这种一般是两种办法,深度遍历和拓扑排序。深度遍历:可以把图抽象为邻接表存储,在做深度遍历的时候记录访问的节点,如果有节点被第二次访问到,说明存在环。可以递归的方式来方位,传递一个hash结构,key为节点的id,value标识节点的计数,如果key已存在或value的值大于1,则输出有环。此种方法的复杂度, 由于采用邻接表存储,则只存储了边结点(e条边),加上表头结点为n(也就是顶点个数),因此时间复杂度为O(n+e)。拓扑排序:方法是重复寻找一个入度为0的顶点,将该顶点从图中删除(即放进一个队列里存着,这个队列的顺序就是最后的拓扑排序,具体见程序),并将该结点及其所有的出边从图中删除(即该结点指向的结点的入度减1),最终若图中全为入度为1的点,则这些点至少组成一个回路。图:拓扑排序删除入度为0的顶点对于邻接表,遍历所有边,求各顶点入度的时间复杂度是O(e),即边的个数。遍历所有结点,找出入度为0的结点的时间复杂度是O(n),即顶点的个数。遍历所有边,删除入度为0的结点的出边的复杂度为O(e),即边的个数。所以总的时间复杂度是O(n+e)。具体抽象和代码,这里就不做赘述,在leetcode上有很多这种问题与解法:https://leetcode-cn.com/problems/course-schedule/ 。4、总结本文首先从Task Scheduler 协程模型入手,解释这个模型原理;随后,介绍如何使用该模型实现并发调用的RPC以及给出了压测的性能比对;接着,给出并发调用mysql的实现方案和性能分析;最后,补充了多任务之间有依赖如何判断是否存在环和编排顺序。在有了这个协程模型后可以扩展出更多的协程处理方式,主要还是解决IO密集型的问题,道理是一样的:让多个IO几乎同时发出,随后收集多个结果。如此可以扩展出对redis等访问。如此,大大缩短了微服务对下游服务、DB、Redis等访问的时间,提升了请求的效率。目前在我们的项目中,有部分微服务组装数据时使用了以上封装的RPC并发模式,在部分cron脚本中使用了mysql并发模式模式。 预览时标签不可点 后端27php5后端 · 目录#后端上一篇PHP命名空间与类常量用法检查方案的探索与实践下一篇分布式事务框架 Seata 批量操作优化经验关闭更多小程序广告搜索「undefined」网络结果
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-4 05:56 , Processed in 0.980100 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表