实战篇四
多进程
在Windows下使用多进程时,不管是ProcessPoolExecutor
还是multiprocessing
,代码都要放到 if __name__ == '__main__':
代码块中,否则会报错(Windows 上子进程会重新导入主模块,没有这层保护的代码会在导入时再次尝试创建进程)
from concurrent.futures import ProcessPoolExecutor, as_completed


def fib(n):
    """Return the n-th Fibonacci number, with fib(1) == fib(2) == 1.

    Uses naive double recursion on purpose: each call is CPU-heavy work,
    which is what this multi-process demo needs to spread across workers.
    """
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
    # Three worker processes; as_completed yields futures as they finish,
    # so results are printed in completion order, not submission order.
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(15, 45)]
        for future in as_completed(all_task):
            data = future.result()
            print('result: {}'.format(data))
复制代码
- 在处理CPU密集型问题时,使用多进程会比使用多线程快(CPython 的 GIL 使多个线程无法同时执行 Python 字节码,而多个进程可以真正并行),但是很难做到翻倍,因为进程的切换比线程的切换更耗时。
multiprocessing
import time
import multiprocessing


def get_html(n):
    """Stand-in for a slow task: sleep for *n* seconds, then return *n*.

    The name suggests fetching a page; here it only simulates the wait.
    """
    time.sleep(n)
    return n


if __name__ == '__main__':
    process = multiprocessing.Process(target=get_html, args=(2,))
    process.start()
    process.join()
    print(process.pid)  # PID of the child process that ran get_html
复制代码
multiprocessing 的进程池
if __name__ == '__main__':
    # Guarded like the other examples: on Windows, pool creation must not run
    # when child processes re-import this module.
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    result = pool.apply_async(get_html, args=(3,))  # submit a task without blocking
    # close() must be called before join(): it stops the pool from accepting
    # new tasks. Calling join() on a pool that is still running raises
    # ValueError.
    pool.close()
    pool.join()
    print(result.get())
复制代码
-
常用方法:
-
apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None)
:apply()
方法的异步版本 -
apply(self, func, args=(), kwds={})
:同步提交任务,阻塞直到任务执行完成并返回结果 -
imap(self, func, iterable, chunksize=1)
:对应到之前executor里的map
# Pool.imap: the process-pool counterpart of executor.map; results are
# yielded in the same order as the input iterable.
if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    for result in pool.imap(get_html, [1, 5, 3]):
        print('{} sleep success'.format(result))
    # Release the worker processes once every result has been consumed.
    pool.close()
    pool.join()
    # Output:
    # 1 sleep success
    # 5 sleep success
    # 3 sleep success
-
imap_unordered(self, func, iterable, chunksize=1)
import multiprocessing

# Pool.imap_unordered: whichever task finishes first is returned first, so
# results arrive in completion order rather than input order.
if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print('{} sleep success'.format(result))
    pool.close()
    pool.join()
    # Output:
    # 1 sleep success
    # 3 sleep success
    # 5 sleep success
-
进程之间的通信
-
特别注意:多进程中的
Queue
与多线程中的不一样,这里要使用来自multiprocessing
里的Queue
import time
from multiprocessing import Process, Queue
import multiprocessing


def producer(queue, delay=2):
    """Put the string "a" on *queue*, then sleep for *delay* seconds."""
    queue.put("a")
    time.sleep(delay)


def consumer(queue, delay=2):
    """Sleep for *delay* seconds, then take one item from *queue* and print it."""
    time.sleep(delay)
    data = queue.get()
    print(data)


if __name__ == "__main__":
    # multiprocessing.Queue (not queue.Queue), so both child processes
    # communicate through the same queue.
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
-
特别注意:
multiprocessing
里创建的Queue
不能用在Pool
# A multiprocessing.Queue cannot be used by the worker processes of a
# Pool-created process pool.
from multiprocessing import Pool, Queue

if __name__ == "__main__":
    queue = Queue(10)
    pool = Pool(2)
    # NOTE: both tasks fail while their arguments are being sent to a worker
    # (per the stdlib, a multiprocessing.Queue may only be shared with child
    # processes through inheritance, not pickled into a pool task). The error
    # is stored in the AsyncResult returned by apply_async, which is never
    # inspected here, so the failure is silent. Pass error_callback=print to
    # see it.
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()
    # Nothing is returned / printed.
-
特别注意:进程池中进程的通信需要用来自
Manager
里的Queue
from multiprocessing import Manager, Pool

if __name__ == "__main__":
    # A Manager's Queue is a proxy to a queue held by the manager's server
    # process, so it can be passed to Pool workers. The `with` block shuts
    # the manager down once the pool has finished.
    with Manager() as manager:
        queue = manager.Queue(10)
        pool = Pool(2)
        pool.apply_async(producer, args=(queue,))
        pool.apply_async(consumer, args=(queue,))
        pool.close()
        pool.join()
-
使用pipe进行通信
-
pipe只能适用于两个进程间的通信
-
pipe返回的是
Connection
的实例,Connection
的常用方法有:send(self, obj)
和recv(self)
def Pipe(duplex=True):
    """Simplified signature sketch: return the two ends of a pipe.

    Both ends are ``Connection`` objects.
    """
    return Connection(), Connection()


class Connection(object):
    """Sketch of the connection object returned by ``Pipe``.

    Only the two commonly used methods are shown; their bodies are omitted.
    """

    def send(self, obj):
        """Send *obj* to the other end of the pipe (body omitted in this sketch)."""
        pass

    def recv(self):
        """Receive an object from the other end of the pipe (body omitted in this sketch)."""
        pass
-
pipe的性能比queue要高(multiprocessing.Queue 内部基于 Pipe 实现,并额外使用了锁和后台线程,因此开销更大)
from multiprocessing import Pipe, Process


def producer(pipe):
    """Send the string "bobby" through *pipe*."""
    pipe.send("bobby")


def consumer(pipe):
    """Receive one object from *pipe* and print it."""
    print(pipe.recv())


if __name__ == "__main__":
    # Pipe() returns the two ends of the pipe; the first end is used for
    # receiving and the second for sending.
    receive_pipe, send_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
-
-
共享全局变量在多进程中是不适用的:每个进程都有独立的内存空间,子进程拿到的只是变量的一份拷贝,修改不会反映到其他进程
import time
from multiprocessing import Process


def producer(a, delay=2):
    """Add 100 to this process's copy of *a*, then sleep for *delay* seconds.

    The change is never seen by other processes. Also, ``a += 100`` only
    rebinds a local name for an immutable int, so even a same-process caller
    would not see it.
    """
    a += 100
    time.sleep(delay)


def consumer(a, delay=2):
    """Sleep for *delay* seconds, then print *a*."""
    time.sleep(delay)
    print(a)


if __name__ == '__main__':
    a = 1
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
    # Output: 1  -- still 1, not 101: each child works on its own copy of `a`.
-
进程间的同步(维护一个公共的内存/变量)使用
Manager()
# Simplified interface sketch of multiprocessing.Manager() and SyncManager,
# listing the shared objects a manager can create for inter-process use.
# Bodies only return plain threading/queue objects or `pass`; they
# illustrate signatures, not real behavior.
# NOTE(review): `threading` and `queue` are not imported in this snippet, and
# the stdlib presumably defines SyncManager in multiprocessing.managers, where
# the real methods return proxy objects served by a manager process. Confirm
# against the multiprocessing docs before relying on this sketch.
def Manager():
    # Entry point used as `Manager()` in the examples.
    return multiprocessing.SyncManager()


class SyncManager(multiprocessing.managers.BaseManager):
    # Synchronization primitives (shown returning threading objects).
    def Condition(self, lock=None):
        return threading.Condition(lock)

    def Event(self):
        return threading.Event()

    def Lock(self):
        return threading.Lock()

    def Namespace(self):
        pass

    # NOTE: this sketch ignores `maxsize`.
    def Queue(self, maxsize=None):
        return queue.Queue()

    def RLock(self):
        return threading.RLock()

    def Semaphore(self, value=None):
        return threading.Semaphore(value)

    # Shared data containers (bodies omitted in this sketch).
    def Array(self, typecode, sequence):
        pass

    def Value(self, typecode, value):
        pass

    def dict(self, mapping_or_sequence):
        pass

    def list(self, sequence):
        pass
from multiprocessing import Manager, Process


def add_data(p_dict, key, value):
    """Store *value* under *key* in *p_dict* (a plain dict or a Manager dict proxy)."""
    p_dict[key] = value


if __name__ == '__main__':
    # The `with` block shuts the manager process down after use, so the
    # shared dict is printed while the manager is still running.
    with Manager() as manager:
        progress_dict = manager.dict()
        first = Process(target=add_data, args=(progress_dict, 'Bobby1', 22))
        second = Process(target=add_data, args=(progress_dict, 'Bobby2', 23))
        first.start()
        second.start()
        first.join()
        second.join()
        print(progress_dict)
        # Output: {'Bobby1': 22, 'Bobby2': 23}
        # (key order depends on which child process writes first)
- 使用时注意数据的同步,必要时需要加锁
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END