程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python 線程同步(四) -- 事件對象與柵欄

編輯:Python

1. 引言

我們已經介紹了 python 的幾種線程同步工具。 Python 線程同步(一) — 競爭條件與線程鎖python 線程同步(二) — 條件對象python 線程同步(三) — 信號量

本文介紹的線程同步工具相比上面已經介紹過的三類工具來說,更加簡單實用。

2. 事件對象 — Event

事件的使用是線程間通信的最簡單機制之一 — 一個線程發出事件信號,另一個線程等待並響應該信號。 python threading 包中提供的事件對象 Event 就是用來做這件事的。 當事件對象中的標志比特由 True 變為 False,所有等待在該事件上的線程都將被喚醒。 因此,python 中的事件對象 Event 提供了以下方法供調用:

2.1. is_set

is_set()

返回事件標志是否為 True。

2.2. set

set()

將事件內部標志比特設置為 True,接著喚醒所有等待在該事件上的線程。

2.3. clear

clear()

清除標志,將事件標志重置為 False,此後若幹個線程又可以重新阻塞在該事件對象上。

2.4. wait

wait(timeout=None)

阻塞線程直到內部變量為true。如果調用時內部標志為true,將立即返回。否則將阻塞線程,直到調用 set() 方法將標志設置為true或者發生可選的超時。 如果是因為超時返回,則會返回 False,否則會返回 True。

2.5. 示例

下面的例子展示了所有5個線程均阻塞在一個事件對象上,直到3秒後,主線程調用 set 方法觸發事件信號,可以看到所有 5 個線程均立即開始執行。

import logging
from threading import Thread, Event
from time import sleep
class EventThread(Thread):
def __init__(self, event, id):
super().__init__()
self._event = event
self._id = id
def run(self):
logging.info('%r start running' % self)
self._event.wait()
logging.info('%r continue running after event' % self)
def __repr__(self):
return 'EventThread(%s)' % self._id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
event = Event()
for i in range(5):
thread = EventThread(event, i)
thread.start()
logging.info('main start sleeping')
sleep(3)
logging.info('main set event')
event.set()

打印出了:

2019-05-14 09:15:50,626 - INFO: EventThread(0) start running 2019-05-14 09:15:50,626 - INFO: EventThread(1) start running 2019-05-14 09:15:50,626 - INFO: EventThread(2) start running 2019-05-14 09:15:50,626 - INFO: EventThread(3) start running 2019-05-14 09:15:50,626 - INFO: EventThread(4) start running 2019-05-14 09:15:50,626 - INFO: main start sleeping 2019-05-14 09:15:53,639 - INFO: main set event 2019-05-14 09:15:53,645 - INFO: EventThread(1) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(0) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(2) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(4) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(3) continue running after event

3. 柵欄對象 — Barrier

柵欄類是另一個簡單的同步原語,此前我們已經介紹過 Linux 與 Java 中的柵欄。 java 線程同步工具類

柵欄對象用於讓多個線程互相等待。 他維護了一個內部的計數器,值由構造方法默認傳入,每當有一個線程調用 wait 方法,則該值原子地減 1,直到減到 0,則讓所有阻塞 wait 在該柵欄對象上的線程繼續執行。

3.1. 構造方法

Barrier(parties, action=None, timeout=None)

  • 構造方法必須傳入一個值,就是我們上文所說的計數初始值
  • 如果提供了可調用的 action 參數,它會在所有線程被釋放時在其中一個線程中自動調用 action 方法
  • timeout 是默認的超時時間,如果沒有在 wait() 方法中指定超時時間,則會使用該值作為超時時間

3.2. wait

wait(timeout=None)

柵欄對象中最重要的方法就是 wait 方法了。 線程阻塞等待,直到構造方法傳入的 parties 個線程均阻塞等待在 wait 方法或超時,如果該方法傳入的超時時間為 None,則使用構造方法傳入的默認超時。 一旦超時發生,柵欄將立即進入破損狀態,此時其他仍阻塞等待該柵欄的線程將收到 wait 方法拋出的 BrokenBarrierError 異常。 如果試圖在已破損的柵欄對象上調用 wait 方法,也會立即拋出 BrokenBarrierError 異常。 返回一個數字,值為 0 到 parties - 1,解釋器保證了所有等待在同一個柵欄上的線程中,每一個的返回值都不同,以便讓你可以依賴 wait 方法的返回值來做一些處理。 如果創建柵欄對象時在構造函數中提供了 action 參數,它將在其中一個線程釋放前被調用。如果此調用引發了異常,柵欄對象將進入破損狀態。

3.3. reset

reset()

重置柵欄為默認的初始態。 如果柵欄中仍有線程等待釋放,這些線程將會收到 BrokenBarrierError 異常。 除非非常必要,否則並不建議使用該方法,很多時候與其重用一個狀態未知的柵欄,不如新建一個。

3.4. abort

abort()

使柵欄進入破損態。 這將導致所有已經調用和未來調用的 wait() 方法中引發 BrokenBarrierError 異常。

3.5. 柵欄對象的屬性

  • parties — 沖出柵欄所需要的線程數量
  • n_waiting — 當前時刻正在柵欄中阻塞的線程數量
  • broken — 一個布爾值,值為 True 錶明柵欄為破損態

3.6. 示例

柵欄的使用雖然簡單,但卻十分實用,在實際環境中,我們通常需要並發調用很多業務方的接口,並收集他們的返回,然後在所有接口均返回後再進行下一步處理。 但並不是所有接口的調用都是必須的,因此對於該場景,一個必要的優化方式是一旦收集到必要接口的返回,立即中斷其他接口的調用,並開始這之後的操作。 上述需求如果使用柵欄來解決會顯得非常簡單而優雅,雖然 Python 中我們並不能在線程外終止線程,但我們可以通過柵欄的 abort 方法讓那些尚未執行結束的線程一旦執行結束即拋出異常,從而讓我們不需要去關注他們。 下面的例子模擬了上面描述的過程。

import logging
import random
from threading import Thread, Barrier
from time import sleep, time
class InterfaceThread(Thread):
def __init__(self, majorbarrier, minorbarrier, id, major):
super().__init__()
self._majorbarrier = majorbarrier
self._minorbarrier = minorbarrier
self._id = id
self._major = major
def run(self):
nsec = random.uniform(0, 4)
logging.info('%r start running sleep %s' % (self, nsec))
sleep(nsec)
logging.info('%r after sleeping' % self)
if self._major:
try:
result = self._majorbarrier.wait()
if result == 0:
self._minorbarrier.abort()
except:
logging.error('%s waitting on majorbarrier aborted' % self)
return
else:
try:
self._minorbarrier.wait()
except:
logging.warning('%s watting on minorbarrier aborted' % self)
return
logging.info('%r continue running after barrier' % self)
def __repr__(self):
return 'InterfaceThread(%s【major: %s】)' % (self._id, self._major)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
start = time()
majorbarrier = Barrier(4)
minorbarrier = Barrier(3)
threads = list()
for i in range(6):
threads.append(InterfaceThread(majorbarrier, minorbarrier, i, bool(i >= 3)))
for thread in threads:
thread.start()
result = majorbarrier.wait()
if result == 0:
minorbarrier.abort()
logging.info('run by %s' % (time() - start))

上面的例子中創建了兩個柵欄對象,分別用來同步必要接口調用與非必要接口調用,我們通過隨機 sleep 0 到 4 秒來模擬接口調用。 一旦必要柵欄的 wait 方法返回 0,則意味著必要接口已全部返回,此時可以通過調用非必要柵欄的 abort 方法來破壞非必要柵欄,同時程序繼續執行,從而實現整體運行時間的最大限度縮短。

打印出了:

2019-05-14 14:00:05,045 - INFO: InterfaceThread(0【major: False】) start running sleep 1.3645551759667334 2019-05-14 14:00:05,050 - INFO: InterfaceThread(1【major: False】) start running sleep 3.5451267021153607 2019-05-14 14:00:05,050 - INFO: InterfaceThread(2【major: False】) start running sleep 3.0433784558963644 2019-05-14 14:00:05,052 - INFO: InterfaceThread(3【major: True】) start running sleep 2.0092681547999875 2019-05-14 14:00:05,053 - INFO: InterfaceThread(4【major: True】) start running sleep 2.266415383907653 2019-05-14 14:00:05,053 - INFO: InterfaceThread(5【major: True】) start running sleep 0.6692143957122372 2019-05-14 14:00:05,728 - INFO: InterfaceThread(5【major: True】) after sleeping 2019-05-14 14:00:06,416 - INFO: InterfaceThread(0【major: False】) after sleeping 2019-05-14 14:00:07,077 - INFO: InterfaceThread(3【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: run by 2.284111976623535 2019-05-14 14:00:07,329 - INFO: InterfaceThread(5【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: InterfaceThread(3【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - WARNING: InterfaceThread(0【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,109 - INFO: InterfaceThread(2【major: False】) after sleeping 2019-05-14 14:00:08,110 - WARNING: InterfaceThread(2【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,613 - INFO: InterfaceThread(1【major: False】) after sleeping 2019-05-14 14:00:08,613 - WARNING: InterfaceThread(1【major: False】) watting on minorbarrier aborted

可以看到,並發調用六個線程,按照 sleep 時間,應該在 3.5451267 秒以上。 而實際上,由於重要線程均以完成,主線程只用 2.284111976623535 秒便已返回。 這樣,我們就實現了接口性能的大幅提昇,但線程 1、2 由於 sleep 時間過長,沒有能夠在主線程返回前返回。


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved