绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
只需几行代码,即可实现多线程和多进程操作
2019-12-05 16:46:33


concurrent.futures是标准库里的一个模块,它提供了一个实现异步任务的 API 接口。本文将通过一些代码例子来介绍这个模块常见的用法。

Executors

Executor是一个抽象类,它有两个非常有用的子类--ThreadPoolExecutor和ProcessPoolExecutor。从命名就可以知道,前者采用的是多线程,而后者使用多进程。下面将分别介绍这两个子类,在给出的例子中,我们都会创建一个线程池或者进程池,然后将任务提交到这个池子,这个池子将会分配可用的资源(线程或者进程)来执行给定的任务。

ThreadPoolExecutor

首先,先看看代码:

fromconcurrent.futuresimportThreadPoolExecutorfromtimeimportsleep# 定义需要执行的任务--休眠5秒后返回传入的信息defreturn_after_5_secs(message):sleep(5)returnmessage# 建立一个线程池,大小为 3pool = ThreadPoolExecutor(3)future = pool.submit(return_after_5_secs, ("hello"))print(future.done())sleep(5)print(future.done())print(future.result())复制代码

输出结果:

FalseFalsehello复制代码

这个代码中首先创建了一个ThreadPoolExecutor对象--pool,通常这里默认线程数量是 5,但我们指定线程池的线程数量是 3。接着就是调用submit()方法来把需要执行的任务,也就是函数,以及需要传给这个函数的参数,然后会得到Future对象,这里调用其方法done()用于告诉我们是否执行完任务,是,就返回true,没有就返回false。

在上述例子中,次调用done()时候,并没有经过 5 秒,所以会得到false;之后进行休眠 5 秒后,任务就会完成,再次调用done()就会得到true的结果。如果是希望得到任务的结果,可以调用future的result方法。

对Future对象的理解有助于理解和实现异步编程,因此非常建议好好看看官方文档的介绍:

docs.python.org/3/library/c…

ProcessPoolExecutor

ProcessPoolExecutor也是有相似的接口,使用方法也是类似的,代码例子如下所示:

fromconcurrent.futuresimportProcessPoolExecutorfromtimeimportsleepdefreturn_after_5_secs(message):sleep(5)returnmessage pool = ProcessPoolExecutor(3) future = pool.submit(return_after_5_secs, ("hello"))print(future.done())sleep(5)print(future.done())print("Result: "+ future.result())复制代码

输出结果:

FalseFalseResult: hello复制代码

通常,我们会用多进程ProcessPoolExecutor来处理 CPU 密集型任务,多线程ThreadPoolExecutor则更适合处理网络密集型 或者 I/O 任务。

尽管这两个模块的接口相似,但ProcessPoolExecutor采用的是multiprocessing模块,并且不会被 GIL( Global Interpreter Lock) 所影响。不过对于这个模块,我们需要注意不能采用任何不能序列化的对象。

Executor.map()

上述两个模块都有一个共同的方法--map()。跟 Python 内建的map函数类似,该方法可以实现对提供的一个函数进行多次调用,并且通过给定一个可迭代的对象来将每个参数都逐一传给这个函数。另外,采用map()方法,提供的函数将是并发调用。

对于多进程,传入的可迭代对象将分成多块的数据,每块数据分配给每个进程。分块的数量可以通过调整参数chunk_size,默认是 1.

下面是官方文档给出的ThreadPoolExecutor的例子:

importconcurrent.futuresimporturllib.request URLS = ['http://www.baidu.com/','http://www.163.com/','http://www.126.com/','http://www.jianshu.com/','http://news.sohu.com/']# Retrieve a single page and report the url and contentsdefload_url(url, timeout):withurllib.request.urlopen(url, timeout=timeout)asconn:returnconn.read()# We can use a with statement to ensure threads are cleaned up promptlywithconcurrent.futures.ThreadPoolExecutor(max_workers=5)asexecutor:# Start the load operations and mark each future with its URLfuture_to_url = {executor.submit(load_url, url,60): urlforurlinURLS}forfutureinconcurrent.futures.as_completed(future_to_url): url = future_to_url[future]try: data = future.result()exceptExceptionasexc: print('%r generated an exception: %s'% (url, exc))else: print('%r page is %d bytes'% (url, len(data)))复制代码

输出结果:

'http://www.baidu.com/'page is 153759 bytes'http://www.163.com/'page is 693614 bytes'http://news.sohu.com/'page is 175707 bytes'http://www.126.com/'page is 10521 bytes'http://www.jianshu.com/'generated an exception: HTTP Error 403: Forbidden复制代码

而对于ProcessPoolExecutor,代码如下所示:

importconcurrent.futuresimportmath PRIMES = [112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]defis_prime(n):ifn %2==0:returnFalsesqrt_n = int(math.floor(math.sqrt(n)))foriinrange(3, sqrt_n +1,2):ifn % i ==0:returnFalsereturnTruedefmain():withconcurrent.futures.ProcessPoolExecutor()asexecutor:fornumber, primeinzip(PRIMES, executor.map(is_prime, PRIMES)): print('%d is prime: %s'% (number, prime))if__name__ =='__main__': main()复制代码

输出结果:

112272535095293 is prime: True112582705942171 is prime: True112272535095293 is prime: True115280095190773 is prime: True115797848077099 is prime: True1099726899285419 is prime: False复制代码

as_completed() & wait()

concurrent.futures模块中有两个函数用于处理进过executors返回的futures,分别是as_completed()和wait()。

as_completed()函数会获取Future对象,并且随着任务开始处理而返回任务的结果,也就是需要执行的函数的返回结果。它和上述介绍的map()的主要区别是map()方法返回的结果是按照我们传入的可迭代对象中的顺序返回的。而as_completed()返回的结果顺序则是按照任务完成的顺序,哪个任务先完成,先返回结果。

下面给出一个例子:

fromconcurrent.futuresimportThreadPoolExecutor, wait, as_completedfromtimeimportsleepfromrandomimportrandintdefreturn_after_5_secs(num):sleep(randint(1,5))return"Return of {}".format(num) pool = ThreadPoolExecutor(5)futures = []forxinrange(5): futures.append(pool.submit(return_after_5_secs, x))forxinas_completed(futures): print(x.result())复制代码

输出结果

Return of 3Return of 4Return of 0Return of 2Return of 1复制代码

wait()函数返回一个包含两个集合的带有名字的 tuple,一个集合包含已经完成任务的结果(任务结果或者异常),另一个包含的就是还未执行完毕的任务。

同样,下面是一个例子:

fromconcurrent.futuresimportThreadPoolExecutor, wait, as_completedfromtimeimportsleepfromrandomimportrandintdefreturn_after_5_secs(num):sleep(randint(1,5))return"Return of {}".format(num) pool = ThreadPoolExecutor(5)futures = []forxinrange(5): futures.append(pool.submit(return_after_5_secs, x)) print(wait(futures))复制代码

输出结果:

DoneAndNotDoneFutures(done={, , , , }, not_done=set())复制代码

我们可以通过指定参数来控制wait()函数返回结果的时间,这个参数是return_when,可选数值有:FIRST_COMPLETED,FIRST_EXCEPTION和ALL_COMPLETED。默认结果是ALL_COMPLETED,也就是它会等待所有任务都执行完成才返回结果。

作者:spearhead_cai

链接:https://juejin.im/post/5d5ea3b3f265da03b1205302

来源:掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

分享好友

分享这个小栈给你的朋友们,一起进步吧。

猿,妙不可言
创建时间:2019-07-05 22:23:45
分享python,及前端一些开发心得、教程。 希望能和各位大佬交朋友~
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • 马国栋
    栈主

小栈成员

查看更多
  • ?
  • 栈栈
  • gamebus
  • 呵呵哒
戳我,来吐槽~