实战篇三
concurrent.futures
- 
封装好的方法,为多线程和多进程提供了统一接口,方便调用。 
- 
优势: - 主线程中可以获取某一个线程的状态或者某一任务的状态,以及返回值
- 当一个线程完成的时候我们主线程能立即知道
 from concurrent.futures import ThreadPoolExecutor from concurrent.futures import Future def get_html(times): time.sleep(times) print('get page {} success'.format(times)) return times executor = ThreadPoolExecutor(max_workers=2) #最大线程数设定为2 task1 = executor.submit(get_html,(3)) #使用submit方法来提交线程 task2 = executor.submit(get_html,(2)) #executor返回的是一个Future对象 task1.done() #用于判定某个任务是否完成,该方法没有阻塞,是Future的方法 task2.cancel() #取消线程,只能在线程还没有开始前取消,若线程已经开始执行,则无法取消 task1.result() #返回线程执行的结果,有阻塞,只有在线程结束后才有返回值 复制代码
- 
常用方法: - 
as_completed(fs, timeout=None):- 生成器,返回已经完成的Future
- 返回结果不一定与参数的传入顺序相同,谁先完成就先返回谁
 #上例中若只获取已经成功了的task的结果 from concurrent.futures import as_completed urls = [3,4,2] all_task = [executor.submit(get_html, (url)) for url in urls] for future in as_completed(all_task): # 无阻塞的方法,一旦子线程完成执行,主线程中即可立马返回相应的结果 data = future.result() print("get {} page success".format(data)) 复制代码
- 生成器,返回已经完成的
- 
.map():- 类似于python中的map()函数,将可迭代的参数列表作用在同一函数上,其本质依旧是一个生成器,直接返回future.result()
 
- 类似于python中的
- 
返回的顺序与调用可迭代参数的顺序一致 #只获取已经成功了的task的结果的另一种方法 urls = [3,4,2] for future in executor.map(get_html, urls): print('get {} page success'.format(data)) 复制代码
- 
wait(fs, timeout=None, return_when=ALL_COMPLETED):阻塞主线程,直到指定的某些子线程完成后才能继续执行主线程(在return_when参数中设定)
- 
with ThreadPoolExecutor(n) as executor:使用with语句来进行调用
 
- 
Future
- 
submit(self, fn, *args, **kwargs):class ThreadPoolExecutor(_base.Executor): # Used to assign unique thread names when thread_name_prefix is not supplied. _counter = itertools.count().__next__ def __init__(self, max_workers=None, thread_name_prefix=''): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. """ if max_workers is None: # Use this number because ThreadPoolExecutor is often # used to overlap I/O instead of CPU work. max_workers = (os.cpu_count() or 1) * 5 if max_workers <= 0: raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter())) def submit(self, fn, *args, **kwargs): with self._shutdown_lock: #添加锁,为了锁住后面的一段代码 if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() #生成一个Future对象,最后这个对象会被返回 w = _WorkItem(f, fn, args, kwargs) #WorkItem才是整个线程池的真正执行单元,Future对象会被传入WorkItem中 self._work_queue.put(w) #整个WorkItem实例被放入到_work_queue中 self._adjust_thread_count() return f submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. num_threads = len(self._threads) if num_threads < self._max_workers: #判断启动了多少线程 thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads) t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), self._work_queue)) #线程执行的函数是_worker,而_worker中执行的函数来自self._work_queue #_worker函数的作用:运行线程,不停的从_work_queue中取出_WorkItem t.daemon = True t.start() self._threads.add(t) #将启动了的线程数量加入到_threads中 _threads_queues[t] = self._work_queue class _WorkItem(object): #设定这个类的目的其实就是执行函数,并将执行的结果设置到Future中 def __init__(self, future, fn, args, kwargs): self.future = future self.fn = fn #需要执行的函数在这里传入 self.args = args self.kwargs = kwargs def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException as exc: self.future.set_exception(exc) # Break a reference cycle with the exception 'exc' self = None else: self.future.set_result(result) #将函数执行的结果设置到Future中 复制代码
- 
Future的常用方法:- 
cancel(self):取消futuredef cancel(self): """Cancel the future if possible. Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed. """ with self._condition: #使用条件变量 if self._state in [RUNNING, FINISHED]: #判断future的状态使否是在执行或已完成,若是则无法取消 return False if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: return True self._state = CANCELLED self._condition.notify_all() self._invoke_callbacks() return True 复制代码
- 
cancelled(self):状态判断,是否已经取消
- 
running(self):状态判断,是否在执行
- 
done(self):状态判断,是否已完成或已取消
- 
result(self, timeout=None):- 返回调用的结果,是有阻塞的方法,依旧是使用condition来实现
 
- 
其中调用了condition的 wait()方法
- 
set_result(self, result):- 
set the return value of work associated with the future 
- 
调用了condition的 notify_all()方法
 
- 
 
- 
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
    























![[桜井宁宁]COS和泉纱雾超可爱写真福利集-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/4d3cf227a85d7e79f5d6b4efb6bde3e8.jpg)

![[桜井宁宁] 爆乳奶牛少女cos写真-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/d40483e126fcf567894e89c65eaca655.jpg)
