程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

5 Python thread locks

編輯:Python

Here's the catalog title

  • Thread safety
  • The function of lock
  • 1、Lock() Synchronization lock
    • Deadlock
  • 2、RLock() Recursive lock
  • 3、Condition() Conditional lock
  • 4、Event() Event lock
  • 5、Semaphore() Semaphore lock
  • Analysis of lock relationship
  • Basic exercises
    • The application of conditional lock
    • Application of event lock

Thread safety

Thread safety is a concept in multi thread or multi process programming , In a program that runs in parallel with multiple threads that share data , Thread safe code can ensure that all threads can execute normally and correctly through synchronization mechanism , There will be no data pollution and other accidents .

Thread safety is mainly caused by thread switching , Like a room ( process ) There is 10 Granulated sugar ( resources ), In addition to that 3 I'm a little man (1 Main threads 、2 Child threads ), Be a villain A Ate 3 When he was forced to rest by the system after a sugar, he thought that there was still 7 Granulated sugar , And being a villain B Work and eat 3 Granulated sugar , So being a villain A When you go back to work, you think there's still sugar left 7 star , But actually only 4 It's gone .

In the above example, the thread A And thread B The data of is out of sync , That's thread safety , It can lead to very serious accidents , Let's use the following example to illustrate .

Here's a number num The initial value is 0, We turn on 2 Thread thread :

Threads 1 Yes num Ten million times +1 The operation of

Threads 2 Yes num Ten million times -1 The operation of

The results can be staggering ,num In the end, it's not what we expected 0:

import threading
num = 0
def add():
global num
for i in range(10_000_000):
num += 1
def sub():
global num
for i in range(10_000_000):
num -= 1
''' add() and sub() Two threads are working on num To operate Look at the results , It can be inferred that : 1、 Simultaneous operation is not a program crash (num yes int type ,int The type itself is thread safe ) 2、 But the operation result will be in an unknown state , This indicates that this process is thread unsafe . Because the order in which threads run is unknown . '''
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection 
# num result : 669214
# num result : -1849179
# num result : -525674

This is a very good case , To solve this problem, we must pass lock To ensure the timing of thread switching .

What we need to pay attention to is , stay Python In the basic data type list、tuple、dict Itself is Thread safe , So if there are multiple threads on this 3 When you operate a container , We don't have to think about thread safety .( Very good design )

The function of lock

The lock is Python It provides us with a means to control thread switching by ourselves , The use of locks can make the switching of threads orderly .

Once the threads switch in order , Access to data between threads 、 The modification becomes controllable , So to ensure thread safety , You have to use a lock .

threading modular Provided in 5 The most common lock , Here's a breakdown by function :

  • Synchronization lock :lock( Only one can be released at a time )

  • Recursive lock :rlock( Only one can be released at a time )

  • Conditional lock :condition( You can release any one at a time )

  • Event lock :event( All at once )

  • Semaphore lock :semaphore( You can release a specific one at a time )

1、Lock() Synchronization lock

Basic introduction
Lock There are many names for locks , Such as : Synchronization lock 、 The mutex

What do they mean ? As shown below :

Mutually exclusive It refers to a resource There can only be one visitor at a time Visit it , It is unique and exclusive , however Mutual exclusion cannot limit Visitors' response to resources Order of access , That is, access is A disorderly

Synchronization is based on mutual exclusion ( In most cases ), Through other mechanisms, visitors can access resources orderly

Synchronization is already mutually exclusive , Is a more complex implementation of mutex , Because it realizes the characteristics of orderly access on the basis of mutual exclusion

Here is threading Module and synchronization lock provide related methods :

Usage mode

Synchronization lock can only release one thread at a time , A locked thread will not hand over the execution right at runtime , Only when the thread is unlocked will the execution right be transferred to other threads through system scheduling .

As shown below , Use synchronization lock to solve the top problem :

import threading
num = 0
def add():
lock.acquire() # Synchronization lock 
global num
for i in range(10_000_000):
num += 1
lock.release()
def sub():
lock.acquire() # Synchronization lock Only one at a time 
global num
for i in range(10_000_000):
num -= 1
lock.release()
if __name__ == "__main__":
lock = threading.Lock() # Create a synchronization lock 
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection 
# num result : 0
# num result : 0
# num result : 0

So this code is It becomes completely serial , For this kind of computing intensive I/O In terms of business , It's not as fast as serializing single thread execution directly , So this is just an example , Can't outline the real purpose of the lock .

Deadlock

For synchronous locks , once acquire() Must correspond to once release(), It can't be used repeatedly acquire() And then reuse it many times release() The operation of , This will cause deadlock and block the program , It's not moving at all , As shown below :

import threading
num = 0
def add():
lock.acquire() # locked 
lock.acquire() # Deadlock 
# Don't execute 
global num
for i in range(10_000_000):
num += 1
lock.release()
lock.release()
def sub():
lock.acquire() # locked 
lock.acquire() # Deadlock 
# Don't execute 
global num
for i in range(10_000_000):
num -= 1
lock.release()
lock.release()
if __name__ == "__main__":
lock = threading.Lock() # establish lock
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)

with sentence ( Automatic locking 、 Unlock )

because threading.Lock() Object enter__() And __exit() Method , So we can use with The sentence goes on Context management forms Lock and unlock operation :

import threading
num = 0
def add():
with lock: # with sentence lock Usage of , Automatic locking 、 Unlock 
# Automatic locking 
global num
for i in range(10_000_000):
num += 1
# Auto unlock 
def sub():
with lock:
# Automatic locking 
global num
for i in range(10_000_000):
num -= 1
# Auto unlock 
if __name__ == "__main__":
lock = threading.Lock()
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection 
# num result : 0
# num result : 0
# num result : 0

2、RLock() Recursive lock

Basic introduction

Recursive lock is an upgraded version of synchronous lock , On the basis of synchronous lock It can be used repeatedly for many times acquire() after Reuse it many times release() The operation of , But be sure to pay attention to Locking times and unlocking times must be consistent , Otherwise, it will also cause deadlock .

Here is threading Module and recursive lock provide related methods :

Usage mode

Here's a simple use of recursive locks , In the following operation, if synchronization lock is used, deadlock will occur , But recursive locks don't :

import threading
num = 0
def add():
lock.acquire() # Lock for the first time 
lock.acquire() # Second lock , This kind of writing Recursive lock Normal operation ; The synchronization lock will deadlock ;
global num
for i in range(10_000_000):
num += 1
lock.release() # incomprehension The meaning of recursive lock ???
lock.release()
def sub():
lock.acquire()
lock.acquire()
global num
for i in range(10_000_000):
num -= 1
lock.release()
lock.release()
if __name__ == "__main__":
lock = threading.RLock() # establish Recursive lock object 
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection 
# num result : 0
# num result : 0
# num result : 0

with sentence
because threading.RLock() Object enter__() And __exit() Method , So we can use with Statement to lock and unlock in the form of context management :

import threading
num = 0
def add():
with lock: # Recursive lock and synchronous lock You can use with sentence , Perform automatic locking 、 Unlock 
# Automatic locking 
global num
for i in range(10_000_000):
num += 1
# Auto unlock 
def sub():
with lock:
# Automatic locking 
global num
for i in range(10_000_000):
num -= 1
# Auto unlock 
if __name__ == "__main__":
lock = threading.RLock()
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection 
# num result : 0
# num result : 0
# num result : 0

3、Condition() Conditional lock

Basic introduction

Conditional lock It is based on recursive lock and can Pause The function of thread running . And we can use wait() And notify() Come on Control the number of threads executed .

Be careful : Conditional locks are free Set up A release How many? Threads .

Here is threading Module and conditional lock provide related methods :

Usage mode
The following case will start 10 Child threads , And will immediately 10 The child thread is set to wait .

Then we can send one or more notifications , To restore the waiting child thread to continue running :

import threading
currentRunThreadNumber = 0 # Number of threads currently running 
maxSubThreadNumber = 10
def task():
global currentRunThreadNumber
thName = threading.currentThread().name
condLock.acquire() # locked 
print("start and wait run thread : %s" % thName)
condLock.wait() # Pause thread running 、 Waiting to wake up 
currentRunThreadNumber += 1
print("carry on run thread : %s" % thName)
condLock.release() # Unlock 
if __name__ == "__main__":
condLock = threading.Condition() # Create a conditional lock 
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
while currentRunThreadNumber < maxSubThreadNumber:
notifyNumber = int(
input("Please enter the number of threads that need to be notified to run:")) # Enter the release quantity manually 
# Circulation release 
condLock.acquire()
condLock.notify(notifyNumber) # release 
condLock.release()
print("main thread run end")
# Start... First 10 Child threads , Then all of these sub threads will be in the waiting state 
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# start and wait run thread : Thread-4
# start and wait run thread : Thread-5
# start and wait run thread : Thread-6
# start and wait run thread : Thread-7
# start and wait run thread : Thread-8
# start and wait run thread : Thread-9
# start and wait run thread : Thread-10
# Bulk notification , Release a specific number of child threads to continue running 
# Please enter the number of threads that need to be notified to run:5 # release 5 individual 
# carry on run thread : Thread-4
# carry on run thread : Thread-3
# carry on run thread : Thread-1
# carry on run thread : Thread-2
# carry on run thread : Thread-5
# Please enter the number of threads that need to be notified to run:5 # release 5 individual 
# carry on run thread : Thread-8
# carry on run thread : Thread-10
# carry on run thread : Thread-6
# carry on run thread : Thread-9
# carry on run thread : Thread-7
# Please enter the number of threads that need to be notified to run:1
# main thread run end

with sentence
because threading.Condition() Object enter__() And __exit() Method , So we can use with Statement to lock and unlock in the form of context management :

import threading
currentRunThreadNumber = 0
maxSubThreadNumber = 10
def task():
global currentRunThreadNumber
thName = threading.currentThread().name # Returns the current thread name 
with condLock: # with usage 
print("start and wait run thread : %s" % thName)
condLock.wait() # Pause thread running 、 Waiting to wake up 
currentRunThreadNumber += 1
print("carry on run thread : %s" % thName)
if __name__ == "__main__":
condLock = threading.Condition()
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
while currentRunThreadNumber < maxSubThreadNumber:
notifyNumber = int(
input("Please enter the number of threads that need to be notified to run:"))
with condLock: # with usage 
condLock.notify(notifyNumber) # release 
print("main thread run end")

4、Event() Event lock

Basic introduction

Event lock It is based on conditional locking , It differs from conditional locking in that One time can only Let go of all , You cannot release any number of child threads to continue running .

We can think of event locks as traffic lights , When the light is red, all child threads are suspended , And enter “ wait for ” state , When the light is green, all child threads are restored “ function ”.

Here is threading Module and event lock provide related methods :

Usage mode
Event locks cannot be used with Statement to use , Only in the normal way .

As shown below , Let's simulate the operation of threads and traffic lights , Stop at the red light , pass at a green light :

import threading
maxSubThreadNumber = 3
def task():
thName = threading.currentThread().name
print("start and wait run thread : %s" % thName)
eventLock.wait() # Suspend operation , Waiting for the green light 
print("green light, %s carry on run" % thName)
print("red light, %s stop run" % thName)
eventLock.wait() # Suspend operation , Waiting for the green light 
print("green light, %s carry on run" % thName)
print("sub thread %s run end" % thName)
if __name__ == "__main__":
eventLock = threading.Event() # Event lock 
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task) # Create thread 
subThreadIns.start()
# Alternating green 、 A red light 
eventLock.set() # Set it to green 
eventLock.clear() # Set to red light 
eventLock.set() # Set it to green 
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# green light, Thread-1 carry on run
# red light, Thread-1 stop run
# green light, Thread-1 carry on run
# sub thread Thread-1 run end
# green light, Thread-3 carry on run
# red light, Thread-3 stop run
# green light, Thread-3 carry on run
# sub thread Thread-3 run end
# green light, Thread-2 carry on run
# red light, Thread-2 stop run
# green light, Thread-2 carry on run
# sub thread Thread-2 run end

5、Semaphore() Semaphore lock

Basic introduction

Semaphore lock It is also done according to the condition lock , It differs from conditional lock and event lock as follows :

  • Conditional lock : You can release any one at a time “ wait for ” Thread in state
  • Event lock : All of them can be released at one time “ wait for ” Thread in state
  • Semaphore lock : Pass regulations , The release of a particular product in batches “ locked ” Thread in state

Here is threading Module and semaphore lock provide related methods :

Usage mode
Here is an example of how to use , You can think of it as a limited section of road , Only the same number of threads can be released at a time :

import threading
import time
maxSubThreadNumber = 6
def task():
thName = threading.currentThread().name
semaLock.acquire()
print("run sub thread %s" % thName)
time.sleep(3)
semaLock.release()
if __name__ == "__main__":
# We can only release each time 2 individual 
semaLock = threading.Semaphore(2) # Semaphore lock You can limit the number of threads released each time 
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
# run sub thread Thread-1
# run sub thread Thread-2
# run sub thread Thread-3
# run sub thread Thread-4
# run sub thread Thread-6
# run sub thread Thread-5

with sentence

because threading.Semaphore() Object enter__() And __exit() Method , So we can use with Statement to lock and unlock in the form of context management :

import threading
import time
maxSubThreadNumber = 6
def task():
thName = threading.currentThread().name
with semaLock: # with Semaphore lock semaphore
print("run sub thread %s" % thName)
time.sleep(3)
if __name__ == "__main__":
semaLock = threading.Semaphore(2)
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()

Analysis of lock relationship

above 5 All kinds of locks are Based on synchronous lock To do the , You can find the answers to all of these from the source code .

First of all to see RLock Recursive lock , The implementation of recursive lock is very simple , Its interior will maintain A counter , When the counter is not 0 The thread cannot be I/O Switching between operation and time polling mechanism . But when the counter is 0 It won't be like this when you're young :

def __init__(self):
self._block = _allocate_lock()
self._owner = None
self._count = 0 # Counter 

and Condition Conditional lock In fact, there are Two locks Of , A bottom lock ( Synchronization lock ) An advanced lock ( Recursive lock ).

There are two ways to unlock the low level lock , Use wait() Method will temporarily unlock the underlying lock and add an advanced lock , Only when receiving from another thread **notfiy()** After that, the senior lock will be unlocked and the lower lock will be locked again , In other words, the bottom layer of conditional lock is realized according to the continuous switching between synchronous lock and recursive lock :

def __init__(self, lock=None):
if lock is None:
lock = RLock() # You can see that the interior of conditional lock is based on recursive lock , Recursive lock is based on synchronous lock 
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()

Event Event lock The internal is based on conditional locking :

class Event:
def __init__(self):
self._cond = Condition(Lock()) # A conditional lock is instantiated . 
self._flag = False
def _reset_internal_locks(self): # private! 
called by Thread._reset_internal_locks by _after_fork()
self._cond.__init__(Lock())
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
isSet = is_set

Semaphore Semaphore lock is also based on conditional lock :

class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock()) # You can see , Here is an example of a conditional lock 
self._value = value

Basic exercises

The application of conditional lock

demand : An empty list , Two threads add value in turn ( A plus even number , One plus odd number ), Finally, let the value in the list be 1 - 100 , And it's in order .

import threading
lst = []
def even():
""" Add even numbers """
with condLock: # with Automatic locking 、 Unlock 
for i in range(2, 101, 2):
# Judge whether the length of the current list is in 2 If you can get along with 
# If it can be used up, it means that odd numbers need to be added 
# Otherwise, even numbers are added 
if len(lst) % 2 != 0:
# Add even numbers 
lst.append(i) # Add values first 
condLock.notify() # Tell another thread , You can add odd numbers , But there will be no immediate surrender of executive power 
condLock.wait() # Hand over the power of execution , And wait for another thread to add an even number 
else:
# Add odd numbers 
condLock.wait() # Hand over the power of execution , Wait for another thread to give an even number 
lst.append(i)
condLock.notify()
condLock.notify()
def odd():
""" Add odd numbers """
with condLock: # Automatic locking 、 Unlock 
for i in range(1, 101, 2):
if len(lst) % 2 == 0:
lst.append(i)
condLock.notify() # 
condLock.wait()
condLock.notify()
if __name__ == "__main__":
condLock = threading.Condition() # Conditional lock 
addEvenTask = threading.Thread(target=even)
addOddTask = threading.Thread(target=odd)
addEvenTask.start()
addOddTask.start()
addEvenTask.join() # join Method is used to synchronize threads 
addOddTask.join()
print(lst)

Application of event lock

Yes 2 A task thread to play Li Bai and Du Fu , How to make them answer one by one ? The text is as follows :

 Du Fu : Lao Li , Come and have a drink !
Li Bai : Old du , No, I can't !
Du Fu : Lao Li , Another pot ?
Du Fu :… Lao Li ?
Li Bai : Whoosh, whoosh, whoosh … fell asleep

The code is as follows :

import threading
def libai():
event.wait()
print(" Li Bai : Old du , No, I can't !")
event.set()
event.clear()
event.wait()
print(" Li Bai : Whoosh, whoosh, whoosh ... fell asleep ..")
def dufu():
print(" Du Fu : Lao Li , Come and have a drink !")
event.set()
event.clear()
event.wait()
print(" Du Fu : Lao Li , Another pot ?")
print(" Du Fu :... Lao Li ?")
event.set()
if __name__ == '__main__':
event = threading.Event() # Event lock 
t1 = threading.Thread(target=libai)
t2 = threading.Thread(target=dufu)
t1.start()
t2.start()
t1.join()
t2.join()

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved