程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python Parallel Programming Practice (Part 1)

編輯:Python

攜手創作,共同成長!這是我參與「掘金日新計劃 · 8 月更文挑戰」的第4天,點擊查看活動詳情

Python 並行編程實踐(上)

前言

pythonThere is also a lot about parallelism,比如:進程、線程、協程、異步等等,資料也不少.I'm here to talk a little bit about this today.

進程和線程

Both multi-process and multi-threading will be used hereconcurrent.futures庫來實現,是 3.2 version of the appearing features,But I don't believe anyone still uses less than 3.2 版本的 python 吧,不會吧,不會吧

導入

concurrent.futuresThe thread pool and process pool are very similar in use,It looks like just the referenced classes are different,線程池是:ThreadPoolExecutor,進程池是ProcessPoolExecutor.

實例化線程池/進程池

一般有兩種方式,一個是使用withCreated as a context manager,One is the way to create objects directly.If it is used, there is less logic,幾行代碼就寫完了,那就直接使用with上下文管理器,It's convenient and quick, and you don't have to close it yourself,If the logic involved is more complicated,There are many places where you can create your own objects and manage them yourself,根據需要自己選擇

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def test(index):
time.sleep(index)
def main():
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
pass
# Create and close manually
pool = ThreadPoolExecutor()
pool.shutdown()
if __name__ == '__main__':
main()
復制代碼

再看看參數,Most of the time it's just one that needs attentionmax_workers,Represents the maximum worker threads/進程的數量,This number can be specified by yourself,There will also be a default value if not specified

  • ThreadPoolExecutor 的默認值為 cpu 的數量加 4,但不會大於 32
  • ProcessPoolExecutor 的默認值為 cpu 的數量,但在 Windows The maximum on the platform will not exceed 61 個

submit 提交任務

submitThe method is a single submission of the task,下面是函數簽名:

def submit(self, fn, /, *args, **kwargs):
pass
復制代碼

fn The parameter is the name of the function to submit,不用加括號,*args, **kwargs是函數的參數,直接寫進去就行,下面看示例

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def test(index, index_2=None):
print(f"索引:{index} {index_2}")
time.sleep(2)
def main():
numbers = [1, 2, 3, 4, 5, 6]
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
for i in numbers:
pool.submit(test, i, index_2=i)
if __name__ == '__main__':
main()
# 運行結果
索引:1 1
索引:2 2
索引:3 3
索引:4 4
索引:5 5
索引:6 6
復制代碼

map 提交任務

The above is to submit the tasks one by one,This is a batch submission task,下面是函數簽名:

def map(self, fn, *iterables, timeout=None, chunksize=1):
pass
復制代碼

Watch the first one most of the time、The second parameter will do,The first parameter is the name of the function to call,Also without parentheses,The second is iteratorslist、set、tuple之類的,But this time, this function can only pass one parameter by default, that is, the iteration is the data,如下:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def test(index):
print(f"索引:{index}")
time.sleep(2)
def main():
numbers = [1, 2, 3, 4, 5, 6]
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
pool.map(test, numbers)
if __name__ == '__main__':
main()
復制代碼

But if your function really has several parameters,you have to use map,You can use partial functionsfunctools.partialReconstruct a function,看例子就懂了:

import time
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def test(index, arg_2):
print(f"索引:{index} {arg_2}")
time.sleep(2)
def main():
numbers = [1, 2, 3, 4, 5, 6]
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
pool.map(functools.partial(test, arg_2=2), numbers)
if __name__ == '__main__':
main()
# 結果
索引:1 2
索引:2 2
索引:3 2
索引:4 2
索引:5 2
索引:6 2
復制代碼

map 獲取結果

The above just submits the task,The task will be executed automatically after submission,先說說map獲取結果的方式,map 會“Returns a list of results directly”,Strictly speaking this is not the case,但可以這麼理解,就是這樣:

import time
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def test(index, arg_2):
print(f"索引:{index} {arg_2}")
time.sleep(2)
return index
def main():
numbers = [1, 2, 3, 4, 5, 6]
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
results = pool.map(functools.partial(test, arg_2=2), numbers)
for result in results:
print(f"結果:{result}")
if __name__ == '__main__':
main()
# 結果
索引:1 2
索引:2 2
索引:3 2
索引:4 2
索引:5 2
索引:6 2
結果:1
結果:2
結果:3
結果:4
結果:5
結果:6
復制代碼

submit 獲取結果

下面在看看 submit 獲取結果的方法,Use the above slightly differently,稍微麻煩一點點,需要用到as_completed來等待完成,Just look at the example to understand:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
def test(index, arg_2):
print(f"索引:{index} {arg_2}")
time.sleep(2)
return index
def main():
numbers = [1, 2, 3, 4, 5, 6]
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
results = [pool.submit(test, index, 2) for index in numbers]
for result in as_completed(results):
print(f"結果:{result.result()}")
if __name__ == '__main__':
main()
復制代碼

It needs to be used when getting the result.result()方法

捕獲異常

If an exception occurs in a function in the thread pool or process pool,The program is not interrupted,will not be captured directly,比如下面:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
def test(index, arg_2):
raise ValueError("報錯了")
print(f"索引:{index} {arg_2}")
time.sleep(2)
return index
def main():
numbers = [1, 2, 3, 4, 5, 6]
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
pool.map(functools.partial(test, arg_2=2), numbers)
for index in numbers:
pool.submit(test, index, 2)
if __name__ == '__main__':
main()
復制代碼

You will find that the program outputs nothing and is not aborted,But if you try to run the program by capturing the return value,It is found that the program has thrown an exception and was aborted,So catching exceptions is to catch them at this time: map 捕獲異常

import time
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def test(index, arg_2):
raise ValueError("報錯了")
print(f"索引:{index} {arg_2}")
time.sleep(2)
return index
def main():
numbers = [1, 2, 3, 4, 5, 6]
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
results = pool.map(functools.partial(test, arg_2=2), numbers)
try:
for result in results:
print(f"結果:{result}")
except Exception as e:
print(e)
if __name__ == '__main__':
main()
復制代碼

submit 捕獲異常

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
def test(index, arg_2):
raise ValueError("報錯了")
print(f"索引:{index} {arg_2}")
time.sleep(2)
return index
def main():
numbers = [1, 2, 3, 4, 5, 6]
# 上下文管理器,會自動關閉
with ProcessPoolExecutor() as pool:
results = [pool.submit(test, index, 2) for index in numbers]
for result in as_completed(results):
try:
print(f"結果:{result.result()}")
except Exception as e:
print(e)
if __name__ == '__main__':
main()
復制代碼

It can be seen that the difference is,map Catching exceptions can only be done outside the loop,但 submit Both inside and out.

差不多了

如題,That's it for basic usage,Of course, there is still a lot of knowledge in this library,I'm just taking everyone through the door,For more in-depth information, you can go to the relevant documentation to understand:

  • 官方文檔
  • Python Cookbook
  • python-parallel-programmning-cookbook

下節繼續

This section is here first,The next section discusses the rest.


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved