程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python thread synchronization (3) -- semaphore

編輯:Python

1. introduction

In the last two articles , We introduced in detail Python Two thread synchronization methods in — Locks and condition objects . Python Thread synchronization ( One ) — Race conditions and thread locks python Thread synchronization ( Two ) — Conditions of the object

In this paper, we introduce one of the oldest and classic thread synchronization primitives in the history of computer science — Semaphore .

2. Semaphore

We have already introduced Linux And Java The semaphore in .

Semaphore is a classic thread synchronization primitive in operating system , In fact, it is a mutex with counting function , Used to protect a resource that only allows a specified number of operations . Semaphores are very similar to locking mechanisms , But he maintains an internal count , Each lock atomically subtracts the count value 1, If the return value is not negative, it means that locking is successful , Otherwise add back 1 And block waiting until awakened , When unlocking, add... To the semaphore count 1 operation . Generally speaking , The change to the count value is made by CAS The operation realizes . CAS Thought and java The realization of atomic operation

3. Python The semaphore in — threading.Semaphore

python In the standard library threading The semaphore object is implemented in the package .

3.1. Construction method

The constructor of this object has a parameter value Used to initialize the count value in the semaphore described above , The default is 1.

threading.Semaphore(value=1)

3.1.1. value The value of

  • When value Incoming greater than 1, This is the most common usage , Used to limit the maximum value Threads can share resources at the same time
  • When value Passed in as 1 when , Semaphores degenerate into a normal thread lock , Although this is the default behavior , But with threading Compared to the lock object provided in , Although the basic thread lock is implemented by semaphores in the same way , But its execution efficiency is lower , Therefore, it is not recommended to use
  • When value Pass in 0 when , All threads trying to lock will block on the semaphore object , but Python It is allowed to call the unlocking method directly to increase the count value without locking , But this is often the wrong use , This should be avoided
  • When value Incoming less than 0 when , Will throw out ValueError abnormal

3.2. Lock

acquire(blocking=True, timeout=None)

The execution logic of the locking method has been described in detail above . Python The semaphore locking method allows two parameters to be passed in , Indicates whether it is blocked , And the maximum waiting time ( Number of seconds ) Lock success returns True.

3.3. Unlock

release()

The unlocking method is to add the counter in the semaphore 1, If the original value of the counter is 0, Wake up all threads blocked on the semaphore . Different from ordinary lock objects ,Python Semaphores in allow calls to be made without locking release Method to increment the counter 1.

import logging
from threading import Thread, Semaphore
class SemaphoreTestThread(Thread):
def __init__(self, id, semaphore):
super().__init__()
self.id = id
self.semaphore = semaphore
def run(self) -> None:
logging.info('%r start running' % self)
try:
while self.semaphore.acquire():
logging.info('%r hold the semaphore' % self)
finally:
self.semaphore.release()
def __repr__(self):
return 'SemaphoreTestThread(%s)' % self.id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
semaphore = Semaphore(0)
for i in range(10):
thread = SemaphoreTestThread(i, semaphore)
thread.start()
logging.info('all the threads are running')
for i in range(5):
logging.info('add 1 on the semaphore')
semaphore.release()

Printed out :

2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(0) start running 2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(1) start running 2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(2) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(4) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(5) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(6) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(7) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(8) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(9) start running 2019-05-12 22:12:24,016 - INFO: all the threads are running 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(0) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(1) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) hold the semaphore

You can see , We created 10 Thread and start , But since the initial count of semaphores is 0, So all 10 Threads immediately block and wait on semaphores after startup . Our main thread directly calls without locking the semaphore release Method , There is no error in this report , It activates 10 One of the threads runs .

4. Bounded semaphore — BoundedSemaphore

In the example above , We see ,Python The semaphore in allows us to directly call the unlocking method without locking to increase the counter value in the semaphore 1, This seems to make the constructor pass in value Value loses its value . Python There is another semaphore in , It is only a little different from the semaphore we explained above , That is when release Method attempts to increment the counter to a value greater than that passed in by the constructor value When the value of , Will throw out ValueError abnormal . therefore , In common use Semaphore And BoundedSemaphore There's no difference .

We change the semaphore in the example of unlocking without locking to BoundedSemaphore Try again :

import logging
from threading import Thread, BoundedSemaphore
class SemaphoreTestThread(Thread):
def __init__(self, id, semaphore):
super().__init__()
self.id = id
self.semaphore = semaphore
def run(self) -> None:
logging.info('%r start running' % self)
try:
while self.semaphore.acquire():
logging.info('%r hold the semaphore' % self)
finally:
self.semaphore.release()
def __repr__(self):
return 'SemaphoreTestThread(%s)' % self.id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
semaphore = BoundedSemaphore(0)
for i in range(10):
thread = SemaphoreTestThread(i, semaphore)
thread.start()
logging.info('all the threads are running')
for i in range(5):
logging.info('add 1 on the semaphore')
semaphore.release()

Printed out :

2019-05-13 00:08:35,020 - INFO: SemaphoreTestThread(0) start running 2019-05-13 00:08:35,024 - INFO: SemaphoreTestThread(1) start running 2019-05-13 00:08:35,025 - INFO: SemaphoreTestThread(2) start running 2019-05-13 00:08:35,027 - INFO: SemaphoreTestThread(3) start running 2019-05-13 00:08:35,028 - INFO: SemaphoreTestThread(4) start running 2019-05-13 00:08:35,034 - INFO: SemaphoreTestThread(5) start running 2019-05-13 00:08:35,039 - INFO: SemaphoreTestThread(6) start running 2019-05-13 00:08:35,043 - INFO: SemaphoreTestThread(7) start running 2019-05-13 00:08:35,053 - INFO: SemaphoreTestThread(8) start running 2019-05-13 00:08:35,060 - INFO: all the threads are running 2019-05-13 00:08:35,060 - INFO: add 1 on the semaphore Traceback (most recent call last): File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1741, in <module> 2019-05-13 00:08:35,054 - INFO: SemaphoreTestThread(9) start running main() File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1735, in main globals = debugger.run(setup[‘file’], None, None, is_module) File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1135, in run pydev_imports.execfile(file, globals, locals) # execute the script File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile exec(compile(contents+"\n", file, ’exec’), glob, loc) File "D:/Workspace/code/python/fluentpython/thread/semaphore.py", line 34, in <module> semaphore.release() File "C:\Users\zeyu\Anaconda3\lib\threading.py", line 483, in release raise ValueError("Semaphore released too many times") ValueError: Semaphore released too many times

5. Example — A simple DB Connection pool

from threading import BoundedSemaphore, Semaphore
class PooledDB:
def __init__(self, creator, minconnections, maxconnections, *args, **kwargs):
self._args, self._kwargs = args, kwargs
self._creator = creator
self._minconnections = minconnections
self._maxconnections = maxconnections
self._max_semaphore = Semaphore(maxconnections)
self._min_semaphore = BoundedSemaphore(minconnections)
self._idle_cache = []
idle = [self.get_connection() for _ in range(minconnections)]
while idle:
idle.pop().close()
def get_connection(self, timeout=None):
hold = self._max_semaphore.acquire(timeout=timeout)
if hold:
hold = self._min_semaphore.acquire(blocking=False)
if hold:
return self._idle_cache.pop(0)
else:
return PooledConnection(self._creator, self, args=self._args, kwargs=self._kwargs)
return None
def returnConnect(self, connection):
try:
self._min_semaphore.release()
self._idle_cache.append(connection)
except ValueError:
connection.close(True)
finally:
self._max_semaphore.release()
class PooledConnection:
def __init__(self, creator, pool, *args, **kwargs):
self._pool = pool
self._creator = creator
self._con = self._creator.connect(args, kwargs)
def close(self, force_close=False):
if force_close:
self._con.close()
else:
self._pool.returnConnect(self)

This is just a simple example DB Connection pool implementation , meanwhile , For connection classes PooledConnection We omitted begin、commit、rollback、cursor、ping And so on , Because these have nothing to do with the content of this section , Only the connection creation method and close Method , All parameters are omitted 、 Boundary judgment , Just to make the example more concise , Soon I'll write an article detailing the DB Connection pool source code analysis , Coming soon . In the example above , Our connection pool construction method has two parameters — Maximum connections and minimum connections . We created two BoundedSemaphore object , They are respectively used to limit the maximum in the concurrent environment 、 Minimum connections .

5.1. Create connection

In the initial state, we have added the minimum number of connections to the idle queue , We see , When creating a connection , We first try to lock the semaphore with the maximum number of connections , This ensures that the number of connection pool connections in the concurrent environment will not exceed maxconnections value . then , The minimum connection number semaphore is locked , If the lock is successful, the connection is obtained from the idle queue , Otherwise, create a new connection .

5.2. Close the connection

When the connection is closed , We first try to release the minimum number of connections semaphore , Here it shows BoundedSemaphore The value of , Once the number of releases exceeds that passed in by the construction parameter minconnections It means that the number of releases is greater than the number of locks , in other words , The released connection is not from the idle queue _idle_cache Out of , and BoundedSemaphore Throw... At this point ValueError Exceptions allow us to force the connection to close directly , Instead of letting him go back to the connection pool . Compared with the minimum number of connections semaphore , Maximum connections semaphore usage Semaphore That's all right. .


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved