
Python 3 tutorial: running multi-process tasks with concurrent.futures


As computing hardware has advanced, GPUs and supercomputing platforms have become increasingly powerful. These technologies do not in themselves bring algorithmic innovation; they increase the speed and scale of computation largely through distributed and parallel computing. This article introduces a simple multi-process implementation using concurrent, a tool that ships with Python, and also discusses how to configure the resources used by multiple processes.

Example of using concurrent

concurrent is a package in the Python standard library that provides a multi-process implementation (through concurrent.futures), so no additional installation is required. We start with sample code that does not use multiple processes:

import time
def sleep(seconds):
    time.sleep(seconds)
if __name__ == '__main__':
    times = [1] * 10
    time0 = time.time()
    for t in times:
        sleep(t)
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

All this code does is sleep for 10 s in total. It is simple enough; here is the result of running it:

$ python3 sleep.py
The time cost is: 10.014754295349121s

The measured time is slightly more than 10 s. The extra time comes not only from the execution of the Python program itself but also from the way the time is measured. This small gap can generally be ignored.
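
As a side note on the timing method, the following is a minimal sketch of an alternative way to measure elapsed time, assuming the standard-library clock time.perf_counter is acceptable for this purpose. The helper name timed is hypothetical and does not appear in the original code.

```python
import time


def timed(func, *args, **kwargs):
    """Run func and return (result, elapsed_seconds)."""
    # perf_counter is a high-resolution clock intended for measuring intervals.
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


if __name__ == '__main__':
    _, elapsed = timed(time.sleep, 0.1)
    print('The time cost is: {:.4f}s'.format(elapsed))
```

The measured value will still be slightly larger than the requested sleep time, since the call overhead is part of the interval.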

Now suppose the sleep function above does not sleep for 1 s but instead performs a computation that takes 1 s, and that we have many CPUs available and want to speed up the computation. This is where multi-process techniques come in. Here is the code after switching to multiple processes:

import concurrent.futures
import time
def sleep(seconds):
    time.sleep(seconds)
if __name__ == '__main__':
    times = [1] * 10
    time0 = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(sleep, times)
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

The modification is simple: the for loop is replaced with a concurrent.futures call. Here is the result:

$ python3 concurrent_sleep.py
The time cost is: 2.0304219722747803s

The running time drops from 10 s to 2 s, which shows that running the tasks in multiple processes brings a significant speedup. Why the optimized time is 2 s rather than 3 s or 1 s is explained in the next section.

Optimal configuration for multiple processes

How much speedup a multi-process solution can deliver depends largely on the hardware. In theory, with n CPU cores we can achieve an n-fold speedup. In most cases, however, the algorithm or the task itself imposes limits, so there is a gap between the actual speedup and n. First, let's use ipython to see how many CPUs the local computer has:

$ ipython
Python 3.8.5 (default, Sep 4 2020, 07:30:14)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.19.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: import psutil
In [2]: psutil.cpu_count(logical=False)
Out[2]: 4
In [3]: psutil.cpu_count(logical=True)
Out[3]: 8

Here we use the psutil library rather than the more common os or multiprocessing modules, because it can distinguish between logical and physical cores. The local computer has 4 physical cores, each corresponding to 2 logical cores, for a total of 8 logical cores. In other words, the best we can expect in theory is an 8-fold reduction in execution time. Let's verify this by adjusting some parameters:
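
Since psutil is a third-party package, it may not be installed everywhere. The sketch below shows one possible way to query the core counts from a script, falling back to the standard library's os.cpu_count (which reports logical cores only) when psutil is missing. The helper name cpu_counts is hypothetical.

```python
import os


def cpu_counts():
    """Return (logical, physical) core counts; physical is None if unknown."""
    logical = os.cpu_count() or 1  # os.cpu_count() may return None
    try:
        import psutil  # third-party package, treated as optional here
        physical = psutil.cpu_count(logical=False)
    except ImportError:
        physical = None
    return logical, physical


if __name__ == '__main__':
    logical, physical = cpu_counts()
    print('logical cores:', logical)
    print('physical cores:', physical)
```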

import concurrent.futures
import time
import sys
def sleep(seconds):
    time.sleep(seconds)
if __name__ == '__main__':
    if sys.argv[1] == '-t':
        times = [1] * int(sys.argv[2])  # Number of 1 s tasks, read from the command line
    time0 = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(sleep, times)
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

In this example, to make adjustment easier, the number of 1 s sleep tasks (and hence the total sleep time) is passed in as a command-line argument and read from sys.argv. Note that sys.argv is a list of strings, not a function, so the value must be converted with int. The code is run as follows:
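
As an alternative to indexing sys.argv by hand, the standard-library argparse module can do the string-to-int conversion and report a missing argument. The following is only a sketch of that option, not the approach used in this article; the function name parse_task_count and the attribute name tasks are hypothetical.

```python
import argparse


def parse_task_count(argv=None):
    """Parse '-t N' and return N as an int (argv=None reads sys.argv[1:])."""
    parser = argparse.ArgumentParser(description='Run N one-second sleep tasks.')
    parser.add_argument('-t', dest='tasks', type=int, required=True,
                        help='number of 1-second tasks to run')
    return parser.parse_args(argv).tasks


if __name__ == '__main__':
    # An explicit list is passed here so the sketch runs without arguments.
    print(parse_task_count(['-t', '16']))
```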

$ python3 concurrent_sleep.py -t 16
The time cost is: 2.0304934978485107s

Here a sleep workload of 16 s finishes in 2 s when run in multiple processes, which matches the expected speedup from 8 logical cores. As mentioned earlier, though, whether the speedup reaches the number of logical cores also depends on the task itself. If the workload consists of 17 tasks, for example, then after every logical core has run 2 sleep tasks, one core still has to run 1 more, and the other cores sit idle until it finishes. The result is as follows:

$ python3 concurrent_sleep.py -t 17
The time cost is: 3.0313029289245605s

This confirms the expectation: the first 16 tasks take 2 s, and we then have to wait for the one remaining 1 s task, for a total of 3 s. Since max_workers is not set here, tasks are distributed over as many processes as the system has logical cores. In practice, other constraints must be considered, such as the balance between memory and the number of processes (for memory-intensive tasks, using every core may exhaust memory). The hardware delivers its best performance only when all system constraints are satisfied. The following code shows how to configure the number of worker processes:

import concurrent.futures
import time
import sys
def sleep(seconds):
    time.sleep(seconds)
if __name__ == '__main__':
    if sys.argv[1] == '-t':
        times = [1] * int(sys.argv[2])
    time0 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        executor.map(sleep, times)
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))
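
The worker count in this code is hard-coded. One possible way to derive it from the memory constraint mentioned earlier is sketched below. This is an illustration under stated assumptions, not a rule from the article: the helper pick_max_workers is hypothetical, and the per-worker memory figure has to be estimated by the user.

```python
import os


def pick_max_workers(bytes_per_worker, available_bytes, logical_cores=None):
    """Largest worker count that fits both the core count and the memory budget."""
    if logical_cores is None:
        logical_cores = os.cpu_count() or 1
    by_memory = available_bytes // bytes_per_worker
    # Always keep at least one worker, even if the memory estimate is too tight.
    return max(1, min(logical_cores, by_memory))


if __name__ == '__main__':
    GiB = 1024 ** 3
    # Hypothetical numbers: each worker needs about 2 GiB and 8 GiB are free.
    print(pick_max_workers(2 * GiB, 8 * GiB, logical_cores=8))
```

With these assumed numbers the memory budget, not the core count, is the limiting factor, and the result could be passed as max_workers.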

The configuration is simple: pass max_workers to ProcessPoolExecutor. Here the maximum number of worker processes is set to 4. Let's run the same test cases again:

$ python3 concurrent_sleep.py -t 16
The time cost is: 4.032958030700684s
$ python3 concurrent_sleep.py -t 17
The time cost is: 5.032677173614502s

The 16 tasks run 4 at a time in parallel, so they finish in 4 s. With 17 tasks, one more 1 s round is needed, for a total of 5 s.
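
The timings in this section follow a simple pattern: equal-length tasks run in rounds of at most one task per worker. The sketch below captures that idealized model, which ignores process start-up overhead. The function name expected_wall_time is hypothetical.

```python
import math


def expected_wall_time(n_tasks, workers, task_seconds=1.0):
    """Idealized wall time: tasks run in ceil(n_tasks / workers) rounds."""
    return math.ceil(n_tasks / workers) * task_seconds


if __name__ == '__main__':
    for workers in (8, 4):
        for n_tasks in (16, 17):
            print(workers, n_tasks, expected_wall_time(n_tasks, workers))
```

The same model accounts for the first multi-process example: 10 tasks on 8 workers need 2 rounds, hence about 2 s rather than 1 s.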

Getting return values

The ideal situation is when tasks can run independently of each other with no need to communicate. More often, though, we want to collect the return value of each process and use those values to pass information between processes. With concurrent.futures, map returns an iterator over the return values of all tasks, in the order of the inputs, which makes this straightforward.

import concurrent.futures
import time
import sys
def sleep(seconds):
    time.sleep(seconds)
    return seconds
if __name__ == '__main__':
    if sys.argv[1] == '-t':
        times = [1] * int(sys.argv[2])
    time0 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(sleep, times)
        print('The total sleep cpu time is: {}s'.format(sum(results)))
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

In this example the result of map is stored in the variable results, and the values are then summed. In this simple case the sum is just the total sleep time requested. The result is shown below:

$ python3 concurrent_sleep.py -t 16
The total sleep cpu time is: 16s
The time cost is: 4.034112930297852s

As you can see, all return values are collected successfully.
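
Besides map, concurrent.futures also offers submit together with as_completed, which hands back each result as soon as its task finishes. The sketch below contrasts the two; it is an illustration rather than part of the original code. The names square, run_in_order and run_as_completed are hypothetical, and the executor_cls parameter is an assumption added so the same code can also be tried with ThreadPoolExecutor.

```python
import concurrent.futures


def square(x):
    """Toy task standing in for real work."""
    return x * x


def run_in_order(values, executor_cls=concurrent.futures.ProcessPoolExecutor):
    # executor.map yields results lazily, in the same order as the inputs.
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(square, values))


def run_as_completed(values, executor_cls=concurrent.futures.ProcessPoolExecutor):
    # submit() returns one Future per task; as_completed() yields each
    # Future when it finishes, regardless of submission order.
    results = {}
    with executor_cls(max_workers=2) as executor:
        futures = {executor.submit(square, v): v for v in values}
        for future in concurrent.futures.as_completed(futures):
            results[futures[future]] = future.result()
    return results


if __name__ == '__main__':
    print(run_in_order([3, 1, 2]))
    print(run_as_completed([3, 1, 2]))
```

map is the simpler choice when results are needed in input order; as_completed can be useful when tasks differ in duration and early results should be processed first.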

Summary

Multi-process execution is an optimization that is independent of the algorithm itself. With Python's concurrent package we can easily run tasks in multiple processes to speed up existing algorithms. This article also offered some guidance on configuring the number of processes, which matters even more in GPU and supercomputing settings.

