程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

5 種 Python 線程鎖

編輯:Python

這裡寫目錄標題

  • 線程安全
  • 鎖的作用
  • 1、Lock() 同步鎖
    • 死鎖現象
  • 2、RLock() 遞歸鎖
  • 3、Condition() 條件鎖
  • 4、Event() 事件鎖
  • 5、Semaphore() 信號量鎖
  • 鎖關系淺析
  • 基本練習題
    • 條件鎖的應用
    • 事件鎖的應用

線程安全

線程安全是多線程或多進程編程中的一個概念,在擁有共享數據的多條線程並行執行的程序中,線程安全的代碼會通過同步機制保證各個線程都可以正常且正確的執行,不會出現數據污染等意外情況。

線程安全的問題最主要還是由線程切換導致的,比如一個房間(進程)中有10顆糖(資源),除此之外還有3個小人(1個主線程、2個子線程),當小人A吃了3顆糖後被系統強制進行休息時他認為還剩下7顆糖,而當小人B工作後又吃掉了3顆糖,那麼當小人A重新上崗時會認為糖還剩下7顆,但是實際上只有4顆了。

上述例子中線程A和線程B的數據不同步,這就是線程安全問題,它可能導致非常嚴重的意外情況發生,我們按下面這個示例來進行說明。

下面有一個數值num初始值為0,我們開啟2條線程:

線程1對num進行一千萬次+1的操作

線程2對num進行一千萬次-1的操作

結果可能會令人咋舌,num最後並不是我們所想象的結果0:

import threading
num = 0
def add():
global num
for i in range(10_000_000):
num += 1
def sub():
global num
for i in range(10_000_000):
num -= 1
''' add()和sub()兩個線程同時對num進行操作 看結果,可以有以下推論: 1、同時操作不會是程序崩潰(num是int類型,int類型本身是線程安全的) 2、但是會操作結果呈現未知狀態,說明這個流程是線程不安全的。因為線程運行的先後順序是未知的。 '''
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# 結果三次采集
# num result : 669214
# num result : -1849179
# num result : -525674

上面這就是一個非常好的案例,想要解決這個問題就必須通過來保障線程切換的時機。

需要我們值得留意的是,在Python基本數據類型中list、tuple、dict本身就是屬於線程安全的,所以如果有多個線程對這3種容器做操作時,我們不必考慮線程安全問題。(非常好的設計)

鎖的作用

鎖是Python提供給我們能夠自行操控線程切換的一種手段,使用鎖可以讓線程的切換變的有序。

一旦線程的切換變的有序後,各個線程之間對數據的訪問、修改就變的可控,所以若要保證線程安全,就必須使用鎖。

threading模塊中提供了5種最常見的鎖,下面是按照功能進行劃分:

  • 同步鎖:lock(一次只能放行一個)

  • 遞歸鎖:rlock(一次只能放行一個)

  • 條件鎖:condition(一次可以放行任意個)

  • 事件鎖:event(一次全部放行)

  • 信號量鎖:semaphore(一次可以放行特定個)

1、Lock() 同步鎖

基本介紹
Lock鎖的稱呼有很多,如:同步鎖、互斥鎖

它們是什麼意思呢?如下所示:

互斥指的是某一資源同一時刻僅能有一個訪問者對其進行訪問,具有唯一性和排他性,但是互斥無法限制訪問者對資源的訪問順序,即訪問是無序的

同步是指在互斥的基礎上(大多數情況),通過其他機制實現訪問者對資源的有序訪問

同步其實已經實現了互斥,是互斥的一種更為復雜的實現,因為它在互斥的基礎上實現了有序訪問的特點

下面是threading模塊與同步鎖提供的相關方法:

使用方式

同步鎖一次只能放行一個線程,一個被加鎖的線程在運行時不會將執行權交出去,只有當該線程被解鎖時才會將執行權通過系統調度交由其他線程。

如下所示,使用同步鎖解決最上面的問題:

import threading
num = 0
def add():
lock.acquire() # 同步鎖
global num
for i in range(10_000_000):
num += 1
lock.release()
def sub():
lock.acquire() # 同步鎖 一次只能一個
global num
for i in range(10_000_000):
num -= 1
lock.release()
if __name__ == "__main__":
lock = threading.Lock() # 創建同步鎖
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# 結果三次采集
# num result : 0
# num result : 0
# num result : 0

這樣這個代碼就完全變成了串行的狀態,對於這種計算密集型I/O業務來說,還不如直接使用串行化單線程執行來得快,所以這個例子僅作為一個示例,不能概述鎖真正的用途。

死鎖現象

對於同步鎖來說,一次acquire()必須對應一次release(),不能出現連續重復使用多次acquire()後再重復使用多次release()的操作,這樣會引起死鎖造成程序的阻塞,完全不動了,如下所示:

import threading
num = 0
def add():
lock.acquire() # 上鎖
lock.acquire() # 死鎖
# 不執行
global num
for i in range(10_000_000):
num += 1
lock.release()
lock.release()
def sub():
lock.acquire() # 上鎖
lock.acquire() # 死鎖
# 不執行
global num
for i in range(10_000_000):
num -= 1
lock.release()
lock.release()
if __name__ == "__main__":
lock = threading.Lock() # 創建lock
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)

with語句 (自動加鎖、解鎖)

由於threading.Lock()對象中實現了enter__()與__exit()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:

import threading
num = 0
def add():
with lock: # with語句lock的用法,自動加鎖、解鎖
# 自動加鎖
global num
for i in range(10_000_000):
num += 1
# 自動解鎖
def sub():
with lock:
# 自動加鎖
global num
for i in range(10_000_000):
num -= 1
# 自動解鎖
if __name__ == "__main__":
lock = threading.Lock()
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# 結果三次采集
# num result : 0
# num result : 0
# num result : 0

2、RLock() 遞歸鎖

基本介紹

遞歸鎖是同步鎖的一個升級版本,在同步鎖的基礎上可以做到連續重復使用多次acquire()再重復使用多次release()的操作,但是一定要注意加鎖次數和解鎖次數必須一致,否則也將引發死鎖現象。

下面是threading模塊與遞歸鎖提供的相關方法:

使用方式

以下是遞歸鎖的簡單使用,下面這段操作如果使用同步鎖則會發生死鎖現象,但是遞歸鎖不會:

import threading
num = 0
def add():
lock.acquire() # 第一次上鎖
lock.acquire() # 第二次上鎖,這種寫法 遞歸鎖 能正常運行;同步鎖會發生死鎖;
global num
for i in range(10_000_000):
num += 1
lock.release() # 不理解 遞歸鎖存在的意義???
lock.release()
def sub():
lock.acquire()
lock.acquire()
global num
for i in range(10_000_000):
num -= 1
lock.release()
lock.release()
if __name__ == "__main__":
lock = threading.RLock() # 創建 遞歸鎖 對象
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# 結果三次采集
# num result : 0
# num result : 0
# num result : 0

with語句
由於threading.RLock()對象中實現了enter__()與__exit()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:

import threading
num = 0
def add():
with lock: # 遞歸鎖和同步鎖 都可以使用 with 語句,進行自動加鎖、解鎖
# 自動加鎖
global num
for i in range(10_000_000):
num += 1
# 自動解鎖
def sub():
with lock:
# 自動加鎖
global num
for i in range(10_000_000):
num -= 1
# 自動解鎖
if __name__ == "__main__":
lock = threading.RLock()
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# 結果三次采集
# num result : 0
# num result : 0
# num result : 0

3、Condition() 條件鎖

基本介紹

條件鎖是在遞歸鎖的基礎上增加了能夠暫停線程運行的功能。並且我們可以使用wait()與notify()來控制線程執行的個數

注意:條件鎖可以自由設定一次放行幾個線程。

下面是threading模塊與條件鎖提供的相關方法:

使用方式
下面這個案例會啟動10個子線程,並且會立即將10個子線程設置為等待狀態。

然後我們可以發送一個或者多個通知,來恢復被等待的子線程繼續運行:

import threading
currentRunThreadNumber = 0 # 當前運行線程的數量
maxSubThreadNumber = 10
def task():
global currentRunThreadNumber
thName = threading.currentThread().name
condLock.acquire() # 上鎖
print("start and wait run thread : %s" % thName)
condLock.wait() # 暫停線程運行、等待喚醒
currentRunThreadNumber += 1
print("carry on run thread : %s" % thName)
condLock.release() # 解鎖
if __name__ == "__main__":
condLock = threading.Condition() # 創建條件鎖
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
while currentRunThreadNumber < maxSubThreadNumber:
notifyNumber = int(
input("Please enter the number of threads that need to be notified to run:")) # 手動輸入放行數量
# 循環放行
condLock.acquire()
condLock.notify(notifyNumber) # 放行
condLock.release()
print("main thread run end")
# 先啟動10個子線程,然後這些子線程會全部變為等待狀態
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# start and wait run thread : Thread-4
# start and wait run thread : Thread-5
# start and wait run thread : Thread-6
# start and wait run thread : Thread-7
# start and wait run thread : Thread-8
# start and wait run thread : Thread-9
# start and wait run thread : Thread-10
# 批量發送通知,放行特定數量的子線程繼續運行
# Please enter the number of threads that need to be notified to run:5 # 放行5個
# carry on run thread : Thread-4
# carry on run thread : Thread-3
# carry on run thread : Thread-1
# carry on run thread : Thread-2
# carry on run thread : Thread-5
# Please enter the number of threads that need to be notified to run:5 # 放行5個
# carry on run thread : Thread-8
# carry on run thread : Thread-10
# carry on run thread : Thread-6
# carry on run thread : Thread-9
# carry on run thread : Thread-7
# Please enter the number of threads that need to be notified to run:1
# main thread run end

with語句
由於threading.Condition()對象中實現了enter__()與__exit()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:

import threading
currentRunThreadNumber = 0
maxSubThreadNumber = 10
def task():
global currentRunThreadNumber
thName = threading.currentThread().name # 返回當前線程名稱
with condLock: # with 用法
print("start and wait run thread : %s" % thName)
condLock.wait() # 暫停線程運行、等待喚醒
currentRunThreadNumber += 1
print("carry on run thread : %s" % thName)
if __name__ == "__main__":
condLock = threading.Condition()
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
while currentRunThreadNumber < maxSubThreadNumber:
notifyNumber = int(
input("Please enter the number of threads that need to be notified to run:"))
with condLock: # with 用法
condLock.notify(notifyNumber) # 放行
print("main thread run end")

4、Event() 事件鎖

基本介紹

事件鎖是基於條件鎖來做的,它與條件鎖的區別在於一次只能放行全部,不能放行任意個數量的子線程繼續運行。

我們可以將事件鎖看為紅綠燈,當紅燈時所有子線程都暫停運行,並進入“等待”狀態,當綠燈時所有子線程都恢復“運行”。

下面是threading模塊與事件鎖提供的相關方法:

使用方式
事件鎖不能利用with語句來進行使用,只能按照常規方式。

如下所示,我們來模擬線程和紅綠燈的操作,紅燈停,綠燈行:

import threading
maxSubThreadNumber = 3
def task():
thName = threading.currentThread().name
print("start and wait run thread : %s" % thName)
eventLock.wait() # 暫停運行,等待綠燈 
print("green light, %s carry on run" % thName)
print("red light, %s stop run" % thName)
eventLock.wait() # 暫停運行,等待綠燈 
print("green light, %s carry on run" % thName)
print("sub thread %s run end" % thName)
if __name__ == "__main__":
eventLock = threading.Event() # 事件鎖
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task) # 創建線程 
subThreadIns.start()
# 交替綠、紅燈
eventLock.set() # 設置為綠燈 
eventLock.clear() # 設置為紅燈 
eventLock.set() # 設置為綠燈
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# green light, Thread-1 carry on run
# red light, Thread-1 stop run
# green light, Thread-1 carry on run
# sub thread Thread-1 run end
# green light, Thread-3 carry on run
# red light, Thread-3 stop run
# green light, Thread-3 carry on run
# sub thread Thread-3 run end
# green light, Thread-2 carry on run
# red light, Thread-2 stop run
# green light, Thread-2 carry on run
# sub thread Thread-2 run end

5、Semaphore() 信號量鎖

基本介紹

信號量鎖也是根據條件鎖來做的,它與條件鎖和事件鎖的區別如下:

  • 條件鎖:一次可以放行任意個處於“等待”狀態的線程
  • 事件鎖:一次可以放行全部的處於“等待”狀態的線程
  • 信號量鎖:通過規定,成批的放行特定個處於“上鎖”狀態的線程

下面是threading模塊與信號量鎖提供的相關方法:

使用方式
以下是使用示例,你可以將它當做一段限寬的路段,每次只能放行相同數量的線程:

import threading
import time
maxSubThreadNumber = 6
def task():
thName = threading.currentThread().name
semaLock.acquire()
print("run sub thread %s" % thName)
time.sleep(3)
semaLock.release()
if __name__ == "__main__":
# 每次只能放行2個
semaLock = threading.Semaphore(2) # 信號量鎖 可以限制每次放行線程的數量
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
# run sub thread Thread-1
# run sub thread Thread-2
# run sub thread Thread-3
# run sub thread Thread-4
# run sub thread Thread-6
# run sub thread Thread-5

with語句

由於threading.Semaphore()對象中實現了enter__()與__exit()方法,故我們可以使用with語句進行上下文管理形式的加鎖解鎖操作:

import threading
import time
maxSubThreadNumber = 6
def task():
thName = threading.currentThread().name
with semaLock: # with 信號量鎖 semaphore
print("run sub thread %s" % thName)
time.sleep(3)
if __name__ == "__main__":
semaLock = threading.Semaphore(2)
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()

鎖關系淺析

上面5種鎖可以說都是基於同步鎖來做的,這些你都可以從源碼中找到答案。

首先來看RLock遞歸鎖,遞歸鎖的實現非常簡單,它的內部會維護著一個計數器,當計數器不為0的時候該線程不能被I/O操作和時間輪詢機制切換。但是當計數器為0的時候便不會如此了:

def __init__(self):
self._block = _allocate_lock()
self._owner = None
self._count = 0 # 計數器

Condition條件鎖的內部其實是有兩把鎖的,一把底層鎖(同步鎖)一把高級鎖(遞歸鎖)。

低層鎖的解鎖方式有兩種,使用wait()方法會暫時解開底層鎖同時加上一把高級鎖,只有當接收到別的線程裡的**notfiy()**後才會解開高級鎖和重新上鎖低層鎖,也就是說條件鎖底層是根據同步鎖和遞歸鎖的不斷切換來進行實現的:

def __init__(self, lock=None):
if lock is None:
lock = RLock() # 可以看到條件鎖的內部是基於遞歸鎖,而遞歸鎖又是基於同步鎖來做的 
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()

Event事件鎖內部是基於條件鎖來做的:

class Event:
def __init__(self):
self._cond = Condition(Lock()) # 實例化出了一個條件鎖。 
self._flag = False
def _reset_internal_locks(self): # private! 
called by Thread._reset_internal_locks by _after_fork()
self._cond.__init__(Lock())
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
isSet = is_set

Semaphore信號量鎖內部也是基於條件鎖來做的:

class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock()) # 可以看到,這裡是實例化出了一個條件鎖 
self._value = value

基本練習題

條件鎖的應用

需求:一個空列表,兩個線程輪番往裡面加值(一個加偶數,一個加奇數),最終讓該列表中的值為 1 - 100 ,且是有序排列的。

import threading
lst = []
def even():
"""加偶數"""
with condLock: # with 自動加鎖、解鎖
for i in range(2, 101, 2):
# 判斷當前列表的長度處於2是否能處盡
# 如果能處盡則代表需要添加奇數
# 否則就添加偶數
if len(lst) % 2 != 0:
# 添偶數
lst.append(i) # 先添加值
condLock.notify() # 告訴另一個線程,你可以加奇數了,但是這裡不會立即交出執行權
condLock.wait() # 交出執行權,並等待另一個線程通知加偶數
else:
# 添奇數
condLock.wait() # 交出執行權,等待另一個線程通知加偶數
lst.append(i)
condLock.notify()
condLock.notify()
def odd():
"""加奇數"""
with condLock: # 自動加鎖、解鎖
for i in range(1, 101, 2):
if len(lst) % 2 == 0:
lst.append(i)
condLock.notify() # 
condLock.wait()
condLock.notify()
if __name__ == "__main__":
condLock = threading.Condition() # 條件鎖
addEvenTask = threading.Thread(target=even)
addOddTask = threading.Thread(target=odd)
addEvenTask.start()
addOddTask.start()
addEvenTask.join() # join方法的作用是線程同步
addOddTask.join()
print(lst)

事件鎖的應用

有2個任務線程來扮演李白和杜甫,如何讓他們一人一句進行對答?文本如下:

杜甫:老李啊,來喝酒!
李白:老杜啊,不喝了我喝不下了!
杜甫:老李啊,再來一壺?
杜甫:…老李?
李白:呼呼呼…睡著了

代碼如下:

import threading
def libai():
event.wait()
print("李白:老杜啊,不喝了我喝不下了!")
event.set()
event.clear()
event.wait()
print("李白:呼呼呼...睡著了..")
def dufu():
print("杜甫:老李啊,來喝酒!")
event.set()
event.clear()
event.wait()
print("杜甫:老李啊,再來一壺?")
print("杜甫:...老李?")
event.set()
if __name__ == '__main__':
event = threading.Event() # 事件鎖
t1 = threading.Thread(target=libai)
t2 = threading.Thread(target=dufu)
t1.start()
t2.start()
t1.join()
t2.join()

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved