|
ProcessPoolExecutor是python中的作用:用来创建和管理进程池。 本文将详细介绍python中的ProcessPoolExecutor,包括它的工作原理、如何使用它、常见问题和最佳实践。Python进程简单介绍 python中,通常我们创建一个用进程,可以使用类Process,如图1,一个简单的例子,创建进程实例p1、p2的时候通过target和args传递函数名(func1)和func1的实参。进程p1、p2调用start方法开启进程。frommultiprocessingimportProcessimportosfromtimeimportsleepdeffunc1(name):print("当前进程ID:",os.getpid())print("父进程ID:",os.getppid())print(f"Process:{name}start")sleep(3)print(f"Process:{name}end")if__name__=="__main__":print("当前进程ID:",os.getpid())#创建进程p1=Process(target=func1,args=('p1',))p2=Process(target=func1,args=('p2',))p1.start()p2.start()图1  rocess实例方法还有join、kill、is_alive等方法,这些在这里不在赘述,大家自行翻阅资料查看。什么是进程池? 进程池是用于自动管理工作进程池的编程模式,负责固定数量的进程,控制何时创建它们,例如何时需要它们,控制它们不被使用时应该做什么,比如让它们在不消耗计算资源的情况下等待。 进程池提供了一个通用接口,用于执行具有可变参数数量的临时任务,与Process对象上的属性非常相似,但不需要我们选择一个进程来运行任务、启动进程或等待任务完成。  ython可以通过ProcessPoolExecutor类实现进程池。ProcessPoolExecutor简介  rocessPoolExecutor继承Executor 类,并在调用时返回Future对象。 Executor:ProcessPoolExecutor的父类,用于定义进程池的基本生命周期操作。 Future:将任务提交到进程池时返回的对象。类Executor Executor类定义了三种用于控制进程池的方法:submit()、map()和shutdown()。submit():分派一个要执行的函数并返回一个Future对象。map():将函数应用于可迭代的元素。shutdown():关闭执行器。 Executor在创建类时启动,并且必须通过调用shutdown()显式关闭,这将释放Executtor持有的所有资源,当然也可以自动关闭。 submit()和map()函数用于将任务提交给Executor进行异步执行。 map()函数用于将函数应用于可迭代对象(如列表)中的每个元素,该函数对应于元素的每个进程都将异步运行。 submit()函数接受一个函数以及对应的参数,并将异步执行,调用会立即返回Future对象。对象Futures 我们只用知道他是Executor和ProcessPoolExecutor返回的对象就行了,对象Futures的方法:cancelled():Returns True ifthetaskwascancelledbeforebeingexecuted.running():Returns True ifthetaskiscurrentlyrunning.done():Returns True ifthetaskhascompletedorwascancelled.result():Accesstheresultfromrunningthetask.exception():Accessanyexceptionraisedwhilerunningthetask.add_done_callback():Addacallbackfunctiontothetasktobeexecutedbytheprocesspooloncethetaskiscompleted.这些都先作为了解,后面结合例子,说说具体的作用。下面,正片开始!ProcessPoolExecutor的工作流程:第一步创建进程池: 我将进程池的容量设为6,容量大小是根据cpu核数来确定的,最好不要超过你电脑的核数。#创建进程池,并设置进程池容量executor=ProcessPoolExecutor(6)第二步往进程池里添加任务: 上一步创建的进程池有6个“空位置”,相当于我工厂里预留了6个生产线,但是这六个生产线还没接到任务,都在空闲状态。现在我们往进程池里添加任务。 比如,我现在有个函数my_task,这个函数执行时需要二个入参。defmy_task(a,b):sleep(2)returna+b'运行运行 一次提交一个任务:可以通过调用executor的submit方法,传入函数my_task和其入参来提交任务。#计算88加66future=executor.submit(my_task,88,66) 也可以一次提交多个任务:可以通过调用executor的map方法,传入两个等长的列表或者迭代器。#计算11加1、22加2、33加3、44加4、55加5futures=executor.map(my_task,[11,22,33,44,55],[1,2,3,4,5])第三步获取结果: 单个任务的话可以直接调用future的result()方法,result()可以加timeout参数,超过timeout指定的时间就会抛出异常。#88加66的结果154,result=154result=future.result()#result=future.result(timeout=5) 多个任务获取结果。futures=executor.map(my_task,[11,22,33,44,55],[1,2,3,4,5])#打印所有进程的运行结果foriinfutures:print(i)第四步关闭进程池:executor.shutdown()提交多次任务完整代码: 最好使用__name__=='__main__'的方法执行main()函数,不然会报RuntimeError错误。fromtimeimportsleepfromconcurrent.futuresimportProcessPoolExecutordefmy_task(a,b):sleep(2)returna+bdefmain():executor=ProcessPoolExecutor(4)futures=executor.map(my_task,[11,22,33,44,55],[1,2,3,4,5])#打印所有进程的运行结果foriinfutures:print(i)if__name__=='__main__':main()'运行运行ProcessPoolExecutor进阶方法:#提交多次任务的方法futures=[executor.submit(task,i)foriinrange(10)]#获取所有任务数量num=len(executor._pending_work_items)#获取某个任务的状态,'RUNNING','PENDING','FINISHED','CANCELLED'#分别表示正在运行,排队,结束,被取消执行state=future._state#取消进程,无法取消正在运行的进程futures.cancel()#如果任务已完成或被取消,则返回True。futures.done()
|
|