
Python distributed process


Distributed processes:

A distributed process is one whose work is spread across multiple machines, making full use of the performance of several machines to complete complex tasks. Between Thread and Process, Process should be preferred, because Process is more stable and, moreover, a Process can be distributed to multiple machines, whereas a Thread can at most be distributed to multiple CPUs of the same machine.

Python's multiprocessing module supports not only multiple processes; its managers submodule also supports distributing those processes across multiple machines. A service process can act as a dispatcher, distributing tasks to several other processes over network communication. Because the managers module is well encapsulated, you can write distributed multiprocess programs easily, without knowing the details of the network communication.

For instance, when writing a crawler we often meet this scenario: we want to capture the link addresses of images and store them in a Queue, while another process reads the link addresses from the Queue, downloads the images and stores them locally. Now make this process distributed: a process on one machine is responsible for grabbing the links, and processes on other machines are responsible for downloading and storing them. The main problem then is exposing the Queue to the network so that processes on other machines can access it. Distributed processes encapsulate exactly this step, which we can call networking the queue.

Creating a distributed process requires a service process and a task process:

Creating the service process:

1. Build Queue objects for inter-process communication. The service process creates a task queue task_queue as the channel for passing tasks to the task process, and a result queue result_queue as the channel through which the task process replies to the service process after completing a task. In a distributed multiprocess environment, tasks must be added through the Queue interface obtained from the QueueManager.

2. Register the queues built in step 1 on the network to expose them to other processes (hosts); the network queue obtained after registration is the image of the local queue.

3. Create an instance of the manager (QueueManager, a BaseManager subclass), binding the port and the authentication password.

4. Start the instance created in step 3, i.e. start the manager, which supervises the information channels.

5. Through the methods of the manager instance, obtain the Queue objects that are accessible over the network, i.e. materialize the network queues as local queues that can be used.

6. Put tasks into the "local" queue; they are automatically uploaded to the network queue and assigned to the task process for handling.

Note: the code here targets the Windows operating system; on Linux it differs slightly.

# coding:utf-8
# taskManager.py for win
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# Number of tasks
task_num = 10

# Define the send and receive queues
task_queue = Queue.Queue(task_num)
result_queue = Queue.Queue(task_num)

def get_task():
    return task_queue

def get_result():
    return result_queue

# Create a QueueManager subclass
class QueueManager(BaseManager):
    pass

def win_run():
    # The interface bound on Windows cannot use a lambda,
    # so the functions are defined first and then bound
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # Bind the port and set the authentication password; Windows needs the
    # IP address filled in, while Linux defaults to the local host if empty
    manager = QueueManager(address=('127.0.0.1', 4000), authkey='qty')
    # Start the manager
    manager.start()
    # Obtain the task queue and result queue through the network
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    try:
        # Add tasks
        for i in range(10):
            print 'put task %s...' % i
            task.put(i)
        print 'try get result...'
        for i in range(10):
            print 'result is %s' % result.get(timeout=10)
    except:
        print 'manager error'
    finally:
        # Be sure to shut down, otherwise an error that the manager
        # was not closed will be reported
        manager.shutdown()
        print 'master exit!'

if __name__ == '__main__':
    # Multiprocessing on Windows may have problems;
    # adding this line mitigates them
    freeze_support()
    win_run()
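The listing above is Python 2. Under Python 3 a few lines would change (a hedged summary): the Queue module is renamed queue, print becomes a function, and authkey must be a bytes object. A minimal illustration:

```python
import queue  # Python 2 spelled this module "Queue"

task_num = 10
task_queue = queue.Queue(task_num)   # same construction as above
task_queue.put(0)
print('put task %s...' % 0)          # print() is a function in Python 3

# The manager line would take a bytes authkey, e.g.:
# manager = QueueManager(address=('127.0.0.1', 4000), authkey=b'qty')
first = task_queue.get()
```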

The task process:

1. Use QueueManager to register the method names used to obtain the Queues; the task process can only obtain the Queues on the network by these names.

2. Connect to the server; the port and authentication password must match the service process exactly.

3. Obtain the Queues from the network and localize them.

4. Get tasks from the task queue and put the results into the result queue.

# coding:utf-8
import time
from multiprocessing.managers import BaseManager

# Create a similar QueueManager:
class QueueManager(BaseManager):
    pass

# Step 1: use QueueManager to register the method names used to
# obtain the Queues
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# Step 2: connect to the server
server_addr = '127.0.0.1'
print 'Connect to server %s' % server_addr
# Note that the port and authentication password must match the
# service process exactly
m = QueueManager(address=(server_addr, 4000), authkey='qty')
# Connect over the network
m.connect()

# Step 3: obtain the Queue objects
task = m.get_task_queue()
result = m.get_result_queue()

# Step 4: get tasks from the task queue and write the results
# into the result queue
while not task.empty():
    index = task.get(True, timeout=10)
    print 'run task download %s' % str(index)
    result.put('%s---->success' % str(index))

# Processing finished
print 'worker exit.'

Execution results:

First, run the service process; it outputs:

put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...

Then immediately run the task process (it must be started promptly, otherwise the service process will time out before it can collect the results):

Connect to server 127.0.0.1
run task download 0
run task download 1
run task download 2
run task download 3
run task download 4
run task download 5
run task download 6
run task download 7
run task download 8
run task download 9
worker exit.

Finally, look back at the output in the service process window:

put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...
result is 0---->success
result is 1---->success
result is 2---->success
result is 3---->success
result is 4---->success
result is 5---->success
result is 6---->success
result is 7---->success
result is 8---->success
result is 9---->success
master exit!

This is simple but genuinely distributed computing. Change the code a little to start multiple workers, and you can distribute the tasks to several or even dozens of machines, implementing a large-scale distributed crawler.

