溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

python如何使用socket實(shí)現(xiàn)TCP協(xié)議長連接框架

發(fā)布時(shí)間:2022-02-15 09:07:55 來源:億速云 閱讀:619 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹了python如何使用socket實(shí)現(xiàn)TCP協(xié)議長連接框架,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

分析多了協(xié)議就會發(fā)現(xiàn),很多的應(yīng)用,特別是游戲類和IM類應(yīng)用,它們的協(xié)議會使用長連接的方式,來保持客戶端與服務(wù)器的聯(lián)系,這些長連接,通常是TCP承載的。

如果我們要模擬這個(gè)客戶端的行為,根據(jù)不同應(yīng)用服務(wù)器的實(shí)現(xiàn)情況,有些長連接不是必須的,但有些長連接,就必須去實(shí)現(xiàn)它。例如最近分析的某應(yīng)用,雖然它主要使用HTTP協(xié)議進(jìn)行交互,但它在TCP長連接中傳輸了一些必須的信息,如果不實(shí)現(xiàn)長連接,就會有很多信息無法處理。

在python中,很容易實(shí)現(xiàn)HTTP協(xié)議,當(dāng)然,也容易實(shí)現(xiàn)TCP協(xié)議,它的TCP實(shí)現(xiàn),使用socket庫就可以了,只是需要注意,TCP長連接中通常傳輸?shù)氖鞘M(jìn)制數(shù)據(jù),協(xié)議非標(biāo)準(zhǔn)的,需要自行根據(jù)協(xié)議分析結(jié)果來封裝數(shù)據(jù)格式。

這里以一個(gè)使用到TCP長連接的協(xié)議為樣例,來給出協(xié)議的TCP長連接框架,大家有需要可以參考實(shí)現(xiàn),當(dāng)然,代碼也是從樣例中摘出來的,并不是完整的。

TCP長連接框架,首先是外部的包裝,初始化一些參數(shù),例如長連接使用到的ip端口及socket套接字等:

self.longip='im.langren001.com'
        self.longport= 6656
        self.threadLock = threading.Lock()
        self.sockmain = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
        self.longlinktcpstart2()
        tlonglink = threading.Thread(target=lrsuser.longlinktcpth3,name='mainlink_'+ self.playinfo['uid'], args=(self,))
        tlonglink.start()
        self.threadinfo.append(tlonglink)

這個(gè)里面調(diào)用了兩個(gè)函數(shù),一個(gè)是longlinktcpstart2函數(shù),作用是建立socket連接,并對一些連接建立初始時(shí)的交互進(jìn)行實(shí)現(xiàn),另一個(gè)是longlinktcpth3函數(shù),是一個(gè)線程,實(shí)現(xiàn)對連接內(nèi)的數(shù)據(jù)進(jìn)行收發(fā)處理。一般來說,這兩個(gè)可以在一起實(shí)現(xiàn),但為了方便socket異常斷開的處理,分成了兩個(gè)函數(shù)。

 longlinktcpstart2的實(shí)現(xiàn)如下:

def longlinktcpstart2(self):
        server_address = (self.longip, int(self.longport))
        self.savelogs('longlinktcpstart2', 'Connecting to %s:%d.' % server_address)
        self.sockmain.connect(server_address)
        self.databuf = b''
        message = genbaseinfo.genalive()
        self.sockmain.sendall(message)
        message = genbaseinfo.genfirstdata()
        if len(message)==0:
            self.savelogs('longlinktcpstart2', 'genfirstdata error ')
            return False
        self.sockmain.sendall(message)
        self.longlinkcnt=2
        cnt = 0
        while (cnt < 2):
            try:
                buf = self.sockmain.recv(2048)
                sz = len(buf)
                self.savelogs('longlinktcpstart2', "recv data len "+str(sz) )
                if sz > 0:
                    self.databuf +=buf
                    self.dealdatabuf()
                    if cnt == 0:
                        alivemsg =  genbaseinfo.genalive()
                        self.sockmain.sendall(alivemsg)
                        self.savelogs('longlinktcpstart2', "sendalive")
                        regtime=int(round(time.time() * 1000))-random.randint(14400000,25200000)
                        regtime=regtime*1000
                        pcode = self.versionstr + '.0'
                        message =  genbaseinfo.genseconddata()
                        if len(message) == 0:
                            self.savelogs('longlinktcpstart2', 'genseconddata error ')
                            return False
                        self.sockmain.sendall(message)
                        self.longlinkcnt = self.longlinkcnt + 1
                    elif cnt == 1:
                        pcode = self.versionstr + '.0'
                        message =  genbaseinfo.genotherdata()
                        if len(message) == 0:
                            self.savelogs('longlinktcpstart2', 'genthirddata error ')
                            return False
                        self.sockmain.sendall(message)
                        self.longlinkcnt = self.longlinkcnt + 1
                    cnt = cnt + 1
                else:
                    self.savelogs('longlinktcpstart2', 'recv data alive')
            except:  # socket.error
                self.savelogs('longlinktcpstart2', 'socket error,do connect fail')
                return False
 
 
        return True

這里面的genbaseinfo 相關(guān)的函數(shù)可以忽略,是用來生成發(fā)送的消息數(shù)據(jù)的實(shí)現(xiàn),用自己的函數(shù)去替換即可。dealdatabuf函數(shù)是用來處理收到的消息數(shù)據(jù)實(shí)現(xiàn),這兩個(gè)都要根據(jù)具體的協(xié)議分析情況去實(shí)現(xiàn),注意,生成的用來發(fā)送的數(shù)據(jù)和接收到的需要處理的數(shù)據(jù),都需要按十六進(jìn)制處理,這里不做詳述。

線程longlinktcpth3是一個(gè)循環(huán),協(xié)議不退出,循環(huán)不結(jié)束,實(shí)現(xiàn)如下:

def longlinktcpth3(self):
        tmalive = 0;
        r_inputs = set()
        r_inputs.add(self.sockmain)
        w_inputs = set()
        w_inputs.add(self.sockmain)
        e_inputs = set()
        e_inputs.add(self.sockmain)
        tm=int(round(time.time()))
        self.savelogs('longlinktcpth3', 'enter' )
        while (self.quitflag==0):
            try:
                r_list, w_list, e_list = select.select(r_inputs, w_inputs, e_inputs, 1)
                for event in r_list:
                    try:
                        buf = event.recv(2048)
                        sz = len(buf)
                        self.savelogs('longlinktcpth3', "loop recv data len:"+ str(sz) )
                        if sz > 0:
                            self.databuf += buf
                            self.dealdatabuf()
                            alivemsg = genbaseinfo.genalive()
                            self.sockmain.sendall(alivemsg)
                            self.savelogs('longlinktcpth3', "sendalive")
                        else:
                            self.savelogs('longlinktcpth3', "遠(yuǎn)程斷開連接,do reconnect")
                            r_inputs.clear()
                            time.sleep(3)
                            self.sockmain = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                            self.longlinktcpstart2()
                            r_inputs = set()
                            r_inputs.add(self.sockmain)
                            w_inputs = set()
                            w_inputs.add(self.sockmain)
                            e_inputs = set()
                            e_inputs.add(self.sockmain)
                    except Exception as e:
                        self.savelogs('longlinktcpth3', str(e))
                self.threadLock.acquire()
                if (len(self.msglist) > 0):
                    msg = self.msglist.pop(0)
                    self.threadLock.release()
                    self.sockmain.sendall(msg)
                    self.savelogs('longlinktcpth3',"send a msg")
                else:
                    self.threadLock.release()
                tmnow=int(round(time.time()))
                if tmnow-tm>30:
 
 
                    message = genbaseinfo.genotherdata()
                    if len(message) == 0:
                        self.savelogs('longlinktcpth3', 'genalivedata error ')
                        return False
                    self.sockmain.sendall(message)
                    self.savelogs('longlinktcpth3', "send alivemsg"+str(self.longlinkcnt))
                    self.longlinkcnt = self.longlinkcnt + 1 #這個(gè)要一條連接統(tǒng)一,不能亂,回頭加鎖
                    tm=tmnow
                if len(w_list) > 0:  # 產(chǎn)生了可寫的事件,即連接完成
                    self.savelogs('longlinktcpth3',str(w_list))
                    w_inputs.clear()  # 當(dāng)連接完成之后,清除掉完成連接的socket
 
 
                if len(e_list) > 0:  # 產(chǎn)生了錯(cuò)誤的事件,即連接錯(cuò)誤
                    self.savelogs('longlinktcpth3', str(e_list))
                    e_inputs.clear()  # 當(dāng)連接有錯(cuò)誤發(fā)生時(shí),清除掉發(fā)生錯(cuò)誤的socket
            except OSError as e:
                self.savelogs('longlinktcpth3', 'socket error,do reconnect')
                time.sleep(3)
                self.sockmain = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.longlinktcpstart2()
                r_inputs = set()
                r_inputs.add(self.sockmain)
                w_inputs = set()
                w_inputs.add(self.sockmain)
                e_inputs = set()
                e_inputs.add(self.sockmain)
                
        self.savelogs('longlinktcpth3', 'leave')

由于這個(gè)代碼主要是在windows上使用,因此,longlinktcpth3線程采用了select來實(shí)現(xiàn),而沒有使用epoll。在循環(huán)中,對異常進(jìn)行了處理,如果發(fā)生異常,連接被斷開,則調(diào)用longlinktcpstart2重新連接,而不退出循環(huán),其余的和longlinktcpstart2里面一致。

由于TCP連接是流的概念,因此,需要對數(shù)據(jù)進(jìn)行緩存拼接,這就是上面代碼中databuf的作用,防止每次收到的數(shù)據(jù)不完整或者太多,方便后續(xù)的處理,這才是一個(gè)合格的碼農(nóng)的信仰的自我升華。

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“python如何使用socket實(shí)現(xiàn)TCP協(xié)議長連接框架”這篇文章對大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI