
Multiprocessing in Python (communication between processes)


Contents

  • Sharing data between processes
    • Shared memory
    • Server process
  • Communication between processes
    • Queue
    • Pipe

This article explains the concepts of data sharing and message passing between processes using Python's multiprocessing module.
In multiprocessing, every newly created process:

  • runs independently;
  • has its own memory space.

Consider the following program to understand this concept:

import multiprocessing

# Empty list with global scope
result = []

def square_list(mylist):
    """Square each number in mylist and append it to the global list result."""
    global result
    # Append the squares of mylist to the global list result
    for num in mylist:
        result.append(num * num)
    # Print the global list result
    print("Result(in process p1): {}".format(result))

if __name__ == "__main__":
    # Input list
    mylist = [1, 2, 3, 4]
    # Create a new process
    p1 = multiprocessing.Process(target=square_list, args=(mylist,))
    # Start the process
    p1.start()
    # Wait for the process to finish
    p1.join()
    # Print the global result list
    print("Result(in main program): {}".format(result))

Here is the output:

Result(in process p1): [1, 4, 9, 16]
Result(in main program): []

In the example above, we print the contents of the global list result in two places:

  • In the square_list function. Because this function is called by process p1, the result list is modified only in the memory space of process p1.
  • In the main program, after process p1 has finished. Because the main program is run by a different process, its memory space still contains the empty result list.
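The same isolation applies to arguments passed to a process. A minimal sketch, assuming only the standard multiprocessing.Process API: the list passed through args is copied into the child (by fork or by pickling, depending on the start method), so appending to it in the child leaves the parent's list unchanged. The names append_item and run are hypothetical, chosen for this illustration.

```python
import multiprocessing


def append_item(items):
    """Hypothetical helper: mutate whatever list this process received."""
    items.append(99)
    print("In child: {}".format(items))


def run():
    data = [1, 2, 3]
    p = multiprocessing.Process(target=append_item, args=(data,))
    p.start()
    p.join()
    # The child appended to its own copy, so the parent's list is unchanged
    return data


if __name__ == "__main__":
    print("In parent: {}".format(run()))
```

Wrapping the work in run() under the `if __name__ == "__main__":` guard keeps the sketch safe with the spawn start method, which re-imports the main module in each child.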

The following figure shows this concept :

Sharing data between processes

Shared memory

Shared memory: the multiprocessing module provides Array and Value objects for sharing data between processes.

  • Array: a ctypes array allocated from shared memory.
  • Value: a ctypes object allocated from shared memory.

Here is a simple example showing how to use Array and Value to share data between processes.

import multiprocessing

def square_list(mylist, result, square_sum):
    """Square each number in mylist, store it in result and sum the squares."""
    # Store the squares of mylist in the shared result array
    for idx, num in enumerate(mylist):
        result[idx] = num * num
    # Sum of squares
    square_sum.value = sum(result)
    # Print the result array
    print("Result(in process p1): {}".format(result[:]))
    # Print the square_sum value
    print("Sum of squares(in process p1): {}".format(square_sum.value))

if __name__ == "__main__":
    # Input list
    mylist = [1, 2, 3, 4]
    # Create an array of 4 integers ('i' = int) in shared memory
    result = multiprocessing.Array('i', 4)
    # Create an integer value in shared memory
    square_sum = multiprocessing.Value('i')
    # Create a new process
    p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
    # Start the process
    p1.start()
    # Wait for the process to finish
    p1.join()
    # Print the result array
    print("Result(in main program): {}".format(result[:]))
    # Print the square_sum value
    print("Sum of squares(in main program): {}".format(square_sum.value))

Output:

Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30

Let's go through the code above line by line:

  • First, we create an array result as follows:

     result = multiprocessing.Array('i', 4)
    
    • The first argument is the data type: 'i' for a signed integer and 'd' for a double-precision float.
    • The second argument is the size of the array. Here we create an array of 4 elements.

    Similarly, we create a value square_sum as follows:

     square_sum = multiprocessing.Value('i')
    

    Here we only need to specify the data type. The value can also be given an initial value (for example, 10), as follows:

     square_sum = multiprocessing.Value('i', 10)
    
  • Second, we pass result and square_sum as arguments when creating the Process object.

     p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
    
  • Elements of the result array are assigned by specifying their index:

     for idx, num in enumerate(mylist):
         result[idx] = num * num
    

    square_sum is assigned through its value attribute:

     square_sum.value = sum(result)
    
  • To print the elements of the result array, we use result[:], which returns the complete array as a list.

     print("Result(in process p1): {}".format(result[:]))
    

    The square_sum value is simply printed as:

     print("Sum of squares(in process p1): {}".format(square_sum.value))
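The typecodes and initial values described above can be combined in a short sketch. It assumes two worker processes that update the same shared values at once. Because `+=` on `.value` is a separate read and write, each update is wrapped in the lock returned by `get_lock()`, which Value objects created with the default `lock=True` provide. The names add_points and run are hypothetical.

```python
import multiprocessing


def add_points(total, hits, n):
    """Hypothetical worker: bump two shared values n times."""
    for _ in range(n):
        # "+=" on .value reads then writes, so hold each value's lock
        with hits.get_lock():
            hits.value += 1
        with total.get_lock():
            total.value += 0.5


def run(n=1000):
    total = multiprocessing.Value('d', 0.0)  # 'd': C double, initial value 0.0
    hits = multiprocessing.Value('i', 10)    # 'i': C int, initial value 10
    workers = [multiprocessing.Process(target=add_points, args=(total, hits, n))
               for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return hits.value, total.value


if __name__ == "__main__":
    print(run())  # (2010, 1000.0)
```

Without the locks, two processes can read the same old value and both write back old + 1, losing updates; the exact loss varies from run to run.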
    

The following figure shows how processes share Array and Value objects:

Server process

Server process: a manager object returned by multiprocessing.Manager() controls a separate server process.
The server process holds Python objects and allows other processes to manipulate them through proxies.
The multiprocessing module provides Manager(), which returns a manager object that controls such a server process. Managers therefore provide a way to create data that can be shared between different processes.

Server process managers are more flexible than shared memory objects because they can support arbitrary object types such as lists, dictionaries, queues, values and arrays. In addition, a single manager can be shared by processes on different computers over a network. However, they are slower than shared memory.
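Because processes see manager objects only through proxies, an in-place change to a nested mutable item is not sent back to the server process; the Python documentation recommends modifying a local copy and reassigning it. A minimal sketch of that behaviour, using a hypothetical list of lists named rows:

```python
import multiprocessing


def run():
    with multiprocessing.Manager() as manager:
        rows = manager.list([[1, 2]])  # hypothetical list of lists in the server process
        # rows[0] hands back a copy of the inner list, so this append is lost
        rows[0].append(3)
        after_inplace = rows[0]
        # Modify a local copy, then reassign it so the proxy sends it to the server
        inner = rows[0]
        inner.append(3)
        rows[0] = inner
        return after_inplace, rows[0]


if __name__ == "__main__":
    print(run())  # ([1, 2], [1, 2, 3])
```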

Consider the example given below:

import multiprocessing

def print_records(records):
    """Print each record (tuple) in records (list)."""
    for record in records:
        print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))

def insert_record(record, records):
    """Add a new record to records (list)."""
    records.append(record)
    print("New record added!\n")

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        # Create a list in the server process's memory
        records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin', 9)])
        # New record to insert into records
        new_record = ('Jeff', 8)
        # Create new processes
        p1 = multiprocessing.Process(target=insert_record, args=(new_record, records))
        p2 = multiprocessing.Process(target=print_records, args=(records,))
        # Run process p1 to insert the new record
        p1.start()
        p1.join()
        # Run process p2 to print the records
        p2.start()
        p2.join()

Output:

New record added!

Name: Sam
Score: 10

Name: Adam
Score: 9

Name: Kevin
Score: 9

Name: Jeff
Score: 8

Let's go through the code snippet above:

  • First, we create a Manager object with:

     with multiprocessing.Manager() as manager:
    

    All lines inside the with block are within the scope of the manager object.

  • Then we create a list records in the server process's memory with:

     records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin',9)])
    

    Similarly, you can create a dictionary with the manager.dict method.

  • Finally, we create processes p1 (which inserts a new record into the records list) and p2 (which prints the records), passing records as one of the arguments when running them.
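A dictionary created with manager.dict can be written by several processes in the same way. A minimal sketch, assuming three hypothetical worker processes that each store one score; the result is copied into an ordinary dict before the with block exits, because the proxy stops working once the manager shuts down.

```python
import multiprocessing


def record_score(scores, name, score):
    """Hypothetical worker: each assignment goes through the proxy to the server."""
    scores[name] = score


def run():
    with multiprocessing.Manager() as manager:
        scores = manager.dict()  # dictionary held in the server process
        entries = [('Sam', 10), ('Adam', 9), ('Kevin', 9)]
        jobs = [multiprocessing.Process(target=record_score, args=(scores, name, score))
                for name, score in entries]
        for p in jobs:
            p.start()
        for p in jobs:
            p.join()
        # Copy into an ordinary dict before the manager shuts down
        return dict(scores)


if __name__ == "__main__":
    print(run())
```

The insertion order of keys depends on which process runs first, so only the contents, not the order, are predictable.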

The concept of a server process is shown in the figure below:

Communication between processes

Effective use of multiple processes usually requires some communication between them, so that work can be divided and results aggregated.
multiprocessing supports two types of communication channel between processes:

  • Queue
  • Pipe

Queue

  1. Queue: A simple way to communicate between processes with multiprocessing is to use a queue to pass messages back and forth. Any picklable Python object can be passed through a queue.
    Note: multiprocessing.Queue is a near clone of queue.Queue.
    Refer to the example program below:

    import multiprocessing

    def square_list(mylist, q):
        """Put the square of each number in mylist on the queue."""
        # Append the squares of mylist to the queue
        for num in mylist:
            q.put(num * num)

    def print_queue(q):
        """Print the elements of the queue."""
        print("Queue elements:")
        while not q.empty():
            print(q.get())
        print("The queue is now empty!")

    if __name__ == "__main__":
        # Input list
        mylist = [1, 2, 3, 4]
        # Create a multiprocessing queue
        q = multiprocessing.Queue()
        # Create new processes
        p1 = multiprocessing.Process(target=square_list, args=(mylist, q))
        p2 = multiprocessing.Process(target=print_queue, args=(q,))
        # Run process p1 to put the squares on the queue
        p1.start()
        p1.join()
        # Run process p2 to get the queue elements
        p2.start()
        p2.join()
    

Output:

Let's go through the code above step by step:

  • First, we create a multiprocessing queue with:

     q = multiprocessing.Queue()
    
  • Then we pass the empty queue q to the square_list function through process p1. Elements are inserted into the queue with the put method.

     q.put(num * num)
    
  • To print the queue elements, we call the get method repeatedly until the queue is empty.

     while not q.empty():
         print(q.get())
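The Python documentation notes that `empty()` is not reliable while other processes may still be putting items on the queue, so the loop above works only because p1 has already been joined. A common alternative, sketched here, is to have the producer finish with a sentinel value and let the consumer block on get() until it sees it. The sketch assumes None never appears as a real data item; producer, run and SENTINEL are hypothetical names.

```python
import multiprocessing

SENTINEL = None  # assumption: None never appears as a real data item


def producer(nums, q):
    """Hypothetical producer: put squares on the queue, then the sentinel."""
    for num in nums:
        q.put(num * num)
    q.put(SENTINEL)


def run():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=([1, 2, 3, 4], q))
    p.start()
    results = []
    while True:
        item = q.get()  # blocks until the producer has put something
        if item is SENTINEL:
            break
        results.append(item)
    # Join only after draining: a process that has put items on a queue
    # waits for them to be flushed before it can terminate
    p.join()
    return results


if __name__ == "__main__":
    print(run())  # [1, 4, 9, 16]
```

Here producer and consumer also run concurrently, which is usually the point of using a queue.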
    

Here is a simple diagram depicting the operations on the queue:

Pipe

Pipe: a pipe can have only two endpoints. It is therefore preferred over a queue when only two-way communication is required.

The multiprocessing module provides the Pipe() function, which returns a pair of connection objects connected by a pipe. The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others).
Consider the program below:
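Before the full program, note that Pipe() is duplex by default, so both connection objects can send and receive. Passing duplex=False makes the pipe one-way: per the multiprocessing documentation, the first connection returned can only receive and the second can only send. A minimal sketch with hypothetical helper names produce and run:

```python
import multiprocessing


def produce(send_end, n):
    """Hypothetical producer: write n integers into the send-only end."""
    for i in range(n):
        send_end.send(i)
    send_end.close()


def run(n=5):
    # With duplex=False the first connection can only receive
    # and the second can only send
    recv_end, send_end = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=produce, args=(send_end, n))
    p.start()
    received = [recv_end.recv() for _ in range(n)]
    p.join()
    return received


if __name__ == "__main__":
    print(run())  # [0, 1, 2, 3, 4]
```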

import multiprocessing

def sender(conn, msgs):
    """Send each message in msgs to the other end of the pipe."""
    for msg in msgs:
        conn.send(msg)
        print("Message sent: {}".format(msg))
    conn.close()

def receiver(conn):
    """Print messages received from the other end of the pipe until "END" arrives."""
    while True:
        msg = conn.recv()
        if msg == "END":
            break
        print("Received message: {}".format(msg))

if __name__ == "__main__":
    # Messages to send
    msgs = ["hello", "hey", "hru?", "END"]
    # Create a pipe
    parent_conn, child_conn = multiprocessing.Pipe()
    # Create new processes
    p1 = multiprocessing.Process(target=sender, args=(parent_conn, msgs))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
    # Start the processes
    p1.start()
    p2.start()
    # Wait for the processes to finish
    p1.join()
    p2.join()

Output:

Let's go through the code above:

  • A pipe is created with:

     parent_conn, child_conn = multiprocessing.Pipe()
    

    This function returns two connection objects, one for each end of the pipe.

  • A message is sent from one end of the pipe to the other with the send method.

     conn.send(msg)
    
  • To receive a message at one end of the pipe, we use the recv method.

     msg = conn.recv()
    
  • In the program above, we send a list of messages from one end to the other. At the other end, we read messages until the "END" message is received.
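The program above uses each end in one direction only. Since a default pipe is duplex, the same ends can also carry replies, which gives a simple request/reply exchange. A minimal sketch, assuming a hypothetical square_server process and using None as this sketch's stop signal:

```python
import multiprocessing


def square_server(conn):
    """Hypothetical server: reply to each number with its square."""
    while True:
        n = conn.recv()
        if n is None:  # None is this sketch's stop signal
            break
        conn.send(n * n)
    conn.close()


def run(nums=(1, 2, 3, 4)):
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex by default
    p = multiprocessing.Process(target=square_server, args=(child_conn,))
    p.start()
    replies = []
    for n in nums:
        parent_conn.send(n)                 # request goes out on one end...
        replies.append(parent_conn.recv())  # ...and the reply returns on the same end
    parent_conn.send(None)
    p.join()
    return replies


if __name__ == "__main__":
    print(run())  # [1, 4, 9, 16]
```

Only one process reads and writes each end here, which avoids the corruption risk described in the note that follows.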

Consider the figure below, which shows the relationship between the pipe and the processes:

Note: data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course, there is no risk of corruption from processes using different ends of the pipe at the same time. Also note that queues perform proper synchronization between processes, at the cost of added complexity. Hence, queues are said to be thread- and process-safe!
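Unlike a single pipe end, one multiprocessing.Queue can be shared by several writers. A small sketch, assuming four hypothetical worker processes that each put three tagged tuples on the same queue; the parent drains the queue before joining and sorts the items, because the interleaving between workers is not deterministic.

```python
import multiprocessing


def worker(worker_id, q):
    """Hypothetical worker: put three tagged items on the shared queue."""
    for i in range(3):
        q.put((worker_id, i))


def run(n_workers=4):
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(w, q))
             for w in range(n_workers)]
    for p in procs:
        p.start()
    # Drain before joining; arrival order across workers varies, so sort
    items = sorted(q.get() for _ in range(n_workers * 3))
    for p in procs:
        p.join()
    return items


if __name__ == "__main__":
    print(run())
```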


Copyright © 程式師世界 All Rights Reserved