
Quickly implementing a thread pool in Python (it's very simple)



Hello, folks! Today I'll show you how to quickly implement a thread pool in Python.

Implementing a thread pool in Python

    • I. Introduction
    • II. Main Content
      • 1. Future objects
      • 2. Submitting a function creates a Future automatically
      • 3. What does future.set_result actually do?
      • 4. Submitting multiple functions
      • 5. Submitting multiple functions with map
      • 6. Getting results in completion order
      • 7. Cancelling a function
      • 8. When a function raises an exception
      • 9. Waiting for all functions to finish
    • III. Summary

I. Introduction

When there are many IO-intensive tasks to process, multithreading naturally comes to mind. But if there are a great many tasks, we can't start a separate thread for every one of them. The best approach then is a thread pool, with the number of threads in the pool chosen to suit the business scenario.

For example, with a pool of 10 threads we can process 10 tasks concurrently, and each thread moves on to the next task as soon as it finishes its current one. A thread pool avoids exhausting resources by creating too many threads, and it also makes the lifecycle of task execution easy to manage.
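To make the idea concrete, here is a toy sketch of the mechanism just described: a fixed number of worker threads pulling tasks from a shared queue. The helper name `run_pool` is hypothetical, this is not how `concurrent.futures` is implemented, and it has no error handling; it exists only to illustrate the idea.

```python
import queue
import threading

def run_pool(tasks, num_workers):
    """Toy fixed-size pool: num_workers threads pull (func, args) items from one queue."""
    work = queue.Queue()
    results = [None] * len(tasks)

    def worker():
        while True:
            item = work.get()
            if item is None:          # sentinel: no more work for this thread
                break
            index, func, args = item
            results[index] = func(*args)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for index, (func, args) in enumerate(tasks):
        work.put((index, func, args))
    for _ in threads:
        work.put(None)                # one sentinel per worker, queued after all tasks
    for t in threads:
        t.join()
    return results

print(run_pool([(pow, (2, n)) for n in range(5)], num_workers=2))  # [1, 2, 4, 8, 16]
```

Two threads handle five tasks here; each worker simply takes the next item off the queue when it is free, which is exactly the "go to the next task" behaviour described above.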

Implementing a thread pool by hand is not hard, but we won't do it here, because Python's standard library already provides concurrent.futures, which has built-in support for thread pools. So in this article we will go through how to use that module in detail.

II. Main Content

1. Future objects

When we submit a function to a thread pool, the pool assigns a thread to run it and immediately returns a Future object. Through the Future we can monitor the function's execution status: whether it raised an exception, whether it has finished, and so on. When the function finishes, future.set_result is called internally to store the return value in the future, and the caller can then get it with future.result().

In addition, a future can have callbacks bound to it: once the function finishes, each callback is triggered automatically with the future as its argument. That is why it is called a future: it is a container for a result that will only be available in the future. When we submit a function to the thread pool, the corresponding future is created and returned immediately. We check the function's execution status through the future, and we can also bind a callback that fires automatically when the function completes.

Let's look at how futures are used; a purely textual explanation can be a bit dry.

When a function is submitted to a thread pool, an object is returned immediately.
That object is called a Future, and it holds the function's execution status, among other things.
We can also create a Future manually (the official documentation says this is intended mainly for testing).

from concurrent.futures import Future
# Create a Future object
future = Future()
# Bind a callback to the future
def callback(f: Future):
    print("The callback runs when set_result is called, result:",
          f.result())
future.add_done_callback(callback)
# add_done_callback binds a callback to the future,
# and the future itself is passed to the callback as its only argument.
# If the callback needs more arguments, use functools.partial.
# When does the callback run?
# When set_result is called on the future.
# For a future returned by submitting a function to a thread pool,
# future.set_result(xx) is called automatically once the function finishes,
# storing the function's return value in the future.
# This future was created by hand, so we have to call set_result ourselves.
future.set_result("hehe")

The callback runs when set_result is called, result: hehe
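The comments above mention functools.partial for callbacks that need more than the future itself. A minimal sketch, where `on_done` and the `"job-1"` label are made-up names for illustration:

```python
from concurrent.futures import Future
from functools import partial

received = []

# A callback that needs an extra argument besides the future
def on_done(label, f):
    # partial supplies `label`; add_done_callback supplies the future as the last argument
    received.append((label, f.result()))

future = Future()
future.add_done_callback(partial(on_done, "job-1"))
future.set_result(42)
print(received)  # [('job-1', 42)]
```

partial fixes the leading arguments, so the callable that add_done_callback receives still takes exactly one argument, the future.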

Note that set_result can only be called once, but result can be called as many times as you like to get the value.

from concurrent.futures import Future
future = Future()
future.set_result("hmph")
print(future.result())  # hmph
print(future.result())  # hmph
print(future.result())  # hmph
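To see what "only once" means in practice: on Python 3.8 and later, calling set_result a second time raises concurrent.futures.InvalidStateError, and the stored value is left unchanged. A small sketch:

```python
from concurrent.futures import Future, InvalidStateError

future = Future()
future.set_result("first")
rejected = False
try:
    # A second set_result on an already finished future is refused (Python 3.8+)
    future.set_result("second")
except InvalidStateError:
    rejected = True
print(rejected)         # True
print(future.result())  # first: the original value is kept
```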

Before future.result() can return, set_result must have been called; otherwise result() keeps blocking. result also accepts a timeout argument in seconds: if no value arrives within that time, it raises concurrent.futures.TimeoutError.
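A short sketch of both behaviours on a hand-made Future, with threading.Timer standing in for the code that would normally call set_result (the 0.1 s and 0.2 s delays are arbitrary):

```python
import threading
from concurrent.futures import Future, TimeoutError

future = Future()
timed_out = False
try:
    # Nobody has called set_result yet, so this gives up after 0.1 s
    future.result(timeout=0.1)
except TimeoutError:
    timed_out = True
print(timed_out)  # True

# Simulate another thread supplying the value 0.2 s from now
threading.Timer(0.2, future.set_result, args=("ready",)).start()
# Without a timeout, result() simply blocks until set_result has run
print(future.result())  # ready
```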

2. Submitting a function creates a Future automatically

Above we created the Future by hand, but in real work that is rare. When we submit a function to the thread pool, a Future is created and returned automatically. It holds the function's execution status (pending, running, finished, and so on), and after the function finishes, future.set_result is called to store its return value.

from concurrent.futures import ThreadPoolExecutor
import time
def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n}s"
# Create a thread pool
# You can pass max_workers to cap the number of threads it creates;
# if you omit it, Python 3.8+ uses min(32, number of CPUs + 4)
executor = ThreadPoolExecutor()
# submit hands the function to the thread pool, where it starts running right away.
# Because it runs in another thread, the main thread carries on immediately.
# Pass the function itself followed by its arguments;
# don't write task("古明地覺", 3), which would call the function right here
future = executor.submit(task, "You", 3)
# The function calls time.sleep with n = 3,
# so it sleeps for 3 seconds and is clearly still running at this point
print(future)
""" <Future at 0x7fbf701726d0 state=running> """
# As we said, a future is like a container holding the function's execution status
# Is the function running?
print(future.running())
""" True """
# Has the function finished?
print(future.done())
""" False """
# The main thread also sleeps for 3 seconds
time.sleep(3)
# The function (which started slightly earlier) has now finished,
# and the repr also tells us the return value is a str
print(future)
""" <Future at 0x7fbf701726d0 state=finished returned str> """
print(future.running())
""" False """
print(future.done())
""" True """
# When the function finishes, its return value is stored in the future;
# in other words, once future.set_result has run,
# the function counts as finished and result() can return the value
print(future.result())
""" You slept for 3s """

It's worth stressing again that future.result() blocks. For example:

# Submit a function (task and executor are the ones defined above)
future = executor.submit(task, "You", 3)
start = time.perf_counter()
future.result()
end = time.perf_counter()
print(end - start)  # 3.00331525

As you can see, future.result() took almost 3 seconds. That is easy to understand: future.result() exists to retrieve the function's return value, and it cannot do that before the function has finished. It has to wait until the function completes and its return value has been stored in the future via set_result; only then can future.result() return it.

If you don't want to wait indefinitely, you can pass a timeout when getting the value.

from concurrent.futures import (
    ThreadPoolExecutor,
    TimeoutError
)
import time
def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n}s"
executor = ThreadPoolExecutor()
future = executor.submit(task, "You", 3)
try:
    # No value within 1 second, so TimeoutError is raised
    res = future.result(1)
except TimeoutError:
    pass
# Sleep another 2 seconds; by now the function has finished
time.sleep(2)
# Get the return value
print(future.result())
""" You slept for 3s """

Of course, this approach is not very smart, because we don't know when the function will finish. The better approach is to bind a callback, which is triggered automatically when the function finishes.

from concurrent.futures import ThreadPoolExecutor
import time
def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n}s"
def callback(f):
    print(f.result())
executor = ThreadPoolExecutor()
future = executor.submit(task, "You", 3)
# Bind the callback; it is called automatically after 3 seconds
future.add_done_callback(callback)
""" You slept for 3s """

Note that once submit has been called, the function is already running in the thread pool. Whether or not it has finished yet, we can still bind a callback to its future.

If the callback is added before the function finishes, it is triggered when the function completes (in the worker thread that ran the function). If it is added after the function has finished, the future already holds a value, since set_result has already been called, so the callback is invoked immediately in the thread that called add_done_callback.
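The second case can be observed by recording which thread runs the callback. A minimal sketch, assuming a single-worker pool and using the built-in pow as the submitted function:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

log = []

def record(f):
    # Remember which thread ran the callback and the future's result
    log.append((threading.current_thread().name, f.result()))

caller = threading.current_thread().name
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 2, 10)
    future.result()                    # make sure the function has finished
    future.add_done_callback(record)   # already done, so it runs right here

print(log)  # [('MainThread', 1024)] when run as a script
```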

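3. What does future.set_result actually do?

When the function finishes, set_result is called. So what exactly does it do?

A future has two protected attributes, _result and _state. _result stores the function's return value, and future.result() ultimately returns the value of _result. _state records the execution status of the function: it starts as PENDING, becomes RUNNING while the function executes, and is set to FINISHED once the function is done. set_result stores the return value in _result, sets _state to FINISHED, wakes up any threads waiting for the result, and then runs the bound callbacks.

When future.result() is called, it checks _state. If the function is still executing, it waits; once _state is FINISHED, it returns the value of _result.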
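The mechanism just described can be imitated with a threading.Condition. The following `MiniFuture` is a simplified, hypothetical sketch for illustration only; CPython's real Future also tracks the RUNNING and CANCELLED states, exceptions, callbacks and waiters.

```python
import threading

PENDING, FINISHED = "PENDING", "FINISHED"

class MiniFuture:
    """Toy future: illustrates _state/_result plus a condition variable, nothing more."""

    def __init__(self):
        self._state = PENDING
        self._result = None
        self._condition = threading.Condition()

    def set_result(self, result):
        with self._condition:
            self._result = result
            self._state = FINISHED
            # Wake up every thread blocked in result()
            self._condition.notify_all()

    def result(self, timeout=None):
        with self._condition:
            # wait_for returns at once if already FINISHED,
            # otherwise sleeps until notified or until the timeout expires
            if not self._condition.wait_for(lambda: self._state == FINISHED, timeout):
                raise TimeoutError("result not available yet")
            return self._result

fut = MiniFuture()
threading.Timer(0.1, fut.set_result, args=("done",)).start()
print(fut.result())  # blocks about 0.1 s, then prints: done
```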

4. Submitting multiple functions

So far we have submitted one function at a time, but you can submit as many as you like:

from concurrent.futures import ThreadPoolExecutor
import time
def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n}s"
executor = ThreadPoolExecutor()
futures = [executor.submit(task, "You", 3),
           executor.submit(task, "You", 4),
           executor.submit(task, "You", 1)]
# At this point all of them are running
print(futures)
""" [<Future at 0x1b5ff622550 state=running>, <Future at 0x1b5ff63ca60 state=running>, <Future at 0x1b5ff63cdf0 state=running>] """
time.sleep(3)
# After the main thread has slept for 3 s:
# futures[2] (1 s) is finished and futures[1] (4 s) is still running;
# futures[0] (3 s) is right at the boundary, so it may show either state
# (in the run below it had not quite finished yet)
print(futures)
""" [<Future at 0x1b5ff622550 state=running>, <Future at 0x1b5ff63ca60 state=running>, <Future at 0x1b5ff63cdf0 state=finished returned str>] """

With several functions, how do we get their return values? Simple: iterate over futures.

executor = ThreadPoolExecutor()
futures = [executor.submit(task, "You", 5),
           executor.submit(task, "You", 2),
           executor.submit(task, "You", 4),
           executor.submit(task, "You", 3),
           executor.submit(task, "You", 6)]
for future in futures:
    print(future.result())
"""
You slept for 5s
You slept for 2s
You slept for 4s
You slept for 3s
You slept for 6s
"""

One point deserves explanation. futures contains 5 futures; call them future1, future2, future3, future4 and future5.

The for loop walks through these 5 futures in turn, so the return values come out in the order in which the functions were submitted. future1's function sleeps for 5 seconds, so future1 only has a value after 5 seconds.

However, the five functions run concurrently. future2, future3 and future4 sleep for only 2, 4 and 3 seconds, so their functions finish earlier, call set_result, and store their return values in the corresponding futures.

But a Python for loop cannot start its second iteration before the first one has finished. The order of the futures in the list was fixed at the start, so the second future.result() only runs after the first one has returned, followed by the third, the fourth, and so on.

So even though the later functions have already finished, the order of the for loop makes them wait until the earlier future.result() calls complete. When the first future.result() returns, the next three future.result() calls print immediately, because their functions have already finished.

The last future's function sleeps for 6 seconds, so we wait about 1 more second before its future.result() is printed.
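The timing argument above can be checked with a scaled-down version of the same example, with the delays shortened to tenths of a second (the numbers are arbitrary and exact timings will vary by machine):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(n)
    return n

delays = [0.5, 0.2, 0.4, 0.3, 0.6]   # scaled-down versions of 5, 2, 4, 3, 6 seconds
values = []
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(delays)) as executor:
    futures = [executor.submit(task, d) for d in delays]
    for future in futures:
        values.append(future.result())
        # How long after submission did this result() call return?
        print(f"{future.result()} returned at {time.perf_counter() - start:.2f}s")
elapsed = time.perf_counter() - start
print(values == delays)  # True: results come back in submission order
print(f"total: {elapsed:.2f}s (close to max(delays), not sum(delays))")
```

The first line appears after about 0.5 s, the next three right after it, and the last one roughly 0.1 s later, mirroring the 5 s / 6 s reasoning above.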

5. Submitting multiple functions with map

submit returns a future for each function, and you can bind a callback to that future. If you don't need callbacks, you can submit functions with map instead.

from concurrent.futures import ThreadPoolExecutor
import time
def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n}s"
executor = ThreadPoolExecutor()
# map also uses submit internally
results = executor.map(task,
                       ["You"] * 3,
                       [3, 1, 2])
# It returns an iterator (a generator)
print(results)
""" <generator object ... at 0x0000022D78EFA970> """
# Iterating over it now yields not futures
# but the values of future.result()
for result in results:
    print(result)
"""
You slept for 3s
You slept for 1s
You slept for 2s
"""

As you can see, when iterating with a for loop, map behaves just like submit: the results come back in submission order. The only difference is that you no longer call result, because the iterator yields the functions' return values directly.

Or we can simply call list on it.

executor = ThreadPoolExecutor()
results = executor.map(task,
                       ["You"] * 3,
                       [3, 1, 2])
print(list(results))
""" ['You slept for 3s', 'You slept for 1s', 'You slept for 2s'] """

results is a generator, and calling list pulls out all of its values. Since map still uses submit internally and obtains each return value through future.result(), and the slowest function takes 3 seconds, this step blocks for about 3 seconds. After that, all the return values are printed at once.
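To confirm that map returns results in input order even when later calls finish first, a small sketch that records the actual completion order (the names and delays are made up for illustration):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

finished = []            # order in which the calls actually completed
lock = threading.Lock()  # guards `finished`, which several threads append to

def task(name, delay):
    time.sleep(delay)
    with lock:
        finished.append(name)
    return name.upper()

names = ["a", "b", "c"]
delays = [0.3, 0.1, 0.2]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(task, names, delays))

print(finished)  # completion order, typically ['b', 'c', 'a']
print(results)   # ['A', 'B', 'C']: always input order
```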

6. Getting results in completion order

The examples above obtain return values in the order the functions were submitted. What if we want each return value as soon as its function finishes, whichever finishes first?

from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)
import time
def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n}s"
executor = ThreadPoolExecutor()
futures = [executor.submit(task, "You", 5),
           executor.submit(task, "You", 2),
           executor.submit(task, "You", 1),
           executor.submit(task, "You", 3),
           executor.submit(task, "You", 4)]
# as_completed yields each future as soon as it finishes
for future in as_completed(futures):
    print(future.result())
"""
You slept for 1s
You slept for 2s
You slept for 3s
You slept for 4s
You slept for 5s
"""

Now whichever function finishes first is returned first.
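as_completed yields futures, not the arguments they were submitted with. A common idiom, sketched here with a made-up squaring task, keeps a dict from each future back to its input:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n):
    time.sleep(n / 10)
    return n * n

squares = {}
with ThreadPoolExecutor(max_workers=4) as executor:
    # Remember which argument each future was submitted with
    future_to_n = {executor.submit(task, n): n for n in [4, 1, 3, 2]}
    # Iterating a dict yields its keys, i.e. the futures
    for future in as_completed(future_to_n):
        n = future_to_n[future]
        squares[n] = future.result()
        print(f"task({n}) finished -> {squares[n]}")

print(squares == {1: 1, 2: 4, 3: 9, 4: 16})  # True
```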

7. Cancelling a function

submit hands a function to the thread pool for execution, but what if we want to cancel it?

# task is the function defined in the earlier examples
executor = ThreadPoolExecutor()
future1 = executor.submit(task, "You", 1)
future2 = executor.submit(task, "You", 2)
future3 = executor.submit(task, "You", 3)
# Try to cancel the function; if it has not started yet,
# cancel() sets the future's _state to CANCELLED
future3.cancel()
# Check whether it was cancelled
print(future3.cancelled())  # False

Here is the catch: cancelled returns False. Why? Because the function had already been submitted to the thread pool and was already running. Cancellation only succeeds if the function has not started running yet.

Isn't that a contradiction? A function starts running as soon as it is submitted, yet only a function that hasn't started can be cancelled. So what can we do? Remember the max_workers parameter of the thread pool? It controls the number of threads in the pool. If we set the maximum to 2, the third function will not start running when it is submitted; it stays pending until a thread becomes free.

executor = ThreadPoolExecutor(max_workers=2)
future1 = executor.submit(task, "You", 1)
future2 = executor.submit(task, "You", 2)
future3 = executor.submit(task, "You", 3)
# While the pool can still create threads,
# a submitted function starts running almost at once, so its state is RUNNING
# (_state is a private attribute, read here only for illustration)
print(future1._state)  # RUNNING
print(future2._state)  # RUNNING
# But future3's function has not started yet:
# the pool cannot create a third thread, so its state is PENDING
print(future3._state)  # PENDING
# Cancel the function; since it is not running yet,
# the future's _state is set to CANCELLED
future3.cancel()
# Check whether it was cancelled
print(future3.cancelled())  # True
print(future3._state)  # CANCELLED

When creating a thread pool, it is best to set its capacity explicitly to suit your workload rather than rely on the default (without max_workers, Python 3.8+ caps the pool at min(32, number of CPUs + 4) threads). Also note that once a function has been cancelled, you can no longer call future.result() on it; doing so raises CancelledError.
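A deterministic sketch of both points: a threading.Event keeps the only worker busy, so the second submission is guaranteed to still be pending when we cancel it. The `blocker` function and the `gate` event are illustrative names:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, CancelledError

gate = threading.Event()

def blocker():
    gate.wait()  # keep the only worker busy until the gate opens
    return "blocker done"

executor = ThreadPoolExecutor(max_workers=1)
first = executor.submit(blocker)
queued = executor.submit(pow, 2, 8)  # cannot start: the single worker is busy

cancelled_ok = queued.cancel()
print(cancelled_ok)  # True: it never started, so cancellation succeeds
got_cancelled_error = False
try:
    queued.result()
except CancelledError:
    got_cancelled_error = True
print(got_cancelled_error)  # True

gate.set()             # release the worker
print(first.result())  # blocker done
executor.shutdown()
```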

8. When a function raises an exception

So far we have assumed that every function runs normally, but the unexpected can always happen. What if a function raises an exception?

from concurrent.futures import ThreadPoolExecutor
def task1():
    1 / 0
def task2():
    pass
executor = ThreadPoolExecutor(max_workers=2)
future1 = executor.submit(task1)
future2 = executor.submit(task2)
# Both functions are trivial, so by now they have almost certainly finished
print(future1)
print(future2)
"""
<Future at 0x7fe3e00f9e50 state=finished raised ZeroDivisionError>
<Future at 0x7fe3e00f9eb0 state=finished returned NoneType>
"""
# The output shows that task1 raised an exception
# So how do we get hold of that exception?
print(future1.exception())
print(future1.exception().__class__)
"""
division by zero
<class 'ZeroDivisionError'>
"""
# If there was no exception, exception() returns None
print(future2.exception())  # None
# Note: if the function raised an exception,
# calling result() re-raises it
future1.result()
"""
Traceback (most recent call last):
  File "...", line 4, in task1
    1 / 0
ZeroDivisionError: division by zero
"""

When an exception occurs, future.set_exception is called to store it in the future. A future has an _exception attribute dedicated to holding it, and future.exception() simply returns the value of _exception.
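In practice the exception is often inspected inside a done callback, so failures are handled as soon as they happen. A minimal sketch, with a hypothetical `divide` task and `report` callback:

```python
from concurrent.futures import ThreadPoolExecutor

outcomes = []

def report(f):
    # exception() returns None on success, so check it before calling result()
    exc = f.exception()
    if exc is not None:
        outcomes.append(f"failed: {exc!r}")
    else:
        outcomes.append(f"ok: {f.result()}")

def divide(a, b):
    return a / b

with ThreadPoolExecutor(max_workers=2) as executor:
    for args in [(6, 3), (1, 0)]:
        executor.submit(divide, *args).add_done_callback(report)

# Leaving the with block waits for both tasks, and their callbacks, to finish
print(sorted(outcomes))  # ["failed: ZeroDivisionError('division by zero')", 'ok: 2.0']
```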

9. Waiting for all functions to finish

Suppose we have submitted several functions to the thread pool and want the main program to continue only after all of them have finished. What can we do? There are several options:

Method 1:

from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
    time.sleep(n)
    return f"sleep {n}"
executor = ThreadPoolExecutor()
future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)
# This line does not block
print("start")
# Iterate over all futures and call result() on each;
# execution only moves on once every function has finished
for future in [future1, future2, future3]:
    print(future.result())
print("end")
"""
start
sleep 5
sleep 2
sleep 4
end
"""

Method 2:

from concurrent.futures import (
    ThreadPoolExecutor,
    wait,
    ALL_COMPLETED
)
import time
def task(n):
    time.sleep(n)
    return f"sleep {n}"
executor = ThreadPoolExecutor()
future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)
# return_when accepts one of three constants:
# FIRST_COMPLETED: return as soon as any future finishes or is cancelled
# FIRST_EXCEPTION: return as soon as any future raises an exception;
#                  if none does, this is equivalent to ALL_COMPLETED
# ALL_COMPLETED: return when all futures are done (the default)
fs = wait([future1, future2, future3],
          return_when=ALL_COMPLETED)
# fs is a DoneAndNotDoneFutures namedtuple
# with two fields, done and not_done, each a set of futures
print(fs.done)
""" {<Future at 0x1df1400 state=finished returned str>, <Future at 0x2f08e48 state=finished returned str>, <Future at 0x9f7bf60 state=finished returned str>} """
print(fs.not_done)
""" set() """
# done is a set, so the results below come out in no particular order
for f in fs.done:
    print(f.result())
# prints "sleep 5", "sleep 2" and "sleep 4", one per line, in arbitrary order

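Method 3:

# Use the executor as a context manager (task is the function defined above)
with ThreadPoolExecutor() as executor:
    future1 = executor.submit(task, 5)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 4)
# Execution continues past the with block only after all functions have finished,
# because leaving the block calls executor.shutdown(wait=True)

Method 4:

executor = ThreadPoolExecutor()
future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)
# shutdown() waits by default (wait=True) until all functions have finished;
# only then does execution continue
executor.shutdown()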

III. Summary

If you need to run functions in multiple threads, a thread pool is a good choice. Each submitted function is taken up by a thread from the pool, and when the function finishes, that thread returns to the pool to run other functions. If no thread is free and the pool cannot create a new one because it has reached max_workers, further functions wait in the queue until a thread becomes available.
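Thread reuse can be observed directly by recording which thread ran each call. A sketch with an illustrative `which_thread` function, using the `thread_name_prefix` parameter of ThreadPoolExecutor:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def which_thread(i):
    time.sleep(0.05)
    return threading.current_thread().name

with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as executor:
    names = list(executor.map(which_thread, range(6)))

print(names)       # six results...
print(set(names))  # ...but at most two distinct worker threads
```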

Finally, concurrent.futures can implement not only thread pools but also process pools, and the two share the same API:

from concurrent.futures import ProcessPoolExecutor
import time
def task(n):
    time.sleep(n)
    return f"sleep {n}"
# Child processes may re-import this module, so the code that creates and uses
# the pool must sit under this guard. This is required with the "spawn" start
# method (the default on Windows and macOS) and is good practice everywhere.
if __name__ == '__main__':
    executor = ProcessPoolExecutor()
    future1 = executor.submit(task, 5)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 4)
    executor.shutdown()
    print(future1.result())
    print(future2.result())
    print(future3.result())
"""
sleep 5
sleep 2
sleep 4
"""

Thread pools and process pools share the same API, although in practice you rarely create large process pools.

That's all for today's sharing, folks. Bye!


Copyright © 程式師世界 All Rights Reserved