程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 更多編程語言 >> Python >> Python標准模塊--threading

Python標准模塊--threading

編輯:Python

1 模塊簡介

threading模塊在Python1.5.2中首次引入,是低級thread模塊的一個增強版。threading模塊讓線程使用起來更加容易,允許程序同一時間運行多個操作。

不過請注意,Python中的線程最好是與IO操作一起工作,比如從網絡上下載資源或者從你的電腦中讀取文件和目錄。如果你需要處理一些CPU密集的任務,你最好是看看Python的multiprocessing模塊。原因就是Python有GIL鎖(解釋器全局鎖),使得所有的線程在主線程內運行。由於這個原因,當你使用線程執行CPU密集型任務時,你可能會發現它會運行的很慢。下面,我們主要集中在IO操作--線程做的好的場景。

2 模塊使用

2.1 線程入門

一個線程允許你像運行一個獨立的程序一樣,運行一段獨立的代碼,類似於subprocess,區別在於,線程運行的是函數或者類,而不是一個獨立的程序。我發現使用一個具體的實例會有助於我們更加理解概念。實例如下,

import threading

def doubler(number):
    print(threading.currentThread().getName() + "\n")
    print(number * 2)
    print""

if __name__ == "__main__":
    for i in range(5):
        my_thread = threading.Thread(target = doubler,args = (i,))
        my_thread.start()
         my_thread.join()

我們首先引入threading模塊,創建一個常規的函數doubler。這個函數將輸入值乘以2,它也打印出調用這個函數的線程的名字,最後打印一個空白行。你也許會注意到當我們實例化一個線程時,我們設置它的target為我們的doubler函數,然後我們將變量傳遞給這個函數。使用args參數看起來有些奇怪,是因為我們需要向doubler函數傳遞一個序列,但是它只介紹一個變量,我們我們需要在末尾放入一個逗號,從而構造出一個序列。

如果你想等待一個線程結束,你需要調用它的join()方法。

當你運行這段代碼時,你應該可以得到如下的結果,

Thread-1

0

Thread-2

2

Thread-3

4

Thread-4

6

Thread-5

8

一般情況下,你並不希望將輸出打印到標准輸出上,當你這樣做的時候,會導致亂七八糟的混亂。你應該使用Python的logging模塊。這篇文章具體會告訴你如何使用logging模塊,Python標准模塊--logging。logging模塊是線程安全的,性能也很優越。讓我們把上面的代碼修改一下,加上logging模塊,如下所示,

import threading
import logging

def get_logger():
    logger = logging.getLogger("threading_example")
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("threading.log")
    fmt = "%(asctime)s - %(threadName)s - %(levelname)s - %(message)s"
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    return logger

def doubler(number,logger):
    logger.debug('double function executing')
    result = number * 2
    logger.debug("double function ended with {}".format(result))

if __name__ == "__main__":
    logger = get_logger()
    thread_names = ["Mike","George","Wanda","Dingbat","Nina"]
    for i in range(5):
        my_thread = threading.Thread(target = doubler,name = thread_names[i],args = (i,logger))
        my_thread.start()

這段代碼最大的改動就是加入了get_logger函數。這段代碼創建一個級別為debug的logger。它將會把日志保存在當前的工作目錄(例如,這個腳本所運行的目錄),日志文件名為threading.log,然後我們設置每行日志的格式。這個日志格式包括時間戳、線程名字、日志等級和要打印的消息。

在doubler函數中,我們將print語句修改為logging語句。你將會注意到當我們創建線程時,我們將logger傳入到doubler函數中。我們這麼做的原因是當你在每個線程中實例化logging對象時,你將會得到多個日志記錄單例,你的日志中將會有很多重復的行。

最後,我們通過創建一個名稱列表用於給創建的線程進行命名,通過使用name這個參數,給每個線程設置一個指定的名稱。當你運行這段代碼時,你就會在日志文件中得到如下內容,

2016-11-11 14:34:35,350 - Mike - DEBUG - double function executing
2016-11-11 14:34:35,350 - Mike - DEBUG - double function ended with 0
2016-11-11 14:34:35,350 - George - DEBUG - double function executing
2016-11-11 14:34:35,350 - Wanda - DEBUG - double function executing
2016-11-11 14:34:35,351 - George - DEBUG - double function ended with 2
2016-11-11 14:34:35,351 - Wanda - DEBUG - double function ended with 4
2016-11-11 14:34:35,351 - Dingbat - DEBUG - double function executing
2016-11-11 14:34:35,351 - Dingbat - DEBUG - double function ended with 6
2016-11-11 14:34:35,351 - Nina - DEBUG - double function executing
2016-11-11 14:34:35,351 - Nina - DEBUG - double function ended with 8

輸出具有很好的可解釋性。對於這部分,我想挖掘出更多的主題也就是threading.Thread。讓我們來看最後一個例子,不再直接調用Thread,我們創建我們的子類。

import threading
import logging

class MyThread(threading.Thread):
    def __init__(self,number,logger):
        threading.Thread.__init__(self)
        self.number = number
        self.logger = logger

    def run(self):
        logger.debug("Calling doubler")
        doubler(self.number,self.logger)

def get_logger():
    logger = logging.getLogger("threading_example")
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("threading_class.log")
    fmt = "%(asctime)s - %(threadName)s - %(levelname)s - %(message)s"
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    return logger

def doubler(number,logger):
    logger.debug('double function executing')
    result = number * 2
    logger.debug("double function ended with {}".format(result))

if __name__ == "__main__":
    logger = get_logger()
    thread_names = ["Mike","George","Wanda","Dingbat","Nina"]
    for i in range(5):
        thread = MyThread(i,logger)
        thread.setName(thread_names[i])
        thread.start()

在這個例子中,我們通過threading.Thread創建子類。我們傳入想翻倍的數字number和logging對象。但是這次,我們通過另一種方式--調用線程對象的setName方法來設置線程名字。當你調用start,它將會通過調用run方法來運行你所定義的線程。在我們的類例,我們調用doubler函數來完成我們所需的處理。輸出和之前的輸出很相似,只是我們多加了一行調用日志。嘗試著運行這段代碼,看看你得到的是什麼?

2.2 線程鎖和同步

當你創建了多個線程時,你可能會發現你需要考慮如何避免沖突。我的意思就是你可能會遇到多個線程在同一時間需要訪問同一個資源。如果你不考慮這個問題,你將會遇到一些發生在最壞的時刻並且通常是在生產環境下的問題。

解決方案就是使用線程鎖。Python的threading模塊提供了線程鎖,線程鎖被一個線程或者沒有線程所擁有。一個線程嘗試著獲取資源上已經被鎖的線程鎖,那個線程會被中止,直到線程鎖被釋放。讓我們來看看一個沒有采用任何鎖機制的實例,

import threading

total = 0

def update_total(amount):
    global total
    total += amount
    print (total)

if __name__ == "__main__":
    for i in range(10):
        my_thread = threading.Thread(target = update_total,args = (5,))
        my_thread.start()

可以調用time.sleep函數讓這段程序更加有意思。這個問題主要還是一個線程調用update_total,在它完成更新之前,另一個線程也可能會去調用它並嘗試著去更新。依賴於操作的順序,這個值可能立刻就會被相加。

讓我們在這個函數中增加一個線程鎖,有兩種方式去實現這個,一種就是使用try/finally,正如我們所希望,線程鎖常常出於釋放狀態。下面就是例子,

import threading

total = 0
lock = threading.Lock()

def update_total(amount):
    global total
    lock.acquire()
    try:
        total += amount
    finally:
        lock.release()
    print ("total = " + str(total) + '\n')

if __name__ == "__main__":
    for i in range(10):
        my_thread = threading.Thread(target = update_total,args = (5,))
        my_thread.start()

這裡,我們在我們處理任何任務之前,先獲取線程鎖。然後,我們嘗試著更新total的值,釋放線程鎖並打印出total當前的值。我們還可以使用Python的with語句來做類似的事情。

import threading

total = 0
lock = threading.Lock()

def update_total(amount):
    global total
    with lock:
        total += amount
    print ("total = " + str(total) + '\n')

if __name__ == "__main__":
    for i in range(10):
        my_thread = threading.Thread(target = update_total,args = (5,))
        my_thread.start()

正如你所看到的,我們不再需要try/finally作為上下文管理器,而是通過with語句完成了這些工作。

有時候,你也需要多個線程獲取多個函數,當你剛開始寫代碼時,你可能處理一些任務,如下,

import threading

total = 0
lock = threading.Lock()

def do_something():
    lock.acquire()
    try:
        print("Lock acquired in the do_something function")
    finally:
        lock.release()
        print("Lock released in the do_something function")
    return "Done doing something"

def do_something_else():
    lock.acquire()
    try:
        print("Lock acquired in the do_something_else function")
    finally:
        lock.release()
        print("Lock released in the do_something_else function")
    return "Finished something else"

if __name__ == "__main__":
    result_one = do_something()
    result_two = do_something_else()

這個或許在當前環境下運行沒有問題,但是假設你右多個線程調用這兩個函數。當一個線程在運行這個函數,另一個線程可能也會修改數據,最終你所得到將會是不正確的結果。而你也許並沒有立刻意識到結果是錯誤的。有沒有什麼解決方案呢?讓我們仔細揣摩揣摩。

第一個常見的想法就是在這兩個函數之前增加一個線程鎖,讓我們將以上的代碼修改如下,

import threading

total = 0
lock = threading.Lock()

def do_something():
    lock.acquire()
    try:
        print("Lock acquired in the do_something function")
    finally:
        lock.release()
        print("Lock released in the do_something function")
    return "Done doing something"

def do_something_else():
    lock.acquire()
    try:
        print("Lock acquired in the do_something_else function")
    finally:
        lock.release()
        print("Lock released in the do_something_else function")
    return "Finished something else"

def main():
    with lock:
        result_one = do_something()
        result_two = do_something_else()
    print(result_one)
    print(result_two)

if __name__ == "__main__":
    main()

當你實際運行這段代碼時,你會發現它出於懸掛狀態而無法終止。原因就是當我們告訴threading模塊去獲取線程鎖時,我們調用第一個函數,它會發現線程鎖已經保持和阻塞,它將會繼續保持直到線程鎖被釋放,但是這種情況永遠不會發生。

解決方法就是使用Re-Entrant Lock。Python的threading模塊通過RLock函數提供了這個功能。將lock = threading.Lock()改為lock = threading.RLock(),再運行一下,你的代碼就會正常運行了。

如果你嘗試在實際的線程中運行以上的代碼,我們可以如下方式調用main函數。

if __name__ == "__main__":
    for i in range(10):
        my_thread = threading.Thread(target = main)
        my_thread.start()

這段代碼將會在每個線程中運行main函數,main函數又會調用另外兩個函數,你將會得到10組輸出。

2.3 定時器

threading模塊有一個精巧的類--Timer,你可以用它表示一個在指定時間之後才發生的操作。我們可以像常規的線程那樣,通過使用start()方法啟動我們的定時器,也可以使用定時器的cancel()方法來終止一個定時器。你應該了解到,你可以在定時器啟動之前終止它。

我遇到這樣一個場景,我需要與啟動的子進程進行通信,但是我還需要它在超時時會終止。有很多種方法來解決這個問題,我喜歡的解決方法是使用threading模塊的Timer類。

我們使用ping這個命令,在Linux中,ping命令會一直運行,直到你殺死它。所以Timer類在Linux平台上就非常有用。示例如下,

import subprocess

from threading import Timer

kill = lambda process:process.kill()

cmd = ["ping","www.baidu.com"]

ping = subprocess.Popen(cmd,stdout = subprocess.PIPE,stderr = subprocess.PIPE)

my_timer = Timer(5,kill,[ping])

try:
    my_timer.start()
    stdout,stderr = ping.communicate()
finally:
    my_timer.cancel()

print(str(stdout))

這裡,我們首先定義一個lambda函數用於殺死進程。然後我們開始ping,並且創建一個Timer對象。你將會注意到第一個變量是等待時間,然後調用函數和傳入相應的變量。在這個示例中,我們的函數就是一個lambda函數,並且我們傳入一個變量列表,這個列表只有一個元素。如果你運行這段代碼,它會運行5秒鐘,然後打印出ping的結果。

2.4 其它的線程組件

threading模塊還支持其它組件。例如,你可以創建一個Semaphore--計算機科學中最經典的同步原語之一。一個Semaphore管理一個內部的計數器,當你調用acquire方法,它會遞減;如果你調用release,它就會遞增。計數器不能低於0。當它為0時,你碰巧調用了acquire,它就會阻塞。

另一個有用的工具就是使用Event。它會允許你在線程之間通過信號通信。我們將會在下一部分展示使用Event的示例。

最後,Python3.2中也有Barrier對象。Barrier管理一個線程池,線程池中的線程需要等待其它線程執行完畢。為了通過屏障,需要調用wait()方法,這個方法可以阻塞直到所有的線程已經被調用。然後,它就會同時釋放所有的線程。

2.5 線程通信

你可能想讓線程之間互相通信。正如我們在上一部分提到的,你可以使用Event,但是更為通用的方法是使用Queue。在我們這個示例中,我們實際上都使用了。讓我們來看看吧,

import threading

from queue import Queue

def creator(data,q):
    print("Creating data and putting it on the queue")
    for item in data:
        evt = threading.Event()
        q.put((item,evt))
        print("Wait for data to be doubled")
        evt.wait()

def my_consumer(q):
    while True:
        data,evt = q.get()
        print("data found to be process:{}".format(data))
        processed = data * 2
        print(processed)
        evt.set()
        q.task_done()

if __name__ == "__main__":
    q = Queue()
    data = [5,10,13,-1]
    thread_one = threading.Thread(target = creator,args = (data,q))
    thread_two = threading.Thread(target = my_consumer,args = (q,))
    thread_one.start()
    thread_two.start()
    q.join()

首先,我們有一個creator(AKA,一個生產者)函數用於創建需要處理的數據。我們還有另一個函數用於處理待處理的數據,我們命名為my_consumer。生產者函數使用Queue的put方法將數據放入Queue中,消費者函數會一直檢查數據,當可以獲得數據,就會處理它。Queue會自己處理所有的獲取和釋放,所以你不需要關注。

在這個例子中,我們首先創建一個用於數值翻倍的列表。然後我們創建兩個線程,一個用於生產者,另一個用於消費者。你將會注意到我們給每個線程都傳入一個Queue對象,每個線程的背後都會有線程所是如何處理的。隊列會將第一個線程的返回數據傳入給第二個線程。當第一個線程將數據放入到隊列中,它也傳入了一個Event對象,然後等待這個事件的結束。在消費者函數中,數據被處理,當它把數據處理完畢,它會調用事件的set方法,這個將會告訴第一個線程,第二個線程已經處理完畢數據,第一個線程可以繼續發送數據。

最後一行代碼調用了Queue對象的join方法,這會告訴Queue去等待所有的線程都執行結束。當第一個線程把所有的元素都放入Queue中之後,它就會結束。

2.6 總結

在本文中,我們講解了很多知識,你可以學習到如下知識:

  • 基本的線程
  • 線程鎖如何工作
  • 什麼是事件,以及如何使用
  • 如何使用定時器
  • 通過Queues/Events,線程之間如何通信

現在,你已經了解到線程是如何使用的,以及它們是如何工作的,我希望你可以在代碼中找到更多關於線程使用的技巧。

3 Reference

Python 201

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved