您好,登錄后才能下訂單哦!
?
目錄
socketserver模塊:... 1
編程接口:... 2
總結(jié),創(chuàng)建服務(wù)器步驟:... 4
例,實(shí)現(xiàn)EchoServer:... 4
例,改寫ChatServer:... 5
?
?
?
socket過于底層,編程雖有套路,但想要寫出健壯的代碼比較困難,所以很多語言都對socket底層API進(jìn)行封裝,py的封裝就是socketserver模塊,網(wǎng)絡(luò)服務(wù)編程框架,全球企業(yè)級快速開發(fā);
socketserver簡化了網(wǎng)絡(luò)服務(wù)器的編寫;
?
??????? +------------+
??????? | BaseServer |
??????? +------------+
????????????? |
????????????? v
??????? +-----------+??????? +------------------+
??????? | TCPServer |------->| UnixStreamServer |
??????? +-----------+??????? +------------------+
???? ?????????|
????????????? v
??????? +-----------+??????? +--------------------+
??????? | UDPServer |------->| UnixDatagramServer |
??????? +-----------+??????? +--------------------+
?
4個sync同步類:
TCPServer、UDPServer、UnixStreamServer、UnixDatagramServer;
很少用;
?
2個mixin類:
ForkingMixIn、ThreadingMixIn;
?
4個async異步類,生產(chǎn)中常用:
ForkingTCPServer(ForkingMixIn,TCPServer)、ForkingUDPServer(ForkingMixIn,UDPServer)?? #創(chuàng)建多進(jìn)程
ThreadingTCPServer(ThreadingMixIn,TCPServer)、ThreadingUDPServer(ThreadingMixIn,UDPServer) ??#創(chuàng)建多線程
?
注:
一般ThreadingTCPServer夠用;
如果并發(fā)很高可考慮用ForkingTCPServer;
ThreadingUDPServer甚至也很少用,盡管在LAN中,如果忙起來時接收到的包的順序是亂的;
?
?
class BaseServer:
??? def __init__(self, server_address, RequestHandlerClass):?? #服務(wù)器綁定的地址信息;用于處理請求,該類必須是BaseRequestHandler類的子類
?
??? def finish_request(self, request, client_address):?? #處理請求的方法
??????? """Finish one request by instantiating RequestHandlerClass."""
??????? self.RequestHandlerClass(request, client_address, self)?? #實(shí)例化,RequesthandlerClass的構(gòu)造
?
查看源碼,寫框架的思想:
class BaseRequestHandler:?? #和用戶連接的用戶請求處理類,server實(shí)例接收用戶請求后,最后會實(shí)例化這個類;它會一次調(diào)用三個函數(shù)setup()(每一個連接初始化)、handler()(每一次請求處理,必須覆蓋)、finish()(每一個連接清理),子類可覆蓋
??? def __init__(self, request, client_address, server):?? #初始化時送入3個構(gòu)造參數(shù),request、client_address、server(TCPServer),以后可在BaseRequestHandler類的實(shí)例上使用self.request(和client連接的socket對象)、self.cleint_address(是客戶端地址)、self.server(是TCPServer本身)
??????? self.request = request
??????? self.client_address = client_address
??????? self.server = server
??????? self.setup()
??????? try:
??????????? self.handle()
??????? finally:
??????????? self.finish()
?
??? def setup(self):?? #每一個連接初始化,初始化工作,如ChatServer中維護(hù)的數(shù)據(jù)結(jié)構(gòu)放到此段;實(shí)現(xiàn)了這三個方法,只不過是空操作,而raise NotImplementedError稱為抽象,不實(shí)現(xiàn)
??????? pass
?
??? def handle(self):?? #每一次請求處理,必須覆蓋;handle()和sock.accept()對應(yīng),用戶連接請求過來后,建立連接并生成一個socket對象(保存在self.request中)和客戶端地址(保存在self.client_address中),之后的操作就和socket編程一樣了
??????? pass
?
??? def finish(self):?? #每一個連接清理,清理工作
??????? pass
注:
setup()和finish()只執(zhí)行一次;
handler()在不加鎖情況下,也是執(zhí)行一次;
?
例:
class MyHandler(socketserver.BaseRequestHandler):?? #右鍵MyHandler,Generate-->Overwrite Methods,可快速生成要覆蓋的方法
??? def handle(self):
??????? super().handle()?? #此句可不寫,因?yàn)楦割愔械?/span>handler()為空操作;但如果是StreamRequestHandler則必須要寫,該類中實(shí)現(xiàn)了handler()方法
??????? print(self.request, self.client_address, self.server)
??????? print('{} handler'.format(self.__class__))
??????? print(self.__dict__)
??????? print(type(self).__dict__)
??????? print(self.__class__.__bases__[0].__dict__)
??????? print(threading.enumerate(), threading.current_thread())
??????? # pass?? #TODO? ?#提醒自己還沒寫完
??????? print('come')
??????? for i in range(3):?? #client和server端長時間連接,在handler里循環(huán);分布式服務(wù)之間需傳遞心跳包(傳遞事務(wù)、節(jié)點(diǎn)信息等),服務(wù)之間要長連接,不能斷;數(shù)據(jù)庫連接池不應(yīng)用長連接,傳完數(shù)據(jù)就可斷開,有很多連接等著連DB
??????????? data = self.request.recv(1024)
??????????? print(data)
?
addr = ('127.0.0.1', 9998)
server = socketserver.ThreadingTCPServer(addr, MyHandler)?? #用多client連接測
# server = socketserver.TCPServer(addr, MyHandler)?? #同步,等前一個連接斷開后,才能接收并處理下一個連接的請求
server.serve_forever()?? #啟動大循環(huán),類似while
?
server.shutdown()
server.server_close()?? #建議關(guān)閉連接前先server.shutdown()
輸出:
<socket.socket fd=232, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9998), raddr=('127.0.0.1', 7576)> ('127.0.0.1', 7576) <socketserver.ThreadingTCPServer object at 0x0000000000B656A0>
<class '__main__.MyHandler'> handler
{'request': <socket.socket fd=232, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9998), raddr=('127.0.0.1', 7576)>, 'client_address': ('127.0.0.1', 7576), 'server': <socketserver.ThreadingTCPServer object at 0x0000000000B656A0>}
{'__doc__': None, '__module__': '__main__', 'handle': <function MyHandler.handle at 0x0000000001231C80>}
(<class 'socketserver.BaseRequestHandler'>,)
{'setup': <function BaseRequestHandler.setup at 0x0000000001477A60>, '__init__': <function BaseRequestHandler.__init__ at 0x00000000014779D8>, '__dict__': <attribute '__dict__' of 'BaseRequestHandler' objects>, '__module__': 'socketserver', '__doc__': 'Base class for request handler classes.\n\n??? This class is instantiated for each request to be handled.? The\n??? constructor sets the instance variables request, client_address\n??? and server, and then calls the handle() method.? To implement a\n??? specific service, all you need to do is to derive a class which\n? ??defines a handle() method.\n\n??? The handle() method can find the request as self.request, the\n??? client address as self.client_address, and the server (in case it\n??? needs access to per-server information) as self.server.? Since a\n??? separate instance is created for each request, the handle() method\n??? can define other arbitrary instance variables.\n\n??? ', 'handle': <function BaseRequestHandler.handle at 0x0000000001477AE8>, '__weakref__': <attribute '__weakref__' of 'BaseRequestHandler' objects>, 'finish': <function BaseRequestHandler.finish at 0x0000000001477B70>}
[<_MainThread(MainThread, started 4136)>, <Thread(Thread-1, started 4372)>] <Thread(Thread-1, started 4372)>
come
?
1、class MyHandler(socketserver.BaseRequestHandler):,通過對BaseRequestHandler類進(jìn)行子類化并覆蓋其handle()方法,來創(chuàng)建請求處理程序類,此方法處理傳入請求;
2、server=socketserver.ThreadingTCPServer(addr,MyHandler),必須實(shí)例化一個服務(wù)器類,并向其傳入服務(wù)器的地址和請求處理程序類;
3、server.serve_forever()或server.handle_request(),調(diào)用服務(wù)器對象的serve_forever()(一直啟動)或server.handle_request()(一次性的)方法;
4、server.shutdown()、server.close(),調(diào)用server.close()(關(guān)閉套接字)前先server.shutdown()等待停止server.serve_forever();
?
為每一個連接提供RequestHandlerClass類實(shí)例,一次調(diào)用setup()、handler()、finish()方法,且使用了try...finally結(jié)構(gòu)(查看BaseRequestHandler源碼)保證finish()方法一定能被調(diào)用,這些方法一次執(zhí)行完成;
如果想維持這個連接與客戶端通信,需要在handler()中使用循環(huán);
socketserver模塊提供不同的類,但編程接口是一樣的,即使是多進(jìn)程、多線程的類也是一樣,大大減少了編程的難度;
?
?
client發(fā)來什么,就返回什么消息;
class EchoHandler(socketserver.BaseRequestHandler):
??? def setup(self):
??????? super().setup()
??????? self.event = threading.Event()
?
??? def handle(self):
??????? super().handle()
??????? while not self.event.is_set():
??????????? data = self.request.recv(1024)
??????????? data = data.decode()
??????????? msg = 'ack: {} {}'.format(self.client_address, data)
??????????? msg = msg.encode()
??????????? self.request.send(msg)
??????? print('end')
?
??? def finish(self):
??????? super().finish()
??????? self.event.set()
?
addr = ('127.0.0.1', 9998)
server = socketserver.ThreadingTCPServer(addr, EchoHandler)
# server.serve_forever()
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()
?
# server.shutdown()
# server.server_close()
try:
??? while True:
??????? cmd = input('>>> ')
??????? if cmd.strip() == 'quit':?? #只有在client都斷開,與server端沒有連接時才正常退出
??????????? break
except Exception as e:
??? print(e)
except KeyboardInterrupt:
??? print('exit')
finally:
??? server.shutdown()
??? server.server_close()
?
?
如果使用文件處理,使用StreamRequestHandler;
可用心跳機(jī)制;
?
class ChatHandler(socketserver.BaseRequestHandler):
??? clients = {}
??? def setup(self):
??????? super().setup()
??????? self.event = threading.Event()
??????? print(self.client_address, threading.current_thread(), self.clients)
?
??? def handle(self):
??????? super().handle()
??????? while not self.event.is_set():
??????????? try:?? #緩沖區(qū)異常、連接異常最好自己捕獲到,雖然父類中有try,但最好自己捕獲
??????????????? data = self.request.recv(1024).decode().strip()
???????????????????????????????????? if len(data) == 0:?? #同if not data,解決client主動斷開后產(chǎn)生的異常,20180901追加尚未測試
?????????????????????????????????????????????? raise BrokenPipeError('client broken')
??????????? except Exception as e:
??????????????? logging.info(e)
??????????????? data = 'quit'?? #技巧,某個連接一旦有問題,會有各種異常,此處直接斷開
??????????? logging.info(data)
?
??????????? if data == 'quit':
??????????????? break
?
??? ????????self.clients[self.client_address] = self.request
??????????? msg = 'ack: {}'.format(data)
??????????? for c in self.clients.values():
??????????????? c.send(msg.encode())
?
??? def finish(self):
??????? super().finish()
??????? self.clients.pop(self.client_address)
??????? self.event.set()
?
addr = ('127.0.0.1', 9998)
server = socketserver.ThreadingTCPServer(addr, ChatHandler)
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()
?
myutils.show_threads()?? #在主線程中就可,沒必要放到工作線程中
try:
??? while True:
??????? cmd = input('>>> ').strip()
??????? if cmd == 'quit':
??????????? break
except Exception as e:
??? print(e)
except KeyboardInterrupt:
??? print('exit')
finally:
??? server.shutdown()
??? server.server_close()
輸出:
>>> [<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <_MainThread(MainThread, started 9820)>]
('127.0.0.1', 8000) <Thread(Thread-2, started 4008)> {}
[<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <Thread(Thread-2, started 4008)>, <_MainThread(MainThread, started 9820)>]
('127.0.0.1', 8003) <Thread(Thread-3, started 9456)> {}
[<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <Thread(Thread-2, started 4008)>, <_MainThread(MainThread, started 9820)>, <Thread(Thread-3, started 9456)>]
[<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <Thread(Thread-2, started 4008)>, <_MainThread(MainThread, started 9820)>, <Thread(Thread-3, started 9456)>]
2018-08-24-09:33:36?????? Thread info: 9456 Thread-3 test
[<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <Thread(Thread-2, started 4008)>, <_MainThread(MainThread, started 9820)>, <Thread(Thread-3, started 9456)>]
2018-08-24-09:33:41?????? Thread info: 4008 Thread-2 test
[<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <Thread(Thread-2, started 4008)>, <_MainThread(MainThread, started 9820)>, <Thread(Thread-3, started 9456)>]
2018-08-24-09:33:48?????? Thread info: 9456 Thread-3 test2
[<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <Thread(Thread-2, started 4008)>, <_MainThread(MainThread, started 9820)>, <Thread(Thread-3, started 9456)>]
2018-08-24-09:33:51?????? Thread info: 4008 Thread-2 test1
[<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <Thread(Thread-2, started 4008)>, <_MainThread(MainThread, started 9820)>, <Thread(Thread-3, started 9456)>]
2018-08-24-09:33:53?????? Thread info: 4008 Thread-2
2018-08-24-09:33:53?????? Thread info: 4008 Thread-2 [WinError 10053] 您的主機(jī)中的軟件中止了一個已建立的連接。
2018-08-24-09:33:53?????? Thread info: 4008 Thread-2 quit
2018-08-24-09:33:55?????? Thread info: 9456 Thread-3
2018-08-24-09:33:55?????? Thread info: 9456 Thread-3 [WinError 10053] 您的主機(jī)中的軟件中止了一個已建立的連接。
2018-08-24-09:33:55?????? Thread info: 9456 Thread-3 quit
[<Thread(Thread-1, started daemon 9552)>, <Thread(show_threads, started daemon 9644)>, <_MainThread(MainThread, started 9820)>]
quit
?
?
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。