溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python 利用selectors模塊實(shí)現(xiàn)非阻塞式編程

發(fā)布時(shí)間:2020-09-24 13:28:11 來源:億速云 閱讀:177 作者:Leah 欄目:編程語言

Python 利用selectors模塊實(shí)現(xiàn)非阻塞式編程?很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

前面介紹的 socket 都是采用阻塞方式進(jìn)行通信的,當(dāng)程序調(diào)用 recv() 方法從 socket 中讀取數(shù)據(jù)時(shí),如果沒有讀取到有效的數(shù)據(jù),當(dāng)前線程就會被阻塞。為了解決這個(gè)問題,上面程序采用了多線程并發(fā)編程,即服務(wù)器端為每個(gè)客戶端連接都啟動一個(gè)單獨(dú)的線程,不同的線程負(fù)責(zé)對應(yīng)的 socket 的通信工作。

通過 selectors 模塊允許 socket 以非阻塞方式進(jìn)行通信,selectors 相當(dāng)于一個(gè)事件注冊中心,程序只要將 socket 的所有事件注冊給 selectors 管理,當(dāng) selectors 檢測到 socket 中的特定事件之后,程序就調(diào)用相應(yīng)的監(jiān)聽方法進(jìn)行處理。

selectors 主要支持兩種事件:

selectors.EVENT_READ:當(dāng) socket 有數(shù)據(jù)可讀時(shí)觸發(fā)該事件。當(dāng)有客戶端連接進(jìn)來時(shí)也會觸發(fā)該事件。

selectors.EVENT_WRITE:當(dāng) socket 將要寫數(shù)據(jù)時(shí)觸發(fā)該事件。

使用 selectors 實(shí)現(xiàn)非阻塞式編程的步驟大致如下:

創(chuàng)建 selectors 對象。

通過 selectors 對象為 socket 的 selectors.EVENT_READ 或 selectors.EVENT_WRITE 事件注冊監(jiān)聽器函數(shù)。每當(dāng) socket 有數(shù)據(jù)需要讀寫時(shí),系統(tǒng)負(fù)責(zé)觸發(fā)所注冊的監(jiān)昕器函數(shù)。

在監(jiān)聽器函數(shù)中處理 socket 通信。

下面程序使用 selectors 模塊實(shí)現(xiàn)非阻塞式通信的服務(wù)器端:

import selectors, socket

# 創(chuàng)建默認(rèn)的selectors對象
sel = selectors.DefaultSelector()
# 負(fù)責(zé)監(jiān)聽“有數(shù)據(jù)可讀”事件的函數(shù)
def read(skt, mask):
    try:
        # 讀取數(shù)據(jù)
        data = skt.recv(1024)
        if data:
            # 將讀取的數(shù)據(jù)采用循環(huán)向每個(gè)socket發(fā)送一次
            for s in socket_list:
                s.send(data)  # Hope it won't block
        else:
            # 如果該socket已被對方關(guān)閉,關(guān)閉該socket,
            # 并從socket_list列表中刪除
            print('關(guān)閉', skt)
            sel.unregister(skt)
            skt.close()
            socket_list.remove(skt)
    # 如果捕捉到異常, 將該socket關(guān)閉,并從socket_list列表中刪除
    except:
        print('關(guān)閉', skt)
        sel.unregister(skt)
        skt.close()
        socket_list.remove(skt)
socket_list = []
# 負(fù)責(zé)監(jiān)聽“客戶端連接進(jìn)來”事件的函數(shù)
def accept(sock, mask):
    conn, addr = sock.accept()
    # 使用socket_list保存代表客戶端的socket
    socket_list.append(conn)
    conn.setblocking(False)
    # 使用sel為conn的EVENT_READ事件注冊read監(jiān)聽函數(shù)
    sel.register(conn, selectors.EVENT_READ, read)    #②
sock = socket.socket()
sock.bind(('192.168.1.88', 30000))
sock.listen()
# 設(shè)置該socket是非阻塞的
sock.setblocking(False)
# 使用sel為sock的EVENT_READ事件注冊accept監(jiān)聽函數(shù)
sel.register(sock, selectors.EVENT_READ, accept)    #①
# 采用死循環(huán)不斷提取sel的事件
while True:
    events = sel.select()
    for key, mask in events:
        # key的data屬性獲取為該事件注冊的監(jiān)聽函數(shù)
        callback = key.data
        # 調(diào)用監(jiān)聽函數(shù), key的fileobj屬性獲取被監(jiān)聽的socket對象
        callback(key.fileobj, mask)

上面程序中定義了兩個(gè)監(jiān)聽器函數(shù) accept() 和 read(),其中 accept() 函數(shù)作為“有客戶端連接進(jìn)來”事件的監(jiān)聽函數(shù),主程序中的 ① 號代碼負(fù)責(zé)為 socket 的 selectors.EVENT_READ 事件注冊該函數(shù);read() 函數(shù)則作為“有數(shù)據(jù)可讀”事件的監(jiān)聽函數(shù),如 accept() 函數(shù)中的 ② 號代碼所示。

通過上面這種方式,程序避免了采用死循環(huán)不斷地調(diào)用 socket 的 accept() 方法來接受客戶端連接,也避免了采用死循環(huán)不斷地調(diào)用 socket 的 recv() 方法來接收數(shù)據(jù)。socket 的 accept()、recv() 方法調(diào)用都是寫在事件監(jiān)聽函數(shù)中的,只有當(dāng)事件(如“有客戶端連接進(jìn)來”事件、“有數(shù)據(jù)可讀”事件)發(fā)生時(shí),accept() 和 recv() 方法才會被調(diào)用,這樣就避免了阻塞式編程。

為了不斷地提取 selectors 中的事件,程序最后使用一個(gè)死循環(huán)不斷地調(diào)用 selectors 的 select() 方法“監(jiān)測”事件,每當(dāng)監(jiān)測到相應(yīng)的事件之后,程序就會調(diào)用對應(yīng)的事件監(jiān)聽函數(shù)。

下面是該示例的客戶端程序。該客戶端程序更加簡單,客戶端程序只需要讀取 socket 中的數(shù)據(jù),因此只要使用 selectors 為 socket 注冊“有數(shù)據(jù)可讀”事件的監(jiān)聽函數(shù)即可。

import selectors, socket, threading

# 創(chuàng)建默認(rèn)的selectors對象
sel = selectors.DefaultSelector()
# 負(fù)責(zé)監(jiān)聽“有數(shù)據(jù)可讀”事件的函數(shù)
def read(conn, mask):
    data = conn.recv(1024)  # Should be ready
    if data:
        print(data.decode('utf-8'))
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
# 創(chuàng)建socket對象
s = socket.socket()
# 連接遠(yuǎn)程主機(jī)
s.connect(('192.168.1.88', 30000))
# 設(shè)置該socket是非阻塞的
s.setblocking(False)
# 使用sel為s的EVENT_READ事件注冊read監(jiān)聽函數(shù)
sel.register(s, selectors.EVENT_READ, read)    # ①
# 定義不斷讀取用戶鍵盤輸入的函數(shù)
def keyboard_input(s):
    while True:
        line = input('')
        if line is None or line == 'exit':
            break
        # 將用戶的鍵盤輸入內(nèi)容寫入socket
        s.send(line.encode('utf-8'))
# 采用線程不斷讀取用戶的鍵盤輸入
threading.Thread(target=keyboard_input, args=(s, )).start()
while True:
    # 獲取事件
    events = sel.select()
    for key, mask in events:
        # key的data屬性獲取為該事件注冊的監(jiān)聽函數(shù)
        callback = key.data
        # 調(diào)用監(jiān)聽函數(shù), key的fileobj屬性獲取被監(jiān)聽的socket對象
        callback(key.fileobj, mask)

上面程序中的 ① 號代碼為 socket 的 EVENT_READ 事件注冊了 read() 監(jiān)聽函數(shù),這樣每當(dāng) socket 中有數(shù)據(jù)可讀時(shí),程序就會觸發(fā) read() 函數(shù)來讀取 socket 中的數(shù)據(jù)。

程序最后也采用死循環(huán)不斷地調(diào)用 selectors 的 select() 方法“監(jiān)測”事件,每當(dāng)監(jiān)測到相應(yīng)的事件之后,程序就會調(diào)用對應(yīng)的事件監(jiān)聽函數(shù)。

先運(yùn)行上面的服務(wù)器端程序,該程序運(yùn)行后只是作為服務(wù)器,看不到任何輸出信息。再運(yùn)行多個(gè)客戶端程序(相當(dāng)于啟動多個(gè)聊天室客戶端登錄該服務(wù)器)。接下來可以在任何一個(gè)客戶端通過鍵盤輸入一些內(nèi)容,然后按回車鍵,即可在所有客戶端(包括自己)的控制臺上接收到剛剛輸入的內(nèi)容。這也是一個(gè)粗略的 C/S 結(jié)構(gòu)的聊天室應(yīng)用。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI