
A simple message bus implementation in Python, similar to Qt's signals and slots in C++


1. Overview

A message bus is a common way to decouple modules from one another.
Other articles describe message bus implementations in Lua and C++:

  • A message bus implementation in Lua: lua
  • A message bus implementation in C++: a blocking C++ message bus (message_bus) implementation

The basic principle of a message bus is as follows. A receiving object registers a topic with the bus. A topic consists of a message subject and a message handler function: the subject identifies a specific kind of message, and the handler is the function that responds to messages with that subject. A sending object then publishes a subject together with message parameters to the bus, and the bus looks up the handlers registered for that subject and calls them with those parameters.

2. Implementing the message bus in Python

class PyBus(object):
    """A minimal in-process message bus."""

    def __init__(self):
        self.clear()

    def clear(self):
        # Maps owner -> {subject -> handler function}.
        self.subscriptions = {}

    def subscribe(self, subject, owner, func):
        # Register func as owner's handler for subject.
        if owner not in self.subscriptions:
            self.subscriptions[owner] = {}
        self.subscriptions[owner][subject] = func

    def has_subscription(self, owner, subject):
        return owner in self.subscriptions and subject in self.subscriptions[owner]

    def publish(self, subject, *args, **kwargs):
        # Call every handler registered for subject, forwarding the arguments.
        for owner in self.subscriptions.keys():
            if self.has_subscription(owner, subject):
                self.subscriptions[owner][subject](*args, **kwargs)


class BusSingleton(PyBus):
    def foo(self):
        pass


# Module-level bus instance shared by all modules that import it.
bus_singleton = PyBus()

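The core of the implementation is the internally maintained subscriptions dictionary, which maps each owner to a dictionary from subject to handler function.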
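To make the layout of the subscriptions dictionary concrete, here is a minimal sketch. It uses a compact copy of PyBus that is meant to behave the same as the class above, so that the snippet runs on its own. The owner name "logger" and the two handler functions are hypothetical, introduced only for illustration. It shows one consequence of the nested layout: each owner can hold only one handler per subject, so subscribing again replaces the earlier handler.

```python
# Compact copy of PyBus (intended to be behaviorally equivalent to the
# class in section 2), included so this snippet is self-contained.
class PyBus(object):
    def __init__(self):
        self.subscriptions = {}  # owner -> {subject -> handler}

    def subscribe(self, subject, owner, func):
        self.subscriptions.setdefault(owner, {})[subject] = func

    def publish(self, subject, *args, **kwargs):
        for handlers in self.subscriptions.values():
            if subject in handlers:
                handlers[subject](*args, **kwargs)


calls = []  # records which handlers actually ran


def first_handler():   # hypothetical handler
    calls.append("first")


def second_handler():  # hypothetical handler
    calls.append("second")


bus = PyBus()
owner = "logger"  # hypothetical owner; any hashable object can be a key
START = 1111

bus.subscribe(START, owner, first_handler)
bus.subscribe(START, owner, second_handler)  # replaces first_handler

# The dictionary now holds a single entry for this owner and subject:
# {"logger": {1111: second_handler}}
bus.publish(START)
print(calls)
```

If an owner needs several handlers for the same subject, one option would be to store a list of handlers per subject instead of a single function. That is a design change, not something the original class does.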

3. Test code

if __name__ == "__main__":
    START = 1111  # subject identifier

    class Test1(object):
        def start(self):
            print("start1")

    class Test2(object):
        def start(self):
            print("start2")

    test1 = Test1()
    test2 = Test2()
    # Both objects subscribe to the same subject.
    bus_singleton.subscribe(START, test1, test1.start)
    bus_singleton.subscribe(START, test2, test2.start)
    # Publishing the subject calls both handlers.
    bus_singleton.publish(START)
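The test above publishes a subject without parameters. Since publish accepts *args and **kwargs, parameters can be forwarded to the handlers as well. The following sketch illustrates this under a few assumptions: the Downloader class, its on_progress method, and the PROGRESS subject value are hypothetical names, and the compact PyBus copy is included only so that the snippet runs on its own.

```python
# Compact copy of PyBus (intended to be behaviorally equivalent to the
# class in section 2), included so this snippet is self-contained.
class PyBus(object):
    def __init__(self):
        self.subscriptions = {}  # owner -> {subject -> handler}

    def subscribe(self, subject, owner, func):
        self.subscriptions.setdefault(owner, {})[subject] = func

    def publish(self, subject, *args, **kwargs):
        for handlers in self.subscriptions.values():
            if subject in handlers:
                handlers[subject](*args, **kwargs)


class Downloader(object):  # hypothetical subscriber
    def __init__(self):
        self.received = []

    def on_progress(self, filename, percent=0):
        # Store the parameters that the bus forwarded to this handler.
        self.received.append((filename, percent))


PROGRESS = 2222  # hypothetical subject identifier

bus = PyBus()
downloader = Downloader()
bus.subscribe(PROGRESS, downloader, downloader.on_progress)

# Positional and keyword arguments are passed through to the handler.
bus.publish(PROGRESS, "data.bin", percent=50)

# Publishing a subject nobody subscribed to calls no handler.
bus.publish(9999, "ignored")

print(downloader.received)
```

Note that the bus does not check handler signatures, so a publisher and its subscribers have to agree on the parameters for each subject. A mismatch raises a TypeError inside publish.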
