您好,登錄后才能下訂單哦!
Python 利用selectors模塊實(shí)現(xiàn)非阻塞式編程?很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
前面介紹的 socket 都是采用阻塞方式進(jìn)行通信的,當(dāng)程序調(diào)用 recv() 方法從 socket 中讀取數(shù)據(jù)時(shí),如果沒有讀取到有效的數(shù)據(jù),當(dāng)前線程就會被阻塞。為了解決這個(gè)問題,上面程序采用了多線程并發(fā)編程,即服務(wù)器端為每個(gè)客戶端連接都啟動一個(gè)單獨(dú)的線程,不同的線程負(fù)責(zé)對應(yīng)的 socket 的通信工作。
通過 selectors 模塊允許 socket 以非阻塞方式進(jìn)行通信,selectors 相當(dāng)于一個(gè)事件注冊中心,程序只要將 socket 的所有事件注冊給 selectors 管理,當(dāng) selectors 檢測到 socket 中的特定事件之后,程序就調(diào)用相應(yīng)的監(jiān)聽方法進(jìn)行處理。
selectors 主要支持兩種事件:
selectors.EVENT_READ:當(dāng) socket 有數(shù)據(jù)可讀時(shí)觸發(fā)該事件。當(dāng)有客戶端連接進(jìn)來時(shí)也會觸發(fā)該事件。
selectors.EVENT_WRITE:當(dāng) socket 將要寫數(shù)據(jù)時(shí)觸發(fā)該事件。
使用 selectors 實(shí)現(xiàn)非阻塞式編程的步驟大致如下:
創(chuàng)建 selectors 對象。
通過 selectors 對象為 socket 的 selectors.EVENT_READ 或 selectors.EVENT_WRITE 事件注冊監(jiān)聽器函數(shù)。每當(dāng) socket 有數(shù)據(jù)需要讀寫時(shí),系統(tǒng)負(fù)責(zé)觸發(fā)所注冊的監(jiān)昕器函數(shù)。
在監(jiān)聽器函數(shù)中處理 socket 通信。
下面程序使用 selectors 模塊實(shí)現(xiàn)非阻塞式通信的服務(wù)器端:
import selectors, socket # 創(chuàng)建默認(rèn)的selectors對象 sel = selectors.DefaultSelector() # 負(fù)責(zé)監(jiān)聽“有數(shù)據(jù)可讀”事件的函數(shù) def read(skt, mask): try: # 讀取數(shù)據(jù) data = skt.recv(1024) if data: # 將讀取的數(shù)據(jù)采用循環(huán)向每個(gè)socket發(fā)送一次 for s in socket_list: s.send(data) # Hope it won't block else: # 如果該socket已被對方關(guān)閉,關(guān)閉該socket, # 并從socket_list列表中刪除 print('關(guān)閉', skt) sel.unregister(skt) skt.close() socket_list.remove(skt) # 如果捕捉到異常, 將該socket關(guān)閉,并從socket_list列表中刪除 except: print('關(guān)閉', skt) sel.unregister(skt) skt.close() socket_list.remove(skt) socket_list = [] # 負(fù)責(zé)監(jiān)聽“客戶端連接進(jìn)來”事件的函數(shù) def accept(sock, mask): conn, addr = sock.accept() # 使用socket_list保存代表客戶端的socket socket_list.append(conn) conn.setblocking(False) # 使用sel為conn的EVENT_READ事件注冊read監(jiān)聽函數(shù) sel.register(conn, selectors.EVENT_READ, read) #② sock = socket.socket() sock.bind(('192.168.1.88', 30000)) sock.listen() # 設(shè)置該socket是非阻塞的 sock.setblocking(False) # 使用sel為sock的EVENT_READ事件注冊accept監(jiān)聽函數(shù) sel.register(sock, selectors.EVENT_READ, accept) #① # 采用死循環(huán)不斷提取sel的事件 while True: events = sel.select() for key, mask in events: # key的data屬性獲取為該事件注冊的監(jiān)聽函數(shù) callback = key.data # 調(diào)用監(jiān)聽函數(shù), key的fileobj屬性獲取被監(jiān)聽的socket對象 callback(key.fileobj, mask)
上面程序中定義了兩個(gè)監(jiān)聽器函數(shù) accept() 和 read(),其中 accept() 函數(shù)作為“有客戶端連接進(jìn)來”事件的監(jiān)聽函數(shù),主程序中的 ① 號代碼負(fù)責(zé)為 socket 的 selectors.EVENT_READ 事件注冊該函數(shù);read() 函數(shù)則作為“有數(shù)據(jù)可讀”事件的監(jiān)聽函數(shù),如 accept() 函數(shù)中的 ② 號代碼所示。
通過上面這種方式,程序避免了采用死循環(huán)不斷地調(diào)用 socket 的 accept() 方法來接受客戶端連接,也避免了采用死循環(huán)不斷地調(diào)用 socket 的 recv() 方法來接收數(shù)據(jù)。socket 的 accept()、recv() 方法調(diào)用都是寫在事件監(jiān)聽函數(shù)中的,只有當(dāng)事件(如“有客戶端連接進(jìn)來”事件、“有數(shù)據(jù)可讀”事件)發(fā)生時(shí),accept() 和 recv() 方法才會被調(diào)用,這樣就避免了阻塞式編程。
為了不斷地提取 selectors 中的事件,程序最后使用一個(gè)死循環(huán)不斷地調(diào)用 selectors 的 select() 方法“監(jiān)測”事件,每當(dāng)監(jiān)測到相應(yīng)的事件之后,程序就會調(diào)用對應(yīng)的事件監(jiān)聽函數(shù)。
下面是該示例的客戶端程序。該客戶端程序更加簡單,客戶端程序只需要讀取 socket 中的數(shù)據(jù),因此只要使用 selectors 為 socket 注冊“有數(shù)據(jù)可讀”事件的監(jiān)聽函數(shù)即可。
import selectors, socket, threading # 創(chuàng)建默認(rèn)的selectors對象 sel = selectors.DefaultSelector() # 負(fù)責(zé)監(jiān)聽“有數(shù)據(jù)可讀”事件的函數(shù) def read(conn, mask): data = conn.recv(1024) # Should be ready if data: print(data.decode('utf-8')) else: print('closing', conn) sel.unregister(conn) conn.close() # 創(chuàng)建socket對象 s = socket.socket() # 連接遠(yuǎn)程主機(jī) s.connect(('192.168.1.88', 30000)) # 設(shè)置該socket是非阻塞的 s.setblocking(False) # 使用sel為s的EVENT_READ事件注冊read監(jiān)聽函數(shù) sel.register(s, selectors.EVENT_READ, read) # ① # 定義不斷讀取用戶鍵盤輸入的函數(shù) def keyboard_input(s): while True: line = input('') if line is None or line == 'exit': break # 將用戶的鍵盤輸入內(nèi)容寫入socket s.send(line.encode('utf-8')) # 采用線程不斷讀取用戶的鍵盤輸入 threading.Thread(target=keyboard_input, args=(s, )).start() while True: # 獲取事件 events = sel.select() for key, mask in events: # key的data屬性獲取為該事件注冊的監(jiān)聽函數(shù) callback = key.data # 調(diào)用監(jiān)聽函數(shù), key的fileobj屬性獲取被監(jiān)聽的socket對象 callback(key.fileobj, mask)
上面程序中的 ① 號代碼為 socket 的 EVENT_READ 事件注冊了 read() 監(jiān)聽函數(shù),這樣每當(dāng) socket 中有數(shù)據(jù)可讀時(shí),程序就會觸發(fā) read() 函數(shù)來讀取 socket 中的數(shù)據(jù)。
程序最后也采用死循環(huán)不斷地調(diào)用 selectors 的 select() 方法“監(jiān)測”事件,每當(dāng)監(jiān)測到相應(yīng)的事件之后,程序就會調(diào)用對應(yīng)的事件監(jiān)聽函數(shù)。
先運(yùn)行上面的服務(wù)器端程序,該程序運(yùn)行后只是作為服務(wù)器,看不到任何輸出信息。再運(yùn)行多個(gè)客戶端程序(相當(dāng)于啟動多個(gè)聊天室客戶端登錄該服務(wù)器)。接下來可以在任何一個(gè)客戶端通過鍵盤輸入一些內(nèi)容,然后按回車鍵,即可在所有客戶端(包括自己)的控制臺上接收到剛剛輸入的內(nèi)容。這也是一個(gè)粗略的 C/S 結(jié)構(gòu)的聊天室應(yīng)用。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。