您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“python并發(fā)網(wǎng)絡(luò)通信模型有哪些”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“python并發(fā)網(wǎng)絡(luò)通信模型有哪些”吧!
循環(huán)接收客戶端請求,處理請求。同一時刻只能處理一個請求,處理完畢后再處理下一個。
優(yōu)點:實現(xiàn)簡單,占用資源少
缺點:無法同時處理多個客戶端請求
適用情況:處理的任務(wù)可以很快完成,客戶端無需長期占用服務(wù)端程序。udp比tcp更適合循環(huán)。
利用IO多路復(fù)用,異步IO等技術(shù),同時處理多個客戶端IO請求。
優(yōu)點 : 資源消耗少,能同時高效處理多個IO行為
缺點 : 只能處理并發(fā)產(chǎn)生的IO事件,無法處理cpu計算
適用情況:HTTP請求,網(wǎng)絡(luò)傳輸?shù)榷际荌O行為。
每當(dāng)一個客戶端連接服務(wù)器,就創(chuàng)建一個新的進(jìn)程/線程為該客戶端服務(wù),客戶端退出時再銷毀該進(jìn)程/線程。
優(yōu)點:能同時滿足多個客戶端長期占有服務(wù)端需求,可以處理各種請求。
缺點: 資源消耗較大
適用情況:客戶端同時連接量較少,需要處理行為較復(fù)雜情況。
1.創(chuàng)建監(jiān)聽套接字
2.等待接收客戶端請求
3.客戶端連接創(chuàng)建新的進(jìn)程處理客戶端請求
4.原進(jìn)程繼續(xù)等待其他客戶端連接
5.如果客戶端退出,則銷毀對應(yīng)的進(jìn)程
from socket import * import os import signal # 創(chuàng)建監(jiān)聽套接字 HOST = '0.0.0.0' PORT = 8888 ADDR = (HOST,PORT) # 客戶端服務(wù)函數(shù) def handle(c): while True: data = c.recv(1024) if not data: break print(data.decode()) c.send(b'OK') c.close() s = socket() # tcp套接字 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 設(shè)置套接字端口重用 s.bind(ADDR) s.listen(3) signal.signal(signal.SIGCHLD,signal.SIG_IGN) # 處理僵尸進(jìn)程 print("Listen the port %d..." % PORT) # 循環(huán)等待客戶端連接 while True: try: c,addr = s.accept() except KeyboardInterrupt: os._exit(0) except Exception as e: print(e) continue # 創(chuàng)建子進(jìn)程處理這個客戶端 pid = os.fork() if pid == 0: # 處理客戶端請求 s.close() handle(c) os._exit(0) # handle處理完客戶端請求子進(jìn)程也退出 # 無論出錯或者父進(jìn)程都要循環(huán)回去接受請求 # c對于父進(jìn)程沒用 c.close()
1.創(chuàng)建監(jiān)聽套接字
2.循環(huán)接收客戶端連接請求
3.當(dāng)有新的客戶端連接創(chuàng)建線程處理客戶端請求
4.主線程繼續(xù)等待其他客戶端連接
5.當(dāng)客戶端退出,則對應(yīng)分支線程退出
from socket import * from threading import Thread import sys # 創(chuàng)建監(jiān)聽套接字 HOST = '0.0.0.0' PORT = 8888 ADDR = (HOST,PORT) # 處理客戶端請求 def handle(c): while True: data = c.recv(1024) if not data: break print(data.decode()) c.send(b'OK') c.close() s = socket() # tcp套接字 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(ADDR) s.listen(3) print("Listen the port %d..."%PORT) # 循環(huán)等待客戶端連接 while True: try: c,addr = s.accept() except KeyboardInterrupt: sys.exit("服務(wù)器退出") except Exception as e: print(e) continue # 創(chuàng)建線程處理客戶端請求 t = Thread(target=handle, args=(c,)) t.setDaemon(True) # 父進(jìn)程結(jié)束則所有進(jìn)程終止 t.start()
客戶端有簡單的頁面命令提示:功能包含:
查看服務(wù)器文件庫中的文件列表(普通文件)
可以下載其中的某個文件到本地
可以上傳客戶端文件到服務(wù)器文件庫
服務(wù)器需求 :
允許多個客戶端同時操作
每個客戶端可能回連續(xù)發(fā)送命令
技術(shù)分析:
tcp套接字更適合文件傳輸
并發(fā)方案 ---》 fork 多進(jìn)程并發(fā)
對文件的讀寫操作獲取
文件列表 ----》 os.listdir()
粘包的處理
服務(wù)器功能封裝在類中(上傳,下載,查看列表)
創(chuàng)建套接字,流程函數(shù)調(diào)用 main()
客戶端負(fù)責(zé)發(fā)起請求,接受回復(fù),展示
服務(wù)端負(fù)責(zé)接受請求,邏輯處理
ftp server:
from socket import * from threading import Thread import os import time # 全局變量 HOST = '0.0.0.0' PORT = 8080 ADDR = (HOST,PORT) FTP = "/home/tarena/FTP/" # 文件庫位置 # 創(chuàng)建文件服務(wù)器服務(wù)端功能類 class FTPServer(Thread): def __init__(self,connfd): self.connfd = connfd super().__init__() def do_list(self): # 獲取文件列表 files = os.listdir(FTP) if not files: self.connfd.send("文件庫為空".encode()) return else: self.connfd.send(b'OK') time.sleep(0.1) # 防止和后面發(fā)送內(nèi)容粘包 # 拼接文件列表 files_ = "" for file in files: if file[0] != '.' and \ os.path.isfile(FTP+file): files_ += file + '\n' self.connfd.send(files_.encode()) def do_get(self,filename): try: fd = open(FTP+filename,'rb') except Exception: self.connfd.send("文件不存在".encode()) return else: self.connfd.send(b'OK') time.sleep(0.1) # 文件發(fā)送 while True: data = fd.read(1024) if not data: time.sleep(0.1) self.connfd.send(b'##') break self.connfd.send(data) # 循環(huán)接收客戶端請求 def run(self): while True: data = self.connfd.recv(1024).decode() if not data or data == 'Q': return elif data == 'L': self.do_list() elif data[0] == 'G': # G filename filename = data.split(' ')[-1] self.do_get(filename) # 網(wǎng)絡(luò)搭建 def main(): # 創(chuàng)建套接字 sockfd = socket() sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) sockfd.bind(ADDR) sockfd.listen(3) print("Listen the port %d..."%PORT) while True: try: connfd,addr = sockfd.accept() print("Connect from",addr) except KeyboardInterrupt: print("服務(wù)器程序退出") return except Exception as e: print(e) continue # 創(chuàng)建新的線程處理客戶端 client = FTPServer(connfd) client.setDaemon(True) client.start() # 運(yùn)行run方法 if __name__ == "__main__": main()
ftp client:
from socket import * import sys ADDR = ('127.0.0.1',8080) # 服務(wù)器地址 # 客戶端功能處理類 class FTPClient: def __init__(self,sockfd): self.sockfd = sockfd def do_list(self): self.sockfd.send(b'L') # 發(fā)送請求 # 等待回復(fù) data = self.sockfd.recv(128).decode() if data == 'OK': # 一次接收文件列表字符串 data = self.sockfd.recv(4096) print(data.decode()) else: print(data) def do_get(self,filename): # 發(fā)送請求 self.sockfd.send(('G '+filename).encode()) # 等待回復(fù) data = self.sockfd.recv(128).decode() if data == 'OK': fd = open(filename,'wb') # 接收文件 while True: data = self.sockfd.recv(1024) if data == b'##': break fd.write(data) fd.close() else: print(data) def do_quit(self): self.sockfd.send(b'Q') self.sockfd.close() sys.exit("謝謝使用") # 創(chuàng)建客戶端網(wǎng)絡(luò) def main(): sockfd = socket() try: sockfd.connect(ADDR) except Exception as e: print(e) return ftp = FTPClient(sockfd) # 實例化對象 # 循環(huán)發(fā)送請求 while True: print("\n=========命令選項==========") print("**** list ****") print("**** get file ****") print("**** put file ****") print("**** quit ****") print("=============================") cmd = input("輸入命令:") if cmd.strip() == 'list': ftp.do_list() elif cmd[:3] == 'get': # get filename filename = cmd.strip().split(' ')[-1] ftp.do_get(filename) elif cmd[:3] == 'put': # put ../filename filename = cmd.strip().split(' ')[-1] ftp.do_put(filename) elif cmd.strip() == 'quit': ftp.do_quit() else: print("請輸入正確命令") if __name__ == "__main__": main()
定義:在內(nèi)存中數(shù)據(jù)交換的操作被定義為IO操作,IO------輸入輸出
內(nèi)存和磁盤進(jìn)行數(shù)據(jù)交換: 文件的讀寫 數(shù)據(jù)庫更新
內(nèi)存和終端數(shù)據(jù)交換 :input print sys.stdin sys.stdout sys.stderr
內(nèi)存和網(wǎng)絡(luò)數(shù)據(jù)的交換: 網(wǎng)絡(luò)連接 recv send recvfrom
IO密集型程序 : 程序執(zhí)行中有大量的IO操作,而較少的cpu運(yùn)算操作。消耗cpu較少,IO運(yùn)行時間長
CPU(計算)密集型程序:程序中存在大量的cpu運(yùn)算,IO操作相對較少,消耗cpu大。
IO分為:阻塞IO、非阻塞IO、IO多路復(fù)用、事件驅(qū)動IO、異步IO
阻塞IO
定義: 在執(zhí)行IO操作時如果執(zhí)行條件不滿足則阻塞。阻塞IO是IO的默認(rèn)形態(tài)。
效率: 阻塞IO是效率很低的一種IO。但是由于邏輯簡單所以是默認(rèn)IO行為。
阻塞情況:
因為某種執(zhí)行條件沒有滿足造成的函數(shù)阻塞 e.g. accept input recv
處理IO的時間較長產(chǎn)生的阻塞狀態(tài) e.g. 網(wǎng)絡(luò)傳輸, 大文件讀寫
非阻塞IO
定義 : 通過修改IO屬性行為, 使原本阻塞的IO變?yōu)榉亲枞臓顟B(tài)。
設(shè)置套接字為非阻塞IO
sockfd.setblocking(bool)
功能: 設(shè)置套接字為非阻塞IO
參數(shù): 默認(rèn)為True,表示套接字IO阻塞;設(shè)置為False則套接字IO變?yōu)榉亲枞?/p>
超時檢測 :設(shè)置一個最長阻塞時間,超過該時間后則不再阻塞等待。
sockfd.settimeout(sec)
功能:設(shè)置套接字的超時時間
參數(shù):設(shè)置的時間
定義 :通過一個監(jiān)測,可以同時監(jiān)控多個IO事件的行為。當(dāng)哪個IO事件可以執(zhí)行,即讓這個IO事件發(fā)生。
rs, ws, xs = select(rlist, wlist, xlist[, timeout]) 監(jiān)控IO事件,阻塞等待監(jiān)控的IO時間發(fā)生
參數(shù) :
rlist列表,存放(被動)等待處理的IO (接收)
wlist列表,存放主動處理的IO(發(fā)送)
xlist列表,存放出錯,希望去處理的IO(異常)
timeout 超時檢測
返回值:
rs列表rlist中準(zhǔn)備就緒的IO
ws列表wlist中準(zhǔn)備就緒的IO
xs列表xlist中準(zhǔn)備就緒的IO
select 實現(xiàn)tcp服務(wù)
1.將關(guān)注的IO放入對應(yīng)的監(jiān)控類別列表
2.通過select函數(shù)進(jìn)行監(jiān)控
3.遍歷select返回值列表,確定就緒IO事件
4.處理發(fā)生的IO事件
from socket import * from select import select # 創(chuàng)建一個監(jiān)聽套接字作為關(guān)注的IO s = socket() s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('0.0.0.0',8888)) s.listen(3) # 設(shè)置關(guān)注列表 rlist = [s] wlist = [] xlist = [s] # 循環(huán)監(jiān)控IO while True: rs,ws,xs = select(rlist,wlist,xlist) # 遍歷三個返回列表,處理IO for r in rs: # 根據(jù)遍歷到IO的不同使用if分情況處理 if r is s: c,addr = r.accept() print("Connect from",addr) rlist.append(c) # 增加新的IO事件 # else為客戶端套接字就緒情況 else: data = r.recv(1024) # 客戶端退出 if not data: rlist.remove(r) # 從關(guān)注列表移除 r.close() continue # 繼續(xù)處理其他就緒IO print("Receive:",data.decode()) # r.send(b'OK') # 我們希望主動處理這個IO對象 wlist.append(r) for w in ws: w.send(b'OK') wlist.remove(w) # 使用后移除 for x in xs: pass
注意:
wlist中如果存在IO事件,則select立即返回給ws
處理IO過程中不要出現(xiàn)死循環(huán)占有服務(wù)端的情況
IO多路復(fù)用消耗資源較少,效率較高擴(kuò)展:
將整數(shù)轉(zhuǎn)換為二進(jìn)制, 按照二進(jìn)制位進(jìn)行運(yùn)算符操作
& 按位與 | 按位或 ^ 按位異或 << 左移 >> 右移
11 1011 14 1110
(11 & 14 1010) (11| 14 1111) (11^ 14 0101)
11 << 2 ===> 44 右側(cè)補(bǔ)0 14 >> 2 ===> 3 擠掉右側(cè)的數(shù)字
使用 :
在做底層硬件時操作寄存器
做標(biāo)志位的過濾
創(chuàng)建poll對象:p = select.poll()
注冊關(guān)注的IO事件:p.register(fd,event)
fd 要關(guān)注的IO
event 要關(guān)注的IO事件類型
常用類型:
POLLIN 讀IO事件(rlist)
POLLOUT 寫IO事件 (wlist)
POLLERR 異常IO (xlist)
POLLHUP 斷開連接
取消對IO的關(guān)注:p.unregister(fd)
參數(shù): IO對象或者IO對象的fileno
events = p.poll():
功能: 阻塞等待監(jiān)控的IO事件發(fā)生
返回值: 返回發(fā)生的IO事件
events是一個列表 [(fileno,evnet),(),()....]
每個元組為一個就緒IO,元組第一項是該IO的fileno,第二項為該IO就緒的事件類型
poll_server 步驟
1.創(chuàng)建套接字
2.將套接字register
3.創(chuàng)建查找字典,并維護(hù)
4.循環(huán)監(jiān)控IO發(fā)生
5.處理發(fā)生的IO
from socket import * from select import * # 創(chuàng)建套接字 s = socket() s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('0.0.0.0',8888)) s.listen(3) # 創(chuàng)建poll對象關(guān)注s p = poll() # 建立查找字典,用于通過fileno查找IO對象 fdmap = {s.fileno():s} # 關(guān)注s p.register(s,POLLIN|POLLERR) # 循環(huán)監(jiān)控 while True: events = p.poll() # 循環(huán)遍歷發(fā)生的事件 fd-->fileno for fd,event in events: # 區(qū)分事件進(jìn)行處理 if fd == s.fileno(): c,addr = fdmap[fd].accept() print("Connect from",addr) # 添加新的關(guān)注IO p.register(c,POLLIN|POLLERR) fdmap[c.fileno()] = c # 維護(hù)字典 # 按位與判定是POLLIN就緒 elif event & POLLIN: data = fdmap[fd].recv(1024) if not data: p.unregister(fd) # 取消關(guān)注 fdmap[fd].close() del fdmap[fd] # 從字典中刪除 continue print("Receive:",data.decode()) fdmap[fd].send(b'OK')
1. 使用方法 : 基本與poll相同
生成對象改為 epoll()
將所有事件類型改為EPOLL類型
2. epoll特點
epoll 效率比select poll要高
epoll 監(jiān)控IO數(shù)量比select要多
epoll 的觸發(fā)方式比poll要多 (EPOLLET邊緣觸發(fā))
from socket import * from select import * # 創(chuàng)建套接字 s = socket() s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('0.0.0.0',8888)) s.listen(3) # 創(chuàng)建epoll對象關(guān)注s ep = epoll() # 建立查找字典,用于通過fileno查找IO對象 fdmap = {s.fileno():s} # 關(guān)注s ep.register(s,EPOLLIN|EPOLLERR) # 循環(huán)監(jiān)控 while True: events = ep.poll() # 循環(huán)遍歷發(fā)生的事件?。妫?->fileno for fd,event in events: print("親,你有IO需要處理哦") # 區(qū)分事件進(jìn)行處理 if fd == s.fileno(): c,addr = fdmap[fd].accept() print("Connect from",addr) # 添加新的關(guān)注IO # 將觸發(fā)方式變?yōu)檫吘売|發(fā) ep.register(c,EPOLLIN|EPOLLERR|EPOLLET) fdmap[c.fileno()] = c # 維護(hù)字典 # 按位與判定是EPOLLIN就緒 # elif event & EPOLLIN: # data = fdmap[fd].recv(1024) # if not data: # ep.unregister(fd) # 取消關(guān)注 # fdmap[fd].close() # del fdmap[fd] # 從字典中刪除 # continue # print("Receive:",data.decode()) # fdmap[fd].send(b'OK')
到此,相信大家對“python并發(fā)網(wǎng)絡(luò)通信模型有哪些”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。