程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python簡單消息總線實現,類似於C++ Qt的信號槽

編輯:Python

一、 概述

為了模塊間解耦,消息總線是常用的方式。
在其它文章中分別提到了lua和C++語言的消息總線的實現

  • lua語言的消息總線的實現:lua
  • cpp語言的消息總線的實現:C++的阻塞式消息總線message_bus實現

消息總線實現的基本原理如下:被通信對象向消息總線發布一個主題,這個主題包含消息主題和消息處理函數,消息主題標示某個特定的主題,消息處理函數用來響應該主題的某種消息類型。通信對象向消息總線發送某個特定主題和消息參數,總線就會根據消息主題和消息參數找到對應的消息處理函數處理該請求。

二、Python消息總線的實現

class PyBus (object):
def __init__(self,):
self.clear()
def clear(self):
self.subscriptions = {
}
def subscribe(self, subject, owner, func):
if owner not in self.subscriptions:
self.subscriptions[owner] = {
}
self.subscriptions[owner][subject] = func
def has_subscription(self, owner, subject):
return owner in self.subscriptions and subject in self.subscriptions[owner]
def publish(self, subject, *args, **kwargs):
for owner in self.subscriptions.keys():
if self.has_subscription(owner, subject):
self.subscriptions[owner][subject](*args, **kwargs)
class BusSingleton(PyBus):
def foo(self):
pass
bus_singleton = PyBus()

核心是內部維護的subscriptions字典

三、測試代碼

if __name__ == "__main__":
START = 1111
class Test1(object):
def start(self):
print("start1")
class Test2(object):
def start(self):
print("start2")
test1 = Test1()
test2 = Test2()
bus_singleton.subscribe(START, test1, test1.start)
bus_singleton.subscribe(START, test2, test2.start)
bus_singleton.publish(START)

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved