Python多线程
date
Aug 26, 2022
Property
slug
MultiProcess
status
Published
tags
SkillNote
summary
type
Post
Python线程安全
1、线程安全问题的出现
多个线程同时对一个变量进行修改,并且修改的是没有先后次序的,会导致出现问题:
import threading
num = 0
def add():
global num
for i in range(10000000):
num += 1
def sub():
global num
for i in range(10000000):
num -= 1
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
# 两个线程都是同时开始的,并对一个全局变量做了修改
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
2、同步锁
但是在加入这种锁之后,代码其实就是直接变成了串行运行了,并没有能很好的利用多线程加速运算这样这个代码就完全变成了串行的状态,对于这种计算密集型I/O业务来说,还不如直接使用串行化单线程执行来得快,所以这个例子仅作为一个示例,不能概述锁真正的用途。
- 同步锁;
- 互斥锁;
import threading
num = 0
lock = threading.Lock()
def add():
# 锁的加入
lock.acquire()
global num
for i in range(10000000):
num += 1
# 锁的释放
lock.release()
def sub():
lock.acquire()
global num
for i in range(10000000):
num -= 1
lock.release()
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
# 两个线程都是同时开始的,并对一个全局变量做了修改
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
2.1、死锁问题
对于同步锁而言,一次acquire就必须对应一次release,不可以说在多次acquire之后再依次的release的操作,这样会引起死锁导致程序的阻塞,完全动不了的问题。
import threading
num = 0
lock = threading.Lock()
def add():
lock.acquire()
lock.acquire()
global num
for i in range(10000000):
num += 1
lock.release()
lock.release()
def sub():
lock.acquire()
lock.acquire()
global num
for i in range(10000000):
num -= 1
lock.release()
lock.release()
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
# 两个线程都是同时开始的,并对一个全局变量做了修改
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
2.2、解决死锁问题,可以采用with的方法
这样就不用再去考虑其中的什么时候要上acquire,什么时候需要release的问题了,在lock中实现了__enter__()与__exit__()方法,所以可以直接采用with语句实现上下文加锁解锁的过程。
import threading
num = 0
lock = threading.Lock()
def add():
with lock:
global num
for i in range(10000000):
num += 1
def sub():
with lock:
global num
for i in range(10000000):
num -= 1
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
# 两个线程都是同时开始的,并对一个全局变量做了修改
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
3、递归锁
递归锁是同步锁的一个升级版本,在同步锁的基础上可以做到连续重复使用多次acquire()后再重复使用多次release()的操作,但是一定要注意加锁次数和解锁次数必须一致,否则也将引发死锁现象。
import threading
num = 0
lock = threading.RLock()
def add():
# 使用了递归锁,即使是多次的上锁也都是可以的
lock.acquire()
lock.acquire()
global num
for i in range(10000000):
num += 1
# 但是在release时,一定是要有acquire多少次,就需要release多少次
lock.release()
lock.release()
def sub():
lock.acquire()
lock.acquire()
global num
for i in range(10000000):
num -= 1
lock.release()
lock.release()
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
# 两个线程都是同时开始的,并对一个全局变量做了修改
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
3.1、使用递归锁也可以使用with的方法
import threading
# 设定递归锁
lock = threading.RLock()
num = 0
def Add():
with lock:
# 需要在里面设定为全局变量
global num
for i in range(10000000):
num += 1
def Sub():
with lock:
global num
for i in range(10000000):
num -= 1
if __name__ == "__main__":
sub_threading_01 = threading.Thread(target = Add)
sub_threading_02 = threading.Thread(target = Sub)
sub_threading_01.start()
sub_threading_02.start()
sub_threading_01.join()
sub_threading_02.join()
4、条件锁
条件锁在在递归锁的基础上增加了能够暂停线程运行的功能,并且我们可以使用wait()和notify()来控制线程执行的个数。
条件锁可以自由的设定一次放行多少个线程。
import threading
current_run_thread_number = 0
max_sub_thread_number = 10
condLock = threading.Condition()
def Task():
global current_run_thread_number
thread_name = threading.currentThread().name
# 先是获取锁,acquire
condLock.acquire()
print("start and wait run thread: %s" % thread_name)
# lock在等待
condLock.wait()
current_run_thread_number += 1
print("carry on the thread: %s" % thread_name)
# 释放这个锁
condLock.release()
if __name__ == "__main__":
for i in range(max_sub_thread_number):
sub_thread_ins = threading.Thread(target = Task)
sub_thread_ins.start()
while current_run_thread_number < max_sub_thread_number:
notify_number = int(input("please enter the number that you want to run: "))
# 开始上锁
condLock.acquire()
# 放行
condLock.notify(notify_number)
# 锁的释放
condLock.release()
print("main thread run end")
可以使用with的方法实现:
import threading
current_run_number = 0
max_sub_thread_number = 10
condLock = threading.Condition()
def Task():
global current_run_number
thread_name = threading.currentThread().name
# 只是解决了前面
with condLock:
print("start and wait run thread: %s" % thread_name)
condLock.wait()
current_run_number += 1
print("carry on run thread : %s" % thread_name)
if __name__ == "__main__":
for i in range(max_sub_thread_number):
sub_thread_ins = threading.Thread(target = Task)
sub_thread_ins.start()
while current_run_number < max_sub_thread_number:
notify_number = int(input("please enter the number that you want to run the thread"))
# 开启锁的通知
# 使用锁去开启
with condLock:
condLock.notify(notify_number)
print("main thread run end")
5、事件锁
事件锁是基于条件锁来做的,但是与条件锁的不同是,条件锁是能够一次放行一个,而事件锁是一次放行就全部通过,不能任意的放行任意的数量的子线程。
import threading
max_sub_thread_number = 3
def Task():
# 获取当前线程的名称
thread_name = threading.currentThread().name
print("start and waiting the thread name = ", thread_name)
eventLock.wait()
if __name__ == "__main__":
# 创建了一个事件锁
eventLock = threading.Event()
# 把线程给一个一个的插入进去
for i in range(max_sub_thread_number):
sub_thread_ins = threading.Thread(target = Task)
sub_thread_ins.start()
eventLock.set()
eventLock.clear()
eventLock.set()
6、信号锁
信号量锁也是根据条件锁来做的,它与条件锁和事件锁的区别如下:
- 条件锁:一次可以放行任意个处于“等待”状态的线程
- 事件锁:一次可以放行全部的处于“等待”状态的线程
- 信号量锁:通过规定,成批的放行特定个处于“上锁”状态的线程
我们可以直接将它理解为一个限制宽度的马路,每次只是放行相同数量的内容:
import threading
import time
maxSubThreadNumber = 6
def task():
thName = threading.currentThread().name
semaLock.acquire()
print("run sub thread %s" % thName)
time.sleep(3)
semaLock.release()
if __name__ == "__main__":
# 每次只能放行2个
semaLock = threading.Semaphore(2)
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
还是可以使用with的方法:
import threading
import time
max_sub_thread_number = 6
semaLock = threading.Semaphore(2)
def Task():
thread_name = threading.currentThread().name
with semaLock:
print("run sub thread ", thread_name)
time.sleep(3)
if __name__ == "__main__":
for i in range(max_sub_thread_number):
sub_thread_ins = threading.Thread(target = Task)
sub_thread_ins.start()
7、条件锁的使用
# 只是相当于是说通知另一个线程开始,不会出现阻塞的问题,wait就是一个阻塞的意思
import threading
condLock = threading.Condition()
lst = []
def even():
# 加偶数
with condLock:
for i in range(2, 101, 2):
if len(lst) % 2 != 0:
# 添加偶数
lst.append(i)
condLock.notify()
condLock.wait()
else:
condLock.wait()
lst.append(i)
condLock.notify()
condLock.notify()
def odd():
# 加奇数
# 只是用作于通知而已
with condLock:
for i in range(1, 101, 2):
if len(lst) % 2 == 0:
lst.append(i)
condLock.notify()
condLock.wait()
condLock.notify()
if __name__ == "__main__":
addEvenTask = threading.Thread(target=even)
addOddTask = threading.Thread(target=odd)
addEvenTask.start()
addOddTask.start()
addEvenTask.join()
addOddTask.join()
print(lst)