这是我参与更文挑战的第13天,活动详情查看: 更文挑战
微信公众号搜索【程序媛小庄】,关注半路出家的程序媛如何靠python开发养家糊口~
前言
通过开启多线程或者多进程可以并发处理多个任务,但是如果无限制的开启多进程或者多线程会导致内存溢出,解决方案就是本文将要介绍的池
的概念。
进程池 & 线程池
之前TCP服务端的并发效果是通过开启多线程实现的,即每有一个客户端请求连接服务端,服务端就开设一个进程或者线程去处理,代码实现方式如下:
服务端代码:
# server.py
import socket
from threading import Thread
def communicate(conn):
while True:
try:
data = recv(1024)
if noe data:break
conn.send(data.upper())
except Exception:
break
conn.close
def sever(ip,port):
sever = socket.socket()
sever.bind((ip,port))
sever.listen(5)
while True:
conn,addr = accept()
# 开设线程为客户端服务
t = Thread(target=communicate,args=(conn,))
t.start()
if __name__ == '__main__':
s = Thread(target=sever,args=('127.0.01.1',8080))
s.start()
复制代码
客户端代码:
# client.py
import socket
client = socket.socket()
client.connect(('127.0.0.1',8080))
while True:
msg = input('》》').strip()
if len(msg) == 0:
continue
if msg == 'q':
break
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
复制代码
上述代码服务端通过开启多线程实现了服务端的并发效果,但是每有一个客户端请求就开一个线程显然是很浪费资源的,这就比如开了饭馆,来一个人就招聘一个服务员,来1000个人,就需要招聘1000个服务员,饭馆迟早会被挤爆的,同理在程序上也是一样的,无论是开设进程还是线程,都需要消耗资源,只不过是开设线程的消耗比假设进程的消耗小一些,作为程序的开发者,不可能做到无限制的开设进程和线程,因为计算机硬件资源跟不上,而硬件的开发速度远远赶不上软件,开发的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它。因此引入了池的概念。
池的概念
池
是用来保证计算机硬件安全的情况下最大限度的利用计算机,它降低了程序的运行效率但是保证了计算机硬件的安全
。池子造出来之后就开设了固定数量的线程或者进程,不会再变更,就好像饭馆中的服务员,不管有多少人来吃饭,固定就是这几个服务员。在池子中的进程或者线程不会重复的创建和销毁,极大的节省了计算机资源。
任务的提交方式有同步和异步两种,异步提交任务的返回结果应当通过回调
机制来获取,回调机制
就相当于给每个服务员一个任务,任务完成后,服务员会主动找老板汇报。
下面来介绍一下如何使用进程池和线程池来开启进程和线程。
current模块
不管是创建进程池还是线程池,都是通过current
模块实现的,只是进程池和线程池使用的类不同,其他使用方式一模一样,如下代码是线程池的基本使用,进程池的使用不做过多介绍,和线程池的使用相同,只是导入的类不同,进程池使用的类是ProcessPoolExecutor
。
from concurrent.futures import ThreadPoolExecutor
import os
import time
# 创建线程池对象
pool = ThreadPoolExecutor(5)
'''
括号内可以传数字,不传的话默认会开设当前计算机CPU个数的5倍的线程数。
池子造出来后,会固定存在括号内数字的线程个数,这些线程不会出现重复创建和销毁的现象
池子的使用:
只需将需要执行的任务(客人)往池子中提交即可,会自动有线程(服务员)执行该任务
'''
def task(n):
print(n,os.getpid())
time.sleep(2)
'''
pool.submit(task,1)
print('主')
向线程池中提交任务,是异步提交任务,主线程不会等子线程执行完毕之后再执行
pool.submit()会返回一个Future对象
'''
# 需求:等待线程池中的所有任务执行完毕之后才继续往下运行---将异步变为同步
res_list = []
for i in range(20):
res = pool.submit(task,i)
res_list.append(res)
# 关闭线程池,等待线程池中所有任务运行完毕
pool.shutdown()
# 拿到res的结果,res是任务(task)的返回值,也就是异步提交任务的返回结果
for res in res_list:
print(res.result()) # result方法:将异步变为同步
'''
程序由并发变为串行,即结果不是返回值与输出值交叉出现,而是先输出任务的打印内容,再输出任务的返回值
'''
# 异步回调机制拿到返回值的方式之高级版
def call_back(n):
print('call_back', n.result())
if __name__ == '__main__':
for i in range(20):
res = pool.submit(task,i).add_done_callback(call_back)
复制代码
总结以上代码,其实只需要掌握几点就可以了:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5) # 设置池子的大小
pool.submit(task, i).add_done_callback(call_back) # 异步提交任务,回调处理收集结果
复制代码
协程
通过之前的文章,我们知道进程是一种资源的单位,而线程是CPU真正的执行单位,协程这个概念是开发人员自己提出的一种概念,具体就是在单线程下实现并发
,需要在程序中控制多个任务的切换+保存状态,但是单线程下多个任务一旦有一个IO操作阻塞而没有切换任务,那么就会导致线程被阻塞在原地,该线程内的其他任务也无法执行。
单线程下不可避免的程序中大概率都会出现IO操作,但是果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务,在一个任务遇到IO阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被CPU执行的状态,相当于开发人员自己在代码层面上检测IO操作,一旦程序遇到IO操作,就会在代码级别完成切换。 这样给CPU的感觉就是程序一直在运行,没有IO操作,从而提升程序的运行效率 。
在代码层面上检测IO操作并切换任务并保存状态需要借助gevent模块
,该模块本身无法检测常见的一些IO操作,该模块是第三方模块,需要进行安装pip install gevnet
,另外在使用该模块时还需要额外导入两句代码:
from gevent import monkey
monkey.patch_all()
复制代码
由于上面的两行代码在使用gevent模块
时一定需要被导入,因此上述代码也支持简写:
from gevent import monkey;monkey.patch_all()
复制代码
可以通过下述例子具体看一下gevent模块
的具体使用:
from gevent import monkey; monkey.patch_all()
import time
from gevent import spawn # spawn进行任务切换
def heng():
print('哼')
time.sleep(2)
print('。heng 。')
def ha():
print('ha')
time.sleep(3)
print('。ha 。')
start_time = time.time()
g1 = spawn(heng)
g2 = spawn(ha)
g1.join() # 等待被检测的任务执行完毕 再往后继续执行
g2.join()
print(time.time()-start_time)
# 运行结果
哼
ha
。heng 。
。ha 。
3.0066308975219727 # 是任务用时最久的而不是任务用时总和。
复制代码
同样的,TCP服务端的并发也可以通过协程实现,代码如下:
服务端代码:
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn
def communicate(conn):
while True:
try:
data = conn.recv(1024)
if not data:break
conn.send(data.upper())
except Exception:
break
conn.close()
def sever(ip,port):
sever = socket.socket()
sever.bind((ip,port))
sever.listen(5)
while True:
conn,addr = sever.accept()
spawn(communicate,conn)
if __name__ == '__main__':
g1 = spawn(sever,'127.0.0.1',8080)
g1.join()
复制代码
客户端代码:
import socket
from threading import Thread,current_thread
def client():
client = socket.socket()
client.connect(('127.0.0.1',8080))
while True:
n = 0
while True:
msg = '%s say hello %s' % (current_thread().name, n)
n += 1
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
if __name__ == '__main__':
for i in range(100):
t = Thread(target=client)
t.start()
复制代码
结语
码字不易,转载请说明出处,走过路过的小伙伴们伸出可爱的小指头点个赞再走吧(╹▽╹)