溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

python如何實(shí)現(xiàn)事件驅(qū)動(dòng)

發(fā)布時(shí)間:2021-04-09 11:47:18 來(lái)源:億速云 閱讀:203 作者:小新 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹python如何實(shí)現(xiàn)事件驅(qū)動(dòng),文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

EventManager事件管理類(lèi)實(shí)現(xiàn),大概就百來(lái)行代碼左右。

# encoding: UTF-8
# 系統(tǒng)模塊
from Queue import Queue, Empty
from threading import *
#################################################
class EventManager:
 #----------------------------------------------------------------------
 def __init__(self):
  """初始化事件管理器"""
  # 事件對(duì)象列表
  self.__eventQueue = Queue()
  # 事件管理器開(kāi)關(guān)
  self.__active = False
  # 事件處理線(xiàn)程
  self.__thread = Thread(target = self.__Run)
 
  # 這里的__handlers是一個(gè)字典,用來(lái)保存對(duì)應(yīng)的事件的響應(yīng)函數(shù)
  # 其中每個(gè)鍵對(duì)應(yīng)的值是一個(gè)列表,列表中保存了對(duì)該事件監(jiān)聽(tīng)的響應(yīng)函數(shù),一對(duì)多
  self.__handlers = {}
 
 #----------------------------------------------------------------------
 def __Run(self):
  """引擎運(yùn)行"""
  while self.__active == True:
   try:
    # 獲取事件的阻塞時(shí)間設(shè)為1秒
    event = self.__eventQueue.get(block = True, timeout = 1) 
    self.__EventProcess(event)
   except Empty:
    pass
 
 #----------------------------------------------------------------------
 def __EventProcess(self, event):
  """處理事件"""
  # 檢查是否存在對(duì)該事件進(jìn)行監(jiān)聽(tīng)的處理函數(shù)
  if event.type_ in self.__handlers:
   # 若存在,則按順序?qū)⑹录鬟f給處理函數(shù)執(zhí)行
   for handler in self.__handlers[event.type_]:
    handler(event)
 
 #----------------------------------------------------------------------
 def Start(self):
  """啟動(dòng)"""
  # 將事件管理器設(shè)為啟動(dòng)
  self.__active = True
  # 啟動(dòng)事件處理線(xiàn)程
  self.__thread.start()
 
 #----------------------------------------------------------------------
 def Stop(self):
  """停止"""
  # 將事件管理器設(shè)為停止
  self.__active = False
  # 等待事件處理線(xiàn)程退出
  self.__thread.join()
 
 #----------------------------------------------------------------------
 def AddEventListener(self, type_, handler):
  """綁定事件和監(jiān)聽(tīng)器處理函數(shù)"""
  # 嘗試獲取該事件類(lèi)型對(duì)應(yīng)的處理函數(shù)列表,若無(wú)則創(chuàng)建
  try:
   handlerList = self.__handlers[type_]
  except KeyError:
   handlerList = []
 
  self.__handlers[type_] = handlerList
  # 若要注冊(cè)的處理器不在該事件的處理器列表中,則注冊(cè)該事件
  if handler not in handlerList:
   handlerList.append(handler)
 
 #----------------------------------------------------------------------
 def RemoveEventListener(self, type_, handler):
  """移除監(jiān)聽(tīng)器的處理函數(shù)"""
  #讀者自己試著實(shí)現(xiàn)
 
 #----------------------------------------------------------------------
 def SendEvent(self, event):
  """發(fā)送事件,向事件隊(duì)列中存入事件"""
  self.__eventQueue.put(event)
 
########################################################################
"""事件對(duì)象"""
class Event:
 def __init__(self, type_=None):
  self.type_ = type_  # 事件類(lèi)型
  self.dict = {}   # 字典用于保存具體的事件數(shù)據(jù)

測(cè)試代碼

#-------------------------------------------------------------------
# encoding: UTF-8
import sys
from datetime import datetime
from threading import *
from EventManager import *
 
#事件名稱(chēng) 新文章
EVENT_ARTICAL = "Event_Artical"
 
#事件源 公眾號(hào)
class PublicAccounts:
 def __init__(self,eventManager):
  self.__eventManager = eventManager
 def WriteNewArtical(self):
  #事件對(duì)象,寫(xiě)了新文章
  event = Event(type_=EVENT_ARTICAL)
  event.dict["artical"] = u'如何寫(xiě)出更優(yōu)雅的代碼\n'
  #發(fā)送事件
  self.__eventManager.SendEvent(event)
  print u'公眾號(hào)發(fā)送新文章\n'
 
#監(jiān)聽(tīng)器 訂閱者
class Listener:
 def __init__(self,username):
  self.__username = username
 
 #監(jiān)聽(tīng)器的處理函數(shù) 讀文章
 def ReadArtical(self,event):
  print(u'%s 收到新文章' % self.__username)
  print(u'正在閱讀新文章內(nèi)容:%s' % event.dict["artical"])
 
"""測(cè)試函數(shù)"""
#--------------------------------------------------------------------
def test():
 listner1 = Listener("thinkroom") #訂閱者1
 listner2 = Listener("steve")#訂閱者2
 
 eventManager = EventManager()
 
 #綁定事件和監(jiān)聽(tīng)器響應(yīng)函數(shù)(新文章)
 eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
 eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
 eventManager.Start()
 
 publicAcc = PublicAccounts(eventManager)
 timer = Timer(2, publicAcc.WriteNewArtical)
 timer.start()
 
if __name__ == '__main__':
 test()

以上是“python如何實(shí)現(xiàn)事件驅(qū)動(dòng)”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI