本文总结:什么是GIL全局解释器锁,什么是死锁现象? 了解知识-什么是递归锁,信号量; python的多线程在什么场景下适用? 什么是Event事件,线程q; 进程池和线程池的实现; 什么是协程,以及如何使用gevent模块;最后介绍了IO模型的分类···👆
官网文档:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
文档剖析:
在Cpython中GIL全局解释器锁其实也是一把互斥锁,主要用于阻止同一个进程下的多个线程同时被运行(通俗理解:python的多线程无法使用多核优势);
GIL肯定存在于CPython解释器中 主要原因就在于Cpython解释器的内存管理不是线程安全的;
内存管理:垃圾回收机制
Cpython解释器自带的GIL解释器锁,线程要想执行代码去抢锁,抢python解释器,之后才回收,那么这样就能保证了阻止同一个进程下的多个线程同时被运行,不容易造成数据错乱;比如,抢票,如果你提交了订单,那么别人还能操作到你这张票的订单吗?不会了吧;这样就进而使数据不容易错乱;
name='hz'
,还没有来得急绑定关系,垃圾回收机制就可能给你回收了,因为垃圾回收也是线程,想要执行也得拿解释器来执行,但是不是和你的代码串行;验证之前需要明白什么是多道技术
(切换+保存状态)
多道技术什么时候切换:程序执行时间长、程序有IO操作(√,示例利用IO)
from threading import Thread
import time
m = 100
def test():
global m
tmp = m
# time.sleep(1)
# IO操作,造成数据错乱,sleep(1)运行释放了GIL锁,100个线程同样的操作反复执行,导致结果为99,如果没有IO操作结果为0
tmp -= 1
m = tmp
for i in range(100):
t = Thread(target=test)
t.start()
time.sleep(3)
print(m) # 0
'''
同一个进程下的多个线程虽然有GIL的存在不会出现并行的效果,但是如果线程内有IO操作还是会造成数据的错乱,这个时候需要我们额外的添加互斥锁(就不止GIL一把锁了)
'''
补:抢锁释放锁简写方式
a = Lock()
# 方式一:
a.acquire()
'''代码体'''
a.release()
# 方式二:
with a:
'''代码体'''
# 适用with上下文管理器,会自动抢锁释放锁
from threading import Thread,Lock
import time
mutex = Lock()
m = 100
def test():
global m
with mutex:
tmp = m
time.sleep(0.1)
# IO操作,造成数据错乱,sleep(1)运行释放了GIL锁,100个线程同样的操作反复执行,导致结果为99,如果没有IO操作结果为0
tmp -= 1
m = tmp
if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=test)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(m) # 0
存在多把锁的情况,会出现死锁现象;
from threading import Thread, Lock
import time
A = Lock()
B = Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
A.acquire() # 抢锁
print('%s 抢到了A锁' % self.name) # 相当于获取线程名称
# current_thread().name 获取线程名称
B.acquire()
print('%s 抢到了B锁' % self.name)
time.sleep(1)
B.release() # 释放锁
print('%s 释放了B锁' % self.name)
A.release()
print('%s 释放了A锁' % self.name)
def func2(self):
B.acquire()
print('%s 抢到了B锁' % self.name)
A.acquire()
print('%s 抢到了A锁' % self.name)
A.release()
print('%s 释放了A锁' % self.name)
B.release()
print('%s 释放了B锁' % self.name)
for i in range(10):
obj = MyThread()
obj.start()
1、线程1抢A锁,B锁,其他线程等待;
2、线程1释放了B锁,其他线程等待,因为A锁没有释放;线程1释放了A锁,其他线程才能去func1中抢锁;
3、线程1去func2中抢B锁,A锁,其他线程抢func1中的A锁和B锁,现在锁还在线程1手中,那么就可能导致卡死现象;
4、通俗理解,这样就导致了,你要的在我手上,我要的在你手上,比如A和B,A在B家被锁了,B在A家被锁了,A和B拿的自己家的钥匙,但是他们在对方的家中,那么就死锁了;
递归锁特点:可以被连续的acquire和release,但是只能被第一个抢到这把锁执行上述操作,它的内部有一个计数器,每acquire一次计数加一,每release一次计数减一,只要计数不为0,其他人都无法抢锁;
模块:RLock
Factory function that returns a new reentrant lock.
A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it again
without blocking; the thread must release it once for each time it has
acquired it.
from threading import Thread,Lock,RLock
import time
'''
mutexA = Lock()
mutexB = Lock()
相当于开启了两把锁;
'''
# 两把锁指向同一个内存空间地址,开启一把锁
A = B = RLock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
A.acquire() # 抢锁
print('%s 抢到了A锁' % self.name) # 相当于获取线程名称
# current_thread().name 获取线程名称
B.acquire()
print('%s 抢到了B锁' % self.name)
time.sleep(1)
B.release() # 释放锁
print('%s 释放了B锁' % self.name)
A.release()
print('%s 释放了A锁' % self.name)
def func2(self):
B.acquire()
print('%s 抢到了B锁' % self.name)
A.acquire()
print('%s 抢到了A锁' % self.name)
A.release()
print('%s 释放了A锁' % self.name)
B.release()
print('%s 释放了B锁' % self.name)
if __name__ == '__main__':
for i in range(10):
obj = MyThread()
obj.start()
# 递归锁的适用不会死锁,抢一次锁计数加1,释放计数减1,其实就是一把锁没有导致混乱
信号量在不同的阶段可能对应不同的技术点;
在并发编程中信号量指的也是锁;
通俗理解:
互斥锁是一个厕所,那么信号量就是多个厕所
from threading import Thread,Semaphore
import time
import random
sm = Semaphore(5) # 括号内写几就代表几个坑位
def task(name):
sm.acquire() # 抢锁
print(f'{name} is running!')
# time.sleep(3)
time.sleep(random.randint(1,5))
sm.release() # 释放锁
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task,args=(f'拉屎{i}号',))
t.start()
# 信号量可以理解为拉屎的坑位,三个人抢锁(进入厕所),拉完了就是释放锁了;
如果面试官问你python多线程是不是没用啊?你是不是得分情况回答他,不能直接说有用啊,存在即合理···😂
视情况而定来判断是否需要多线程,看程序的类型;
程序的类型
IO密集型(常用):需要用户给定指令或者反馈来执行,比如博客页面,你动网页它就动,不需要实时更进更新等;
计算密集型:需要长时间的计算,比如自动驾驶,是不是需要长时间计算路况,或者匹配路线等;
单核多线程牛逼,多核情况下,计算密集型开多进程;IO密集型开多线程;
可以开多进程结合多线程,哈哈哈要啥有啥😁
比如有四个任务,每个任务耗时10s;
开设多进程没有太大的优势,共10s+
因为遇到IO操作cpu就不再服务,就需要切换,并且开设进程还需要申请内存空间和拷贝代码;
开设多线程有优势,10s+
不需要消耗额外的资源,只需要一个CPU(单核情况下IO密集型开多线程是完全有优势的,以耗时任务最长的为准!)
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
time.sleep(2)
if __name__ == '__main__':
l=[]
print(os.cpu_count()) #本机为8核
start=time.time()
for i in range(400):
# 开进程
# p=Process(target=work) # run time is 10.905012845993042大部分时间耗费在创建进程上
# 开线程
p=Thread(target=work) # run time is 2.061677932739258
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
# IO密集型的时候,同样的任务多线程只需要2秒
计算密集型(占着cpu不放),比如有四个任务,每个任务耗时10s;
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(100000000):
res*=i
if __name__ == '__main__':
l=[]
print(os.cpu_count()) # 本机为8核
start=time.time()
for i in range(6):
p=Process(target=work) # 多进程耗时:run time is 6.857659339904785
# p=Thread(target=work) # 多线程耗时:run time is 24.021770000457764
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
# 计算密集型,多进程只需6s,这样只需看一个cpu计算任务所需的时间,那么多个cpu同时结束;
# 多线程就得排队来了,执行完一个继续下一个····
一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行;
通俗理解:比如汽车生成车间,肯定得先冲压零部件,然后焊接这些部件(不能扯一体式冲压昂😓),然后涂装工艺,最后总装···
那么把这些一个个流程比作线程或进程,是不是得前面的进行完才能往下走?
案例:
from threading import Thread,Event
import time
event = Event() # 造红绿灯
def light():
print('红灯停')
time.sleep(3)
print('绿灯行')
# 行人走
event.set()
def people(name):
print(f'{name} 等红绿灯')
event.wait()
print(f'{name} 可以走了')
if __name__ == '__main__':
t = Thread(target=light)
t.start()
for i in range(20):
t = Thread(target=people,args=(f'{i}',))
t.start()
同一个进程下的多个线程数据是共享的,为什么同一个进程下还会去使用队列呢?因为队列是管道和锁构成的,使用队列也是为了保证数据得到安全(适用场景自定义)
import queue
# 先进先出的队列
# q = queue.Queue(3)
# q.put(1)
# q.get()
# q.get_nowait()
# q.full()
# q.empty()
# 后进先出队列,其实和堆栈大差不差
# q = queue.LifoQueue(3)
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get()) # 3
# 优先级队列:可以给放入队列中的数据设置进出的优先级
q = queue.PriorityQueue(4)
q.put((10,'111'))
q.put((100,'222'))
q.put((0,'333'))
q.put((-5,'444'))
print(q.get()) # (-5, '444')
# 数字越小,优先级越高,put((优先级,数据))
TCP服务端并发
# 面条版主体
import socket
server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)
while True:
sock,addr = server.accept()
# 通信循环
while True:
try:
data = sock.recv(1024)
if len(data) == 0:break
sock.send(data.upper()) # 发送大写
except ConnectionResetError as e:
print(e)
break
sock.close()
封装版
from threading import Thread
import socket
# 通信函数
def communication(sock):
# 通信循环
while True:
try:
data = sock.recv(1024)
if len(data) == 0: break
sock.send(data.upper()) # 发送大写
except ConnectionResetError as e:
print(e)
break
sock.close()
# 连接客户端函数
def server(ip,port):
server = socket.socket()
server.bind((ip,port))
server.listen(5)
while True:
sock,addr = server.accept()
# 开设多进程/多线程
t = Thread(target=communication,args=(sock,))
t.start()
if __name__ == '__main__':
s = Thread(target=server,args=('127.0.0.1',8080))
s.start()
思考:能否无限制的开设进程或者线程???
肯定是不能无限制开设的,如果单从技术层面上来说无限开设肯定是可以的并且是最高效,但是从硬件层面上来说是无法实现的(硬件的发展永远赶不上软件的发展速度)
这时候就出现了池,我们在合理适用计算机的时候,保证硬件正常工作的前提,去开设多进程和多线程,是最合理的,如果硬件崩溃了,软件也没用了;
模块:from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
# 线程池:固定开设5个线程,5个线程不会重复出现重复创建和销毁(节省资源);
pool = ThreadPoolExecutor(5) # 括号内可以传数字,不传默认开设当前计算机cpu个数五倍的线程
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
"""
def task(n):
print(n)
time.sleep(2)
return f'返回结果:{n*2}'
# pool.submit(task,1) # 朝池子中提交任务(异步)
'''
提交方式:同步、异步
'''
# print('main')
# 朝池子提交20个任务,每次只能接5个
t_list = []
for i in range(20):
res = pool.submit(task,i)
# print(res.result())
# 返回结果,异步变串行,同步提交
t_list.append(res)
pool.shutdown() # 关闭线程池,等待线程池中所有任务运行完毕
# 解决了等待卡顿
for t in t_list: # 异步提交结果,先起任务再返回结果
print('>>>>',t.result())
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
import os
# 进程池:固定开设几个进程,进程不会重复出现重复创建和销毁(节省资源);
pool = ProcessPoolExecutor(5) # 括号内可以传数字,不传默认开设当前计算机cpu个数
def task(n):
print(n,os.getpid())
time.sleep(2)
return f'进程号:{os.getpid()}'
# 异步提交任务的返回结果,应该通过回调机制来获取,而不是下面的for循环最后获取
def call_back(n): # n 返回的对象
print(n.result()) # n.result 相当于res.result
if __name__ == '__main__':
# 朝池子提交20个任务,每次只能接5个
# t_list = []
for i in range(20):
res = pool.submit(task,i).add_done_callback(call_back) # 回调机制
# print(res.result())
# 返回结果,异步变串行,同步提交
# t_list.append(res)
# pool.shutdown()
# # 解决了等待卡顿
# for t in t_list: # 异步提交结果,先起任务再返回结果
# print('>>>>',t.result())
主要操作:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
res = pool.submit(task,i).add_done_callback(call_back) # 回调机制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
# 创建进程池与线程池
# pool = ThreadPoolExecutor(5) # 可以自定义线程数 也可以采用默认策略
pool = ProcessPoolExecutor(5) # 可以自定义线程数 也可以采用默认策略
# 定义一个任务
def task(n):
print(n, os.getpid())
time.sleep(2)
return '>>>:%s' % n ** 2
# 定义一个回调函数:异步提交完之后有结果自动调用该函数
def call_back(a):
print('异步回调函数:%s' % a.result())
# 朝线程池中提交任务
# obj_list = []
for i in range(20):
res = pool.submit(task, i).add_done_callback(call_back) # 异步提交
# obj_list.append(res)
"""
同步:提交完任务之后原地等待任务的返回结果 期间不做任何事
异步:提交完任务之后不愿地等待任务的返回结果 结果由异步回调机制自动反馈
"""
# 等待线程池中所有的任务执行完毕之后 再获取各自任务的结果
# pool.shutdown()
# for i in obj_list:
# print(i.result()) # 获取任务的执行结果 同步
在windows电脑中如果是进程池的使用也需要在__main__下面
进程:资源单位
线程:工作单位
协程:程序员自定义的名词,意思是单线程下实现并发(程序员自己在代码层面上监测我们所有的IO操作,一但遇到IO,我们在代码级别完成切换,这样给cpu的感觉是程序一直在运行,没有IO操作从而提升效率)
多道技术:切换+保存技术
CPU被剥夺的条件:
程序长时间占用
程序进入IO操作
并发去实现切换+保存状态
欺骗CPU的行为:
单线程下我们如果能够自己检测IO操作并且自己实现代码层面的切换
那么对于CPU而言我们这个程序就没有IO操作,CPU会尽可能的被占用
注意切换不一定能提升效率,如果是IO密集型就会提升效率,计算密集型切换就会降低效率;
能够自主监测IO行为并切换
from gevent import monkey;monkey.patch_all()
# 固定代码格式加上之后才能检测所有的IO行为
from gevent import spawn
import time
def play(name):
print('%s play 1' % name)
time.sleep(5)
print('%s play 2' % name)
'''
两个方法有IO操作,代码一直在反复“跳”
'''
def eat(name):
print('%s eat 1' % name)
time.sleep(3)
print('%s eat 2' % name)
start = time.time()
# play('Hammer') # 正常的同步调用
# eat('Hammer') # 正常的同步调用 8s+
g1 = spawn(play, 'Hammer') # 异步提交
g2 = spawn(eat, 'Hammer') # 异步提交
g1.join()
g2.join() # 等待被监测的任务运行完毕
print('主', time.time() - start) # 单线程下实现并发,提升效率5s+
# 并发效果:一个服务端可以同时服务多个客户端
import socket
from gevent import monkey;monkey.patch_all()
from gevent import spawn
def talk(sock):
while True:
try:
data = sock.recv(1024)
if len(data) == 0:break
print(data)
sock.send(data+b'hi')
except ConnectionResetError as e:
print(e)
sock.close()
break
def servers():
server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen()
while True:
sock, addr = server.accept()
spawn(talk,sock)
g1 = spawn(servers)
g1.join()
# 客户端开设几百个线程发消息即可
最牛的情况:多进程下开设多线程,多线程下开设协程
我们以后可能自己动手写的不多,一般都是使用别人封装好的模块或框架
IO模型研究的主要是网络IO(linux系统),理论为主,代码实现大部分为伪代码;
最为常见的一种IO模型,有两个等待的阶段(wait for data、copy data)
计算机1和计算机2数据传输,需要经过拷贝到内存到OSI,然后才到计算机2的OSI七层到内存;
系统调用阶段变为了非阻塞(轮询) 有一个等待的阶段(copy data),轮询的阶段是比较消耗资源的;
通俗的理解:会一直询问kernel有没有数据~,并不会阻塞操作,直到copy才会阻塞;
利用select或者epoll来监管多个程序 一旦某个程序需要的数据存在于内存中了 那么立刻通知该程序去取即可;
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程;通俗理解为:多个人排队取餐,监控select,如果参号了,kernel说好了,然后用户去取餐return;
这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection;
只需要发起一次系统调用 之后无需频繁发送 有结果并准备好之后会通过异步回调机制反馈给调用者;
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://www.cnblogs.com/48xz/p/15838973.html
内容来源于网络,如有侵权,请联系作者删除!