
Python Multiprocessing

For personal study only.
Source: Mofan Python: https://mofanpy.com/tutorials/python-basic/basic/ (will be removed upon request)

What is multiprocessing? Comparison with threading

Multiprocessing is similar to multithreading (threading): both are used for parallel operations in Python. But since threading already exists, why does Python need multiprocessing as well? The reason is simple: to make up for some of threading's shortcomings, such as the GIL.

Using multiprocessing is very simple. If you already have some understanding of threading, now is your time to shine, because Python makes the multiprocessing and threading APIs almost identical. That makes it easier to get started, and easier to unleash the power of your computer's multi-core system!

Creating a process: Process

import multiprocessing as mp
import threading as td

def job(a, d):
    print('aaaaa')

if __name__ == '__main__':
    # Thread and Process are both capitalized; the target function is passed
    # without parentheses, and its arguments go in args=(...)
    t1 = td.Thread(target=job, args=(1, 2))
    p1 = mp.Process(target=job, args=(1, 2))
    t1.start()
    p1.start()
    t1.join()
    p1.join()
    # Threads and processes are used in a similar way

Storing process output: Queue

A Queue lets each core or thread put its result into the queue; once every thread or process has finished running, the results are taken out of the queue for further processing. The reason we need it is simple: a function run in another thread or process cannot return a value to the caller, so we use a Queue to store the results of the parallel computations.

import multiprocessing as mp

def job(q):
    res = 0
    for i in range(1000):
        res += i + i**2 + i**3
    q.put(res)  # put the result into the queue

if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print(res1 + res2)

Efficiency comparison: threading vs multiprocessing

The running times of the plain, multithreaded, and multiprocess versions were 1.13, 1.3, and 0.64 seconds respectively. The multi-core, multiprocess version is the fastest, which shows that the tasks really ran in parallel. Multithreading was even slower than the plain single-threaded program, which shows that threading still has a serious drawback: the GIL.

Running time: multiprocess < plain < multithreading

Process pool: Pool

A process pool is a pool into which we put the jobs we want to run; Python then solves the multiprocess bookkeeping for us.

Pool() and map()

With a pool, you attach a function to the pool, throw data into it, and the pool hands back the function's return values. The difference from Process is that a function thrown to a Pool has a return value, while a Process target does not.

Next we use map() to collect the results: pass the function and the values to be iterated into map(), and they are automatically distributed across the CPU cores before the results are returned.

import multiprocessing as mp

def job(x):
    return x * x

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)

if __name__ == '__main__':
    multicore()

Custom number of cores

How do we know whether Pool really used multiple cores? We can increase the number of iterations, then open the CPU load monitor and watch the CPU usage.

Viewing CPU load (Mac): Activity Monitor > CPU > CPU Load (single click)

Pool's default size is the number of CPU cores; we can also customize the number of cores through Pool's processes parameter:

def multicore():
    pool = mp.Pool(processes=3)  # use 3 CPU cores
    res = pool.map(job, range(10))
    print(res)

apply_async()

Besides map(), Pool has another way of returning results: apply_async().

apply_async() takes only one set of arguments and runs the job on a single core. Note that the arguments must be passed as an iterable, so when passing a single value you need a trailing comma, and you must call get() on the result object to obtain the return value.

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # use get() to obtain the result
    print(res.get())

# output
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4  # apply_async()

Getting multiple results with apply_async()

apply_async() accepts only one set of parameters at a time.

Here we put apply_async() inside a list comprehension and define a new variable multi_res:

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # use get() to obtain the result
    print(res.get())
    # list comprehension: apply once for i=0, once for i=1, and so on
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # take the results out of the list
    print([res.get() for res in multi_res])

Summary

  1. Pool defaults to the number of CPU cores; pass the processes parameter to customize the core count.
  2. map() takes iterable parameters and returns multiple results.
  3. apply_async() takes only one set of parameters and returns one result; to get the effect of map(), wrap it in a list comprehension.

Shared memory

Shared Value

We can store data in a shared memory table by using Value.

import multiprocessing as mp

# The d and i arguments set the data type: d is a double-precision float,
# i is a signed integer.
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
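As a minimal sketch of how a shared Value is actually used: a child process reads and writes it through its .value attribute, and the change is visible to the parent. The job function here is illustrative, not from the original.

```python
import multiprocessing as mp

def job(v):
    v.value += 1  # mutate the shared integer through .value

if __name__ == '__main__':
    value1 = mp.Value('i', 0)
    p = mp.Process(target=job, args=(value1,))
    p.start()
    p.join()
    print(value1.value)  # the parent sees the child's update: 1
```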

Shared Array

Python's multiprocessing also has an Array class that lives in shared memory and can be used to share data between processes.

This Array differs from numpy's: it can only be one-dimensional, not multidimensional. As with Value, you need to specify the data type, otherwise an error is raised.

array = mp.Array('i', [1, 2, 3, 4])

# wrong form: a 2-D list raises an error
array = mp.Array('i', [[1, 2], [3, 4]])
""" TypeError: an integer is required """

Reference data type

| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |

(Source: https://docs.python.org/3/library/array.html)

Process lock: Lock

Without a process lock

import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.1)  # pause 0.1 s so the output effect is more obvious
        v.value += num   # v.value accesses the shared variable's value
        print(v.value, end="")

def multicore():
    v = mp.Value('i', 0)  # define a shared variable
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 3))  # different increments to show the contention
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

In the code above, we defined a shared variable v that both processes can operate on. In job() we want v to accumulate num every 0.1 seconds and print the result, but the two processes p1 and p2 are given different increments.

Processes 1 and 2 compete with each other for the shared memory v.

Adding a process lock

To stop different processes from fighting over shared resources, we can add a process lock.

First, define a process lock:

l = mp.Lock()  # define a process lock

Then pass the lock to each process:

p1 = mp.Process(target=job, args=(v, 1, l))  # the Lock must be passed in
p2 = mp.Process(target=job, args=(v, 3, l))

In job(), acquire and release the lock so that one process has exclusive access to the shared memory while it runs:

def job(v, num, l):
    l.acquire()  # lock
    for _ in range(5):
        time.sleep(0.1)
        v.value += num  # access shared memory
        print(v.value)
    l.release()  # release

Complete code:

def job(v, num, l):
    l.acquire()  # lock
    for _ in range(5):
        time.sleep(0.1)
        v.value += num  # access shared memory
        print(v.value)
    l.release()  # release

def multicore():
    l = mp.Lock()  # define a process lock
    v = mp.Value('i', 0)  # define shared memory
    p1 = mp.Process(target=job, args=(v, 1, l))  # pass the lock in
    p2 = mp.Process(target=job, args=(v, 3, l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

Run it and see whether the resource contention still occurs:

1
2
3
4
5
8
11
14
17
20

Clearly, the process lock guarantees that process p1 completes its whole operation before process p2 begins.

