程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python3教程:用concurrent執行多進程任務的方法

編輯:Python

隨著計算機技術的發展,諸如GPU和超算平台等越來越發達,這些技術的本質其實並沒有帶來算法上的革新,之所以能夠提升計算的速度和規模,很大程度上是因為分布式和並行計算的優勢。這裡我們介紹一個簡單的python自帶的多進程的代碼實現,使用的是concurrent這個工具,同時我們也會介紹如何更好的配置多進程的資源。

concurrent使用示例

concurrent是python自帶的一個多進程實現倉庫,不需要額外的安裝。這裡我們先介紹一個沒有多進程的示例代碼:

import time
def sleep(seconds):
time.sleep(seconds)
if __name__ == '__main__':
times = [1] * 10
time0 = time.time()
for t in times:
sleep(t)
time1 = time.time()
print ('The time cost is: {}s'.format(time1 - time0))

這個代碼的功能其實就是休眠10s的時間,也比較容易,我們可以看一下這個代碼的執行結果:

[[email protected]-manjaro concurrent]$ python3 sleep.py
The time cost is: 10.014754295349121s

在我們統計時間的時候,發現會比10s的時間多出來一些,這部分時間不僅僅包含這個python程序執行的時間,跟時間的統計方式也有一定的關系,一般我們可以忽略這部分的gap時間。

我們假定上面這個程序中的sleep函數功能不是休眠1s的時間,而是執行一個耗時為1s的計算任務,而我們有很多的cpu,希望能夠加速這個計算的過程,這個時候我們就需要用到多進程的技術,下面是修改為多進程之後的代碼:

import concurrent.futures
import time
def sleep(seconds):
time.sleep(seconds)
if __name__ == '__main__':
times = [1] * 10
time0 = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(sleep, times)
time1 = time.time()
print ('The time cost is: {}s'.format(time1 - time0))

整個修改的方式也非常容易,就是把原代碼中的for循環修改為concurrent的執行語句,讓我們一起來看看執行的結果:

[[email protected]-manjaro concurrent]$ python3 concurrent_sleep.py
The time cost is: 2.0304219722747803s

從結果上我們可以看到,運行的時間從10s降低到了2s。這說明我們的多進程任務帶來了非常顯著的優化效果,至於為什麼優化之後是2s而不是3s或者1s,這個問題將在下一個章節中進行介紹。

多進程的最佳配置

使用多進程的方案,能有多大的加速效果,很大程度上是依賴於硬件的。理論上來說,如果有n個cpu核,我們就可以實現n倍的加速。但是大部分情況下會受限於算法或者任務本身,跟n倍的加速之間有一定的gap。首先讓我們用ipython來看看本地電腦上有多少個cpu:

[[email protected]-manjaro concurrent]$ ipython
Python 3.8.5 (default, Sep 4 2020, 07:30:14)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.19.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: import psutil
In [2]: psutil.cpu_count(logical=False)
Out[2]: 4
In [3]: psutil.cpu_count(logical=True)
Out[3]: 8

這裡我們使用的是psutil這個庫,而不是常用的os或者multiprocessing,這是因為可以更好的區分邏輯核與物理核。我們本地的電腦上有4個物理核,每個物理核實際上對應於2個邏輯核,因此一共是有8個邏輯核。也就是說,理論上我們最大可以加速8倍的算法執行時間。讓我們通過配置和修改一些參數來測試驗證一下:

import concurrent.futures
import time
import sys
def sleep(seconds):
time.sleep(seconds)
if __name__ == '__main__':
if sys.argv[1] == '-t':
times = [1] * int(sys.argv[2]) # 獲取命令行的時間輸入參數
time0 = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(sleep, times)
time1 = time.time()
print ('The time cost is: {}s'.format(time1 - time0))

在這個示例中,為了方便調整,我們把總的休眠時間定為命令行的輸入參數,使用的是sys.argv這個函數來獲取,注意獲取到的參數是字符串格式的。這串代碼的執行方法和執行結果如下所示:

[[email protected]-manjaro concurrent]$ python3 concurrent_sleep.py -t 16
The time cost is: 2.0304934978485107s

在上面的執行結果中,我們發現原本需要16s的休眠任務,在多進程場景下被加速到了2s,剛好符合我們對邏輯核的加速倍數的預期。但是前面提到,能否加速邏輯核的倍數,跟任務本身的算法也有關系。比如在本用例中,如果算法分配的結果就是需要17個子算法來進行實現,那麼在每個邏輯核上面執行了2次的休眠任務之後,又有一個邏輯核需要再執行1次休眠任務,而此時其他的邏輯核都需要等待這個執行任務的邏輯核的任務結束。具體的執行結果如下所示:

[[email protected]-manjaro concurrent]$ python3 concurrent_sleep.py -t 17
The time cost is: 3.0313029289245605s

這個結果也驗證了我們此前的預想,因為16s的任務執行需要2s的時間,而執行完16s的任務之後,又需要等待剩下的一個1s時長的任務執行完畢,總耗時3s。這裡我們沒有配置max_worker的情況下,會按照系統中最高的邏輯核數來進行多進程的任務分配,但是在實際場景中我們需要考慮多種因素的限制,如內存和進程數的均衡配置(在大內存任務中,如果進程全開,有可能導致內存不足的問題)。只有在滿足所有系統約束條件的情況下,才能發揮硬件最好的性能。在下面的代碼中我們將給出如何配置執行任務的核數的方案:

import concurrent.futures
import time
import sys
def sleep(seconds):
time.sleep(seconds)
if __name__ == '__main__':
if sys.argv[1] == '-t':
times = [1] * int(sys.argv[2])
time0 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
executor.map(sleep, times)
time1 = time.time()
print ('The time cost is: {}s'.format(time1 - time0))

配置方法也很容易,就是在ProcessPoolExecutor函數的入參中引入max_worker,這裡我們先將最大使用的核數設置為4,再來重新看一下上述用例的執行結果:

[[email protected]-manjaro concurrent]$ python3 concurrent_sleep.py -t 16
The time cost is: 4.032958030700684s
[[email protected]-manjaro concurrent]$ python3 concurrent_sleep.py -t 17
The time cost is: 5.032677173614502s

對於16s的任務,因為開了4個核的並行,因此在4s的時間內完成了任務。而17s的任務,同樣是需要多等待1s的時間,總耗時為5s。

獲取返回值

如果任務可以互相獨立的去執行,互相之間並不需要通信,那自然是最理想的情況。但是更多的情況下,我們是要收集各個進程的返回值的,通過這個返回值,在各個進程之間進行通信。而在concurrent的情況下,map函數的返回值直接就是所有進程的返回值所組成的列表,這更加方便了我們的任務執行。

''' 學習中遇到問題沒人解答?小編創建了一個Python學習交流QQ群:857662006 尋找有志同道合的小伙伴,互幫互助,群裡還有不錯的視頻學習教程和PDF電子書! '''
import concurrent.futures
import time
import sys
def sleep(seconds):
time.sleep(seconds)
return seconds
if __name__ == '__main__':
if sys.argv[1] == '-t':
times = [1] * int(sys.argv[2])
time0 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(sleep, times)
print ('The total sleep cpu time is: {}s'.format(sum(results)))
time1 = time.time()
print ('The time cost is: {}s'.format(time1 - time0))

在這個用例中,我們將map函數的結果存儲到results這一參數中,最後對results進行求和的操作,這個簡單的示例中,返回的結果實際上就是總的輸入的休眠時間。執行結果如下所示:

[[email protected]-manjaro concurrent]$ python3 concurrent_sleep.py -t 16
The total sleep cpu time is: 16s
The time cost is: 4.034112930297852s

可以看到,所有的返回值被成功的獲取到。

總結

多進程技術是獨立於算法任務本身的一種優化技術,通過python中的concurrent庫,我們可以非常容易的實現多進程的任務,用來優化已有的算法。這裡我們也給出了一些多進程配置信息的參考方案,在GPU和超算相關的領域上,都能夠發揮較大的用途。


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved