Multithreading, Multiprocessing and Coroutines (Practice)

Practice Part 4

Multiprocessing

On Windows, multiprocessing code (whether ProcessPoolExecutor or multiprocessing) must be placed under the if __name__ == '__main__' guard, otherwise it raises an error, because child processes start by re-importing the module.

from concurrent.futures import ProcessPoolExecutor, as_completed

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, num) for num in range(15, 45)]
        for future in as_completed(all_task):
            data = future.result()
            print('result: {}'.format(data))
  • For CPU-bound problems, multiprocessing is faster than multithreading, but the speedup is rarely linear, because switching between processes is more expensive than switching between threads

multiprocessing

import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(2,))
    progress.start()
    progress.join()
    print(progress.pid)  # print the process ID

The multiprocessing process pool

pool = multiprocessing.Pool(multiprocessing.cpu_count())
result = pool.apply_async(get_html, args=(3,))  # submit a task to the pool
pool.close()  # close() must be called before join(); it stops the pool from accepting new tasks, otherwise join() raises an error
pool.join()
print(result.get())
  • Common methods:

    • apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): the asynchronous version of apply()

    • apply(self, func, args=(), kwds={}): submit a task and block until it finishes

    • imap(self, func, iterable, chunksize=1): corresponds to the map method of the executors seen earlier

      pool = multiprocessing.Pool(multiprocessing.cpu_count())
      for result in pool.imap(get_html, [1,5,3]):
          print('{} sleep success'.format(result))
      >>
      1 sleep success
      5 sleep success
      3 sleep success
    • imap_unordered(self, func, iterable, chunksize=1): yields results in completion order (whichever task finishes first is returned first)

      for result in pool.imap_unordered(get_html, [1,5,3]):
          print('{} sleep success'.format(result))
      >>
      1 sleep success
      3 sleep success
      5 sleep success

Inter-process communication

  • Note: the Queue used with multiple processes is not the same as the one used with multiple threads; here you must use the Queue from multiprocessing

    import time
    import multiprocessing
    from multiprocessing import Process, Queue

    def producer(queue):
        queue.put("a")
        time.sleep(2)

    def consumer(queue):
        time.sleep(2)
        data = queue.get()
        print(data)

    if __name__ == "__main__":
        queue = Queue(10)
        my_producer = Process(target=producer, args=(queue,))
        my_consumer = Process(target=consumer, args=(queue,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()
  • Note: a Queue created from multiprocessing cannot be used with a pool created by Pool

    from multiprocessing import Pool, Queue

    if __name__ == "__main__":
        queue = Queue(10)
        pool = Pool(2)

        pool.apply_async(producer, args=(queue,))
        pool.apply_async(consumer, args=(queue,))

        pool.close()
        pool.join()
    # nothing is printed: the tasks fail silently because the Queue cannot be pickled into the pool's workers
  • Note: communication between processes in a pool requires the Queue obtained from Manager

    from multiprocessing import Manager, Pool

    if __name__ == "__main__":
        queue = Manager().Queue(10)
        pool = Pool(2)

        pool.apply_async(producer, args=(queue,))
        pool.apply_async(consumer, args=(queue,))

        pool.close()
        pool.join()
  • Communicating with Pipe

    • Pipe is only suitable for communication between two processes

    • Pipe() returns a pair of Connection instances; the common Connection methods are send(self, obj) and recv(self)

      def Pipe(duplex=True):
          return Connection(), Connection()
      
      class Connection(object):
          def send(self, obj):
              pass
      
          def recv(self):
              pass
    • Pipe performs better than Queue, since Queue is built on top of a Pipe plus extra locking

      from multiprocessing import Pipe, Process

      def producer(pipe):
          pipe.send("bobby")

      def consumer(pipe):
          print(pipe.recv())

      if __name__ == "__main__":
          receive_pipe, send_pipe = Pipe()  # the two ends of the pipe; by default both ends can send and receive

          my_producer = Process(target=producer, args=(send_pipe,))
          my_consumer = Process(target=consumer, args=(receive_pipe,))

          my_producer.start()
          my_consumer.start()
          my_producer.join()
          my_consumer.join()
  • Sharing global variables does not work across processes (each child operates on its own copy)

    import time
    from multiprocessing import Process

    def producer(a):
        a += 100
        time.sleep(2)

    def consumer(a):
        time.sleep(2)
        print(a)

    if __name__ == '__main__':
        a = 1
        my_producer = Process(target=producer, args=(a,))
        my_consumer = Process(target=consumer, args=(a,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()
    >> 1  # still prints 1, not 101
  • For synchronization between processes (maintaining a common piece of memory / a shared variable), use Manager()

    def Manager():
        return multiprocessing.SyncManager()
    
    class SyncManager(multiprocessing.managers.BaseManager):
        def Condition(self, lock=None):
            return threading.Condition(lock)
    
        def Event(self):
            return threading.Event()
    
        def Lock(self):
            return threading.Lock()
    
        def Namespace(self):
            pass
    
        def Queue(self, maxsize=None):
            return queue.Queue()
    
        def RLock(self):
            return threading.RLock()
    
        def Semaphore(self, value=None):
            return threading.Semaphore(value)
    
        def Array(self, typecode, sequence):
            pass
    
        def Value(self, typecode, value):
            pass
    
        def dict(self, mapping_or_sequence):
            pass
    
        def list(self, sequence):
            pass
    def add_data(p_dict, key, value):
        p_dict[key] = value

    if __name__ == '__main__':
        progress_dict = Manager().dict()

        first = Process(target=add_data, args=(progress_dict, 'Bobby1', 22))
        second = Process(target=add_data, args=(progress_dict, 'Bobby2', 23))

        first.start()
        second.start()
        first.join()
        second.join()

        print(progress_dict)
    >> {'Bobby1': 22, 'Bobby2': 23}
    • Watch out for data races when several processes mutate the shared state; add a lock when necessary