
Python thread synchronization (2) -- condition object


1. Introduction

In the previous article, "Python thread synchronization (1): race conditions and thread locks", we introduced thread synchronization and Python's Lock mechanism.

However, locks only cover the simplest and most common synchronization scenarios. This article introduces the tool needed for more complex ones: the condition object.

2. Overview

We previously analyzed the source code of Java's condition object in the article "Lock waiting and wakeup: ConditionObject source code analysis".

Once you understand how Java's condition object works, you will find that Python's condition object is essentially the same thing. Consider the following scenario. The status of an order changes constantly. Thread 1 cares about the "payment succeeded" status and does some work once it is reached; thread 2 cares about the "refund initiated" status and does some work once it is reached. The business thread keeps changing the order status as it runs. After an order is created, threads 1 and 2 must block, and each should be woken only when the status it expects has been set. The status itself is shared state subject to race conditions, so both updating it and reading it must happen under a lock. The scenario sounds complicated, but condition objects make it easy to handle.

3. Execution process of condition object

A condition object always holds a reference to a lock. The lock can be passed in when the condition object is created and must be a threading.Lock or a threading.RLock; if none is passed, a threading.RLock is created by default. A condition object also has acquire and release methods, which simply call the corresponding methods of the underlying lock. Once a thread that holds the lock calls wait, the lock is released automatically and the thread blocks. Another thread waiting for the lock can then start running. The blocked thread continues only after some other thread calls notify or notify_all and then releases the lock.
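The flow described above can be sketched in a few lines. This is a minimal illustration, not code from the original article; the names state, events, waiter and notifier are made up for the example.

```python
import threading

cond = threading.Condition()      # no lock passed in, so an RLock is created
state = {"ready": False}          # shared state, protected by the condition's lock
events = []                       # records the order in which steps happen


def waiter():
    with cond:                    # acquire the underlying lock
        events.append("waiter: acquired the lock")
        while not state["ready"]:
            cond.wait()           # releases the lock and blocks until notified
        events.append("waiter: woke up with the lock re-acquired")


def notifier():
    with cond:
        state["ready"] = True
        events.append("notifier: state changed, notifying")
        cond.notify()
    # leaving the with block releases the lock, so the waiter can continue


t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=notifier)
t1.start()
t2.start()
t1.join()
t2.join()
print(events)
```

Whichever thread gets the lock first, the waiter's final step always comes after the notifier's, because the waiter cannot return from wait until the notifier has released the lock.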

4. Lock and unlock

acquire(*args)
release()

As mentioned above, acquiring and releasing simply call the corresponding methods of the lock instance held by the condition object.
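A small sketch of this delegation, assuming a plain threading.Lock is passed in so that its locked() method can be used to observe the effect. The variable names are illustrative only.

```python
import threading

lock = threading.Lock()
cond = threading.Condition(lock)     # the condition wraps the lock we pass in

cond.acquire()                       # forwarded to lock.acquire()
locked_after_acquire = lock.locked()
cond.release()                       # forwarded to lock.release()
locked_after_release = lock.locked()

# acquire() forwards its arguments, so a non-blocking attempt works as well
lock.acquire()
got_it = cond.acquire(blocking=False)   # the lock is already taken
lock.release()

print(locked_after_acquire, locked_after_release, got_it)
```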

5. Waiting

5.1. wait

wait(timeout=None)

Blocks until woken up or until the timeout expires. The method must be called after the thread has acquired the lock, otherwise a RuntimeError is raised. It releases the lock and then blocks until another thread calls notify() or notify_all() on the same condition variable, or until the optional timeout expires. If the condition object holds an RLock, the lock is not released through its release method, because that might not actually unlock it when it has been acquired recursively several times; instead, an internal interface of RLock is used to release it fully in one step. The return value is True unless the given timeout expired, in which case it is False. Before Python 3.2, this method always returned None.
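The three outcomes of wait can be checked with a short sketch. The helper notify_soon and the timeout values are assumptions chosen for the example.

```python
import threading

cond = threading.Condition()

# 1. Calling wait() without holding the lock raises RuntimeError
try:
    cond.wait(timeout=0.01)
    raised = False
except RuntimeError:
    raised = True

# 2. With the lock held and nobody notifying, wait() times out and returns False
with cond:
    timed_out_result = cond.wait(timeout=0.05)


# 3. When another thread notifies before the timeout, wait() returns True
def notify_soon():
    with cond:
        cond.notify()


with cond:
    timer = threading.Timer(0.05, notify_soon)
    timer.start()
    # Even if the timer fires early, notify_soon blocks on the lock until
    # wait() releases it, so the notification cannot be missed.
    notified_result = cond.wait(timeout=5)
timer.join()

print(raised, timed_out_result, notified_result)
```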

5.2. wait_for

wait_for(predicate, timeout=None)

Waits until the condition evaluates to true or the timeout expires. predicate should be a callable whose return value can be interpreted as a Boolean. Like wait, wait_for accepts a timeout argument. The return value is the last return value of the predicate, so it evaluates to False if the method timed out. Ignoring the timeout, the implementation is roughly equivalent to:

while not predicate():
    cv.wait()
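As a usage sketch, the following consumer waits until a list is non-empty. The names items, consumer and producer and the order id string are hypothetical.

```python
import threading

cond = threading.Condition()
items = []
result = []


def consumer():
    with cond:
        # Blocks until the predicate is true or 5 seconds have passed
        ok = cond.wait_for(lambda: len(items) > 0, timeout=5)
        result.append((ok, items.pop(0) if ok else None))


def producer():
    with cond:
        items.append("order-10001")
        cond.notify()


c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
c.join()
p.join()

# Nothing is left in the list, so this call times out and returns
# the last predicate value, which is False
with cond:
    timed_out = cond.wait_for(lambda: len(items) > 0, timeout=0.05)

print(result, timed_out)
```

Because the predicate is checked before blocking, the consumer also works when the producer happens to run first, which a bare wait call would not handle.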

6. Waking up

6.1. notify

notify(n=1)

Wakes up threads waiting on this condition object. The argument is the number of threads to wake up; the default is 1. If the calling thread does not hold the lock when it calls this method, a RuntimeError is raised. Note that an awakened thread does not actually return from its wait() call until it can reacquire the lock, and notify does not release the lock, so the caller must release it afterwards.
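The sketch below starts three waiting threads and wakes two of them. It assumes CPython's behaviour of waking exactly n waiters when at least n are waiting (the documentation only promises "at most n" in general), and the counts dictionary and polling loops are scaffolding invented for the example.

```python
import threading
import time

cond = threading.Condition()
counts = {"waiting": 0, "woken": 0}


def waiter():
    with cond:
        counts["waiting"] += 1
        cond.wait()               # bare wait, used here only to count wake-ups
        counts["woken"] += 1


threads = [threading.Thread(target=waiter) for _ in range(3)]
for t in threads:
    t.start()

# Poll until all three threads are blocked in wait()
while True:
    with cond:
        if counts["waiting"] == 3:
            break
    time.sleep(0.01)

with cond:
    cond.notify(2)
    # notify() does not release the lock, so nobody has returned from wait() yet
    woken_while_locked = counts["woken"]

# Give the two notified threads time to reacquire the lock and finish
deadline = time.monotonic() + 5
while time.monotonic() < deadline:
    with cond:
        if counts["woken"] >= 2:
            break
    time.sleep(0.01)
time.sleep(0.05)

with cond:
    woken_after_notify = counts["woken"]
    cond.notify_all()             # release the remaining waiter
for t in threads:
    t.join()

print(woken_while_locked, woken_after_notify, counts["woken"])
```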

6.2. notify_all

notify_all()

Wakes up all threads waiting on this condition object. The effect is roughly the same as:

cv.notify(threading.active_count())
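notify_all is the natural choice when waiters are interested in different states, as in the order scenario from section 2: a single notify might wake a thread whose state has not arrived yet. A minimal sketch, with hypothetical status strings and a made-up watch helper:

```python
import threading

cond = threading.Condition()
order = {"status": "created"}
seen = []


def watch(target):
    with cond:
        # Each watcher sleeps until the status it cares about is reached
        if cond.wait_for(lambda: order["status"] == target, timeout=5):
            seen.append(target)


paid = threading.Thread(target=watch, args=("paid",))
refunded = threading.Thread(target=watch, args=("refunded",))
paid.start()
refunded.start()

with cond:
    order["status"] = "paid"
    cond.notify_all()      # both wake up, only the "paid" watcher proceeds
paid.join()

with cond:
    order["status"] = "refunded"
    cond.notify_all()
refunded.join()

print(seen)
```

The "refunded" watcher is woken by the first notify_all as well, finds its predicate still false, and goes back to waiting inside wait_for.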

7. Context management protocol and example

Condition objects also support Python's context management protocol, so the lock can be acquired and released with a with statement. Below, we use a condition object together with the context management protocol to implement the order status listener described at the beginning:

import logging
import random
from threading import Thread, Condition


class Queue():
    def __init__(self):
        self.runtimes = 0   # number of rounds the producer has completed
        self.front = -1
        self.rear = -1
        self.queue = []

    def enqueue(self, ele):  # add an element at the rear
        self.queue.append(ele)
        self.rear = self.rear + 1

    def dequeue(self):  # remove an element from the front
        if self.isempty():
            return None
        node = self.queue.pop(0)
        self.front = self.front + 1
        return node

    def isempty(self):
        return self.front == self.rear


class CareStatusThread(Thread):
    def __init__(self, carestatus, conobj, queue, threshold):
        super().__init__()
        self.orderids = []
        self.conobj = conobj
        self.queue = queue
        self.notifystatus = carestatus
        self.threshold = threshold

    def run(self):
        while True:
            with self.conobj:
                logging.info('%r start running' % self)
                if self.queue.isempty():
                    logging.info('%r queue is empty' % self)
                    self.conobj.wait()
                firstorder = None
                orderids = list()
                while True:
                    order = self.queue.dequeue()
                    # Stop when the queue is empty or when we are back at the
                    # first order that was put back (that order is not re-queued)
                    if order is None or order == firstorder:
                        break
                    if order['status'] != self.notifystatus:
                        # Not the status this thread cares about: put it back
                        if firstorder is None:
                            firstorder = order
                        self.queue.enqueue(order)
                        continue
                    orderids.append(order['id'])
                if len(orderids) > 0:
                    self.orderids.extend(orderids)
                    logging.info('%r orders%s add in list' % (self, orderids))
                logging.info('%r run over' % self)
                # Exit once the producer has finished all of its rounds
                if self.queue.runtimes == self.threshold:
                    return
                self.conobj.wait()

    def __repr__(self):
        return 'CareStatusThread(%s)' % self.notifystatus


class ProducerThread(Thread):
    def __init__(self, orderlist, conobj, queue, threshold):
        super().__init__()
        self.orderlist = orderlist
        self.conobj = conobj
        self.queue = queue
        self.threshold = threshold

    def run(self):
        for _ in range(self.threshold):
            with self.conobj:
                # Change the status of a few randomly chosen orders
                times = int(random.uniform(1, 5))
                for _ in range(times):
                    index = int(random.uniform(0, len(self.orderlist)))
                    order = self.orderlist[index]
                    fromstatus = order['status']
                    order['status'] += 1
                    self.queue.enqueue(order)
                    logging.info('%r change order %s from %s to %s' % (self, order['id'], fromstatus, order['status']))
                self.queue.runtimes += 1
                logging.info('%r run over' % self)
                # Wake the listeners; the lock is released when the with block ends
                self.conobj.notify_all()

    def __repr__(self):
        return 'ProducerThread'


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
    conobj = Condition()
    queue = Queue()
    orderid = 10001
    threshold = 10
    orderlist = [{'id': orderid + i, 'status': 0} for i in range(50)]
    producer = ProducerThread(orderlist, conobj, queue, threshold)
    afterpay = CareStatusThread(1, conobj, queue, threshold)
    afterrefund = CareStatusThread(2, conobj, queue, threshold)
    afterpay.start()
    afterrefund.start()
    producer.start()
    producer.join()
    afterpay.join()
    afterrefund.join()
    logging.info('%r orderids: %s' % (afterpay, afterpay.orderids))
    logging.info('%r orderids: %s' % (afterrefund, afterrefund.orderids))

7.1. Execution results

The program prints:

2019-05-11 10:01:43,340 - INFO: CareStatusThread(1) start running
2019-05-11 10:01:43,342 - INFO: CareStatusThread(1) queue is empty
2019-05-11 10:01:43,343 - INFO: CareStatusThread(2) start running
2019-05-11 10:01:43,344 - INFO: CareStatusThread(2) queue is empty
2019-05-11 10:01:43,346 - INFO: ProducerThread change order 10020 from 0 to 1
2019-05-11 10:01:43,347 - INFO: ProducerThread change order 10041 from 0 to 1
2019-05-11 10:01:43,348 - INFO: ProducerThread run over
2019-05-11 10:01:43,348 - INFO: ProducerThread change order 10029 from 0 to 1
2019-05-11 10:01:43,349 - INFO: ProducerThread run over
2019-05-11 10:01:43,350 - INFO: CareStatusThread(1) orders[10020, 10041, 10029] add in list
2019-05-11 10:01:43,350 - INFO: CareStatusThread(1) run over
2019-05-11 10:01:43,351 - INFO: ProducerThread change order 10010 from 0 to 1
2019-05-11 10:01:43,351 - INFO: ProducerThread change order 10020 from 1 to 2
2019-05-11 10:01:43,352 - INFO: ProducerThread run over
2019-05-11 10:01:43,352 - INFO: ProducerThread change order 10003 from 0 to 1
2019-05-11 10:01:43,353 - INFO: ProducerThread run over
2019-05-11 10:01:43,353 - INFO: ProducerThread change order 10005 from 0 to 1
2019-05-11 10:01:43,354 - INFO: ProducerThread change order 10010 from 1 to 2
2019-05-11 10:01:43,354 - INFO: ProducerThread change order 10032 from 0 to 1
2019-05-11 10:01:43,354 - INFO: ProducerThread run over
2019-05-11 10:01:43,355 - INFO: ProducerThread change order 10025 from 0 to 1
2019-05-11 10:01:43,355 - INFO: ProducerThread change order 10034 from 0 to 1
2019-05-11 10:01:43,356 - INFO: ProducerThread change order 10036 from 0 to 1
2019-05-11 10:01:43,356 - INFO: ProducerThread change order 10033 from 0 to 1
2019-05-11 10:01:43,357 - INFO: ProducerThread run over
2019-05-11 10:01:43,357 - INFO: ProducerThread change order 10011 from 0 to 1
2019-05-11 10:01:43,357 - INFO: ProducerThread change order 10012 from 0 to 1
2019-05-11 10:01:43,358 - INFO: ProducerThread run over
2019-05-11 10:01:43,358 - INFO: ProducerThread change order 10045 from 0 to 1
2019-05-11 10:01:43,359 - INFO: ProducerThread change order 10036 from 1 to 2
2019-05-11 10:01:43,359 - INFO: ProducerThread run over
2019-05-11 10:01:43,360 - INFO: ProducerThread change order 10035 from 0 to 1
2019-05-11 10:01:43,360 - INFO: ProducerThread change order 10013 from 0 to 1
2019-05-11 10:01:43,361 - INFO: ProducerThread change order 10014 from 0 to 1
2019-05-11 10:01:43,361 - INFO: ProducerThread change order 10039 from 0 to 1
2019-05-11 10:01:43,361 - INFO: ProducerThread run over
2019-05-11 10:01:43,362 - INFO: ProducerThread change order 10039 from 1 to 2
2019-05-11 10:01:43,362 - INFO: ProducerThread run over
2019-05-11 10:01:43,363 - INFO: CareStatusThread(2) orders[10010, 10020, 10010, 10036, 10036, 10039, 10039] add in list
2019-05-11 10:01:43,364 - INFO: CareStatusThread(2) run over
2019-05-11 10:01:43,364 - INFO: CareStatusThread(1) start running
2019-05-11 10:01:43,365 - INFO: CareStatusThread(1) orders[10005, 10032, 10025, 10034, 10033, 10011, 10012, 10045, 10035, 10013, 10014] add in list
2019-05-11 10:01:43,365 - INFO: CareStatusThread(1) run over
2019-05-11 10:01:43,366 - INFO: CareStatusThread(1) orderids: [10020, 10041, 10029, 10005, 10032, 10025, 10034, 10033, 10011, 10012, 10045, 10035, 10013, 10014]
2019-05-11 10:01:43,366 - INFO: CareStatusThread(2) orderids: [10010, 10020, 10010, 10036, 10036, 10039, 10039]

7.2. Analysis

In the code above, we created three threads:

  1. The producer, ProducerThread, randomly changes the status of several orders and adds each changed order to the order queue
  2. CareStatusThread(1) watches for orders whose status has changed from 0 to 1 and adds their ids to its own order list
  3. CareStatusThread(2) watches for orders whose status has changed from 1 to 2 and adds their ids to its own order list

After changing the statuses, ProducerThread wakes the status listening threads with notify_all and then releases the lock. The CareStatusThread instances then run their business logic and consume the queue. This is a typical producer-consumer model. In the end, each of the two consumer threads has collected the ids of the orders it cares about.
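For comparison, the same producer-consumer pattern can be written more compactly with wait_for. This is a simplified sketch under its own assumptions, not a rewrite of the listing above: it uses a collections.deque instead of the Queue class, a single consumer, and a hypothetical done flag to signal the end of production.

```python
import threading
from collections import deque

cond = threading.Condition()
pending = deque()            # orders waiting to be consumed
state = {"done": False}      # set by the producer when it has finished
collected = []


def consumer():
    while True:
        with cond:
            # Sleep until there is work to do or the producer has finished
            cond.wait_for(lambda: pending or state["done"])
            while pending:
                collected.append(pending.popleft())
            if state["done"]:
                return


def producer(order_ids):
    for oid in order_ids:
        with cond:
            pending.append(oid)
            cond.notify_all()
    with cond:
        state["done"] = True
        cond.notify_all()


c = threading.Thread(target=consumer)
p = threading.Thread(target=producer, args=([10001, 10002, 10003],))
c.start()
p.start()
p.join()
c.join()
print(collected)
```

Checking the predicate before every wait means the consumer cannot block forever if the producer finishes before the consumer first takes the lock.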

8. References

https://docs.python.org/zh-cn/3.6/library/threading.html.

