程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python 多進程理解

編輯:Python

提示:文章寫完後,目錄可以自動生成,如何生成可參考右邊的幫助文檔

文章目錄

  • 前言
  • 一、Process是什麼?
    • 1.1 join:父進程等待子進程結束後才開始執行自己的代碼
    • 2.2 用類的方式開啟進程
    • 3.3 守護進程
  • 二 鎖 Lock
    • 1.1 異步的問題
    • 2 用鎖解決
  • 三 信號量 Semaphore
  • 四 事件 Event
  • 總結


前言

提示:這裡可以添加本文要記錄的大概內容:


提示:以下是本篇文章正文內容,下面案例可供參考

一、Process是什麼?

介紹:

參數介紹:
1 group參數未使用,值始終為None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,‘a’,)
4 kwargs表示調用對象的字典,kwargs={‘name’:‘a’,‘age’:18}
5 name為子進程的名稱

方法介紹:
1 p.start():啟動進程,並調用該子進程中的p.run()
2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法
3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麼也將不會被釋放,進而導致死鎖
4 p.is_alive():如果p仍然運行,返回True
5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

屬性介紹:
1 p.daemon:默認值為False,如果設為True,代表p為後台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

1.1 join:父進程等待子進程結束後才開始執行自己的代碼

# 發送一封郵件
import time
import random
from multiprocessing import Process
def func():
time.sleep(random.randint(1,3)) # 模擬郵件發送的延遲
print('郵件已發送')
if __name__ == '__main__':
p = Process(target=func)
p.start()
p.join() # 阻塞 直到進程p執行完畢後才結束阻塞
print('郵件發送完畢')
# 發送十封郵件
import time
import random
from multiprocessing import Process
def func(index):
time.sleep(random.randint(1,3))
print('第%s封郵件發送完畢' %index)
if __name__ == '__main__':
p_lst = []
for i in range(10):
p = Process(target=func,args=(i,))
p.start() # 先讓所有子進程都啟動
p_lst.append(p)
for p in p_lst: # 再進行join阻塞
p.join()
print('10封郵件全部發送完畢')

2.2 用類的方式開啟進程

介紹:
我們之前創建進程的時候,其實也是在創建一個Process類的對象,再調用對象的start方法開啟進程,
那麼我們也可以自己定義一個類來實現進程的創建

import os
from multiprocessing import Process
class MyProcess(Process): # 定義一個類,繼承Process類
def run(self): # 必須實現的方法,是啟動進程的方法
print('子進程:',os.getpid(),os.getppid())
if __name__ == '__main__':
p = MyProcess() # 實例化
p.start() # 自動調用run方法
print('父進程:',os.getpid())
import time
import os
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,i):
super().__init__() # 實現父類的初始化方法
self.index = i # 定義自己的屬性(參數)
def run(self):
time.sleep(1)
print('子進程:',self.index,os.getpid(),os.getppid())
if __name__ == '__main__':
p_lst = []
for i in range(10):
p = MyProcess(i)
p.start()
p_lst.append(p)
for p in p_lst:
p.join()
print('主進程:',os.getpid())
import os
from multiprocessing import Process
def func():
print("i am target func")
class MyProcess(Process): # 定義一個類,繼承Process類
def __init__(self, target):
super(MyProcess, self).__init__() # 實現父類的初始化方法
self.target = target
def run(self): # 必須實現的方法,是啟動進程的方法
self.target()
print('子進程:', os.getpid(), os.getppid())
if __name__ == '__main__':
p = MyProcess(target=func) # 實例化
p.start() # 自動調用run方法
print('父進程:', os.getpid())
# 結果
父進程: 3532
i am target func
子進程: 9628 3532

3.3 守護進程

介紹:
主進程創建守護進程
1:守護進程會在主進程代碼執行結束後就終止
2:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

# 1,守護進程會在主進程代碼執行結束後就終止
import time
from multiprocessing import Process
def func():
print('子進程 start')
time.sleep(3) # 睡3秒的時候主進程的代碼已經執行完畢了,所以子進程也會跟著結束
print('子進程end')
if __name__ == '__main__':
p = Process(target=func)
p.daemon = True # daemon是Process的屬性
p.start()
time.sleep(2) # 睡2秒的時候,執行了子進程
print('主進程')
結果:
子進程 start
主進程
# 守護進程會在主進程代碼執行結束後就終止 ,但是父進程會等待子進程結束才正式結束。注意:代碼結束是指代碼運行到了最後一行,並不代表進程已經結束了
import time
from multiprocessing import Process
def func():
count = 1
while True:
print('*' * count)
time.sleep(1)
count += 1
def func2():
print('普通進程開始')
time.sleep(5)
print('普通進程結束')
if __name__ == '__main__':
p1 = Process(target=func)
p1.daemon = True
p1.start()
Process(target=func2).start()
time.sleep(3)
print('主進程')
結果:
*
普通進程開始
**
***
主進程
普通進程結束
""" 3,守護進程的作用 守護進程可以報活,就是向某個服務報告自己還活著 場景: 例如你寫好了一個網頁,你的服務端是不應該停的,因為你服務端一旦停止了,別人就無法訪問你的網頁了,所以我們應該確保服務端沒有‘死’, 這個時候就可以使用守護進程,在你的服務端起一個守護進程,讓這個進程只做一件事,就是每隔1個小時(時間按照自己的合理安排設定)向某一台機器匯報自己還活著, 一旦這個機器沒有收到你守護進程傳來的消息,那麼就可以認為你的服務端已經掛了 """
import time
from multiprocessing import Process
def Guard():
while True:
time.sleep(3600)
print('我還活著') # 向某個機器匯報我還活著,具體該怎麼寫匯報的邏輯就怎麼寫,這裡只是示范
if __name__ == '__main__':
p = Process(target=Guard)
p.daemon = True
p.start()
# 主進程的邏輯(主進程應該是一直運行的,不應該有代碼結束的時候)
print('主進程')
4、terminate:關閉進程
import time
from multiprocessing import Process
def fun():
print('子進程')
if __name__ == '__main__':
p = Process(target=fun)
p.start()
p.terminate() # 關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p.is_alive()) # True
time.sleep(0.1)
print(p.is_alive()) # False

二 鎖 Lock

1.1 異步的問題

我們都知道異步進程的好處就是可以一起執行,效率高,但是當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。


搶票系統:
#文件ticket的內容為:{"count":3}
#注意一定要用雙引號,不然json無法識別
#並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
import time
import json
from multiprocessing import Process
def search(person):
with open('ticket') as f:
ticketinfo = json.load(f)
print('%s查詢余票:' %person,ticketinfo['count'])
def get_ticket(person):
with open('ticket') as f:
ticketinfo = json.load(f)
time.sleep(0.2) #模擬讀數據的網絡延遲
if ticketinfo['count'] > 0:
print('%s買到票了'%person)
ticketinfo['count'] -= 1
time.sleep(0.2)
with open('ticket','w') as f:
json.dump(ticketinfo,f)
else:
print('%s沒買到票'%person)
def ticket(person):
search(person)
get_ticket(person)
if __name__ == '__main__':
for i in range(5):
p = Process(target=ticket,args=('person%s'%i,))
p.start()
結果:
person0查詢余票: 3
person4查詢余票: 3
person1查詢余票: 3
person2查詢余票: 3
person3查詢余票: 3
person0買到票了
person4買到票了
person1買到票了
person2買到票了
person3買到票了
分析:票只有三張,但是5個人都顯示買到了,這是因為5個進程異步進行,大家都同一時間在對一個文件進行修改,導致的混亂。

2 用鎖解決

# 加鎖降低了程序的效率,讓原來能夠同時執行的代碼變成順序執行了,異步變同步的過程
# 保證了數據的安全
import time
import json
from multiprocessing import Process
from multiprocessing import Lock # 導入Lock類
def search(person):
with open('ticket') as f:
ticketinfo = json.load(f)
print('%s查詢余票:' %person,ticketinfo['count'])
def get_ticket(person):
with open('ticket') as f:
ticketinfo = json.load(f)
time.sleep(0.2) #模擬讀數據的網絡延遲
if ticketinfo['count'] > 0:
print('%s買到票了'%person)
ticketinfo['count'] -= 1
time.sleep(0.2)
with open('ticket','w') as f:
json.dump(ticketinfo,f)
else:
print('%s沒買到票'%person)
def ticket(person,lock):
search(person)
lock.acquire() # 誰獲得鑰匙 誰才能進入
get_ticket(person)
lock.release() # 用完了,把鑰匙給下一個人
if __name__ == '__main__':
lock = Lock() # 創建一個鎖對象
for i in range(5):
p = Process(target=ticket,args=('person%s'%i,lock))
p.start()
結果:
person1查詢余票: 3
person3查詢余票: 3
person0查詢余票: 3
person2查詢余票: 3
person4查詢余票: 3
person1買到票了
person3買到票了
person0買到票了
person2沒買到票
person4沒買到票

三 信號量 Semaphore

1、信號量的實現機制:計數器 + 鎖實現的
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。
互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據(Samphore相當於有幾把鑰匙,lock只能有一把鑰匙)

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore
def changba(person,sem): # 在唱吧 唱歌
sem.acquire() # 第一次可以同時進來兩個人
print('%s走進唱吧' %person)
time.sleep(random.randint(3,6)) # 每個人唱歌的時間
print('%s走出唱吧' % person) # 唱完走人
sem.release() # 把鑰匙給下一個人
if __name__ == '__main__':
sem = Semaphore(2) # 2把鑰匙
for i in range(5):
p = Process(target=changba,args=('person%s' %i,sem))
p.start()

四 事件 Event

python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麼當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。

阻塞事件 :wait()方法
wait是否阻塞是看event對象內部的Flag

控制Flag的值:
set() 將Flag的值改成True
clear() 將Flag的值改成False
is_set() 判斷當前的Flag的值

import time
import random
from multiprocessing import Process
from multiprocessing import Event
def traffic_ligth(e): # 紅綠燈
print('\033[31m紅燈亮\033[0m') # Flag 默認是False
while True:
if e.is_set(): # 如果是綠燈
time.sleep(2) # 2秒後
print('\033[31m紅燈亮\033[0m') # 轉為紅燈
e.clear() # 設置為False
else: # 如果是紅燈
time.sleep(2) # 2秒後
print('\033[32m綠燈亮\033[0m') # 轉為綠燈
e.set() # 設置為True
def car(e,i): # 車
if not e.is_set():
print('car %s在等待' %i)
e.wait()
print('car %s 通過了'%i)
if __name__ == '__main__':
e = Event()
p = Process(target=traffic_ligth,args=(e,)) # 紅綠燈進程
p.daemon = True
p.start()
p_lst = []
for i in range(10): # 10輛車的進程
time.sleep(random.randrange(0,3,2))
p = Process(target=car,args=(e,i))
p.start()
p_lst.append(p)
for p in p_lst:p.join()

總結

進程之間雖然內存不共享,但是是可以通信的
Lock Semaphore Event 都在進行進城之間的通信
只不過這些通信的內容我們不能改變
後續還有隊列和管道能讓進程之間進行通信

以上內容拷貝至: https://www.shuzhiduo.com/A/l1dyVEjGze/


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved