溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么使用Go建開發(fā)高負(fù)載WebSocket服務(wù)器

發(fā)布時間:2021-11-15 15:16:05 來源:億速云 閱讀:224 作者:iii 欄目:web開發(fā)

這篇文章主要介紹“怎么使用Go建開發(fā)高負(fù)載WebSocket服務(wù)器”,在日常操作中,相信很多人在怎么使用Go建開發(fā)高負(fù)載WebSocket服務(wù)器問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么使用Go建開發(fā)高負(fù)載WebSocket服務(wù)器”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

實現(xiàn)方式

讓我們看看如何使用Go函數(shù)實現(xiàn)服務(wù)器的某些部分,而無需任何優(yōu)化。

在進(jìn)行net/http ,我們來談?wù)勎覀內(nèi)绾伟l(fā)送和接收數(shù)據(jù)。 站在WebSocket協(xié)議(例如JSON對象) 之上的數(shù)據(jù)在下文中將被稱為分組 。

我們開始實現(xiàn)包含通過WebSocket連接發(fā)送和接收這些數(shù)據(jù)包的Channel結(jié)構(gòu)。

channel 結(jié)構(gòu)

// Packet represents application level data. type Packet struct {     ... }  // Channel wraps user connection. type Channel struct {     conn net.Conn    // WebSocket connection.     send chan Packet // Outgoing packets queue. }  func NewChannel(conn net.Conn) *Channel {     c := &Channel{         conn: conn,         send: make(chan Packet, N),     }      go c.reader()     go c.writer()      return c }

注意這里有reader和writer連個goroutines。 每個goroutine都需要自己的內(nèi)存棧, 根據(jù)操作系統(tǒng)和Go版本可能具有2到8  KB的初始大小。

在300萬個在線連接的時候,我們將需要24 GB的內(nèi)存 (堆棧為4 KB)用于維持所有連接。  這還沒有計算為Channel結(jié)構(gòu)分配的內(nèi)存,傳出的數(shù)據(jù)包c(diǎn)h.send和其他內(nèi)部字段消耗的內(nèi)存。

I/O goroutines

我們來看看“reader”的實現(xiàn):

func (c *Channel) reader() {     // We make a buffered read to reduce read syscalls.     buf := bufio.NewReader(c.conn)      for {         pkt, _ := readPacket(buf)         c.handle(pkt)     } }

這里我們使用bufio.Reader來減少read() syscalls的數(shù)量,并讀取與buf緩沖區(qū)大小一樣的數(shù)量。 在***循環(huán)中,我們期待新數(shù)據(jù)的到來。  請記?。?預(yù)計新數(shù)據(jù)將會來臨。 我們稍后會回來。

我們將離開傳入數(shù)據(jù)包的解析和處理,因為對我們將要討論的優(yōu)化不重要。 但是, buf現(xiàn)在值得我們注意:默認(rèn)情況下,它是4 KB,這意味著我們需要另外12  GB內(nèi)存。 “writer”有類似的情況:

func (c *Channel) writer() {     // We make buffered write to reduce write syscalls.      buf := bufio.NewWriter(c.conn)      for pkt := range c.send {         _ := writePacket(buf, pkt)         buf.Flush()     } }

我們遍歷c.send ,并將它們寫入緩沖區(qū)。細(xì)心讀者已經(jīng)猜到的,我們的300萬個連接還將消耗12 GB的內(nèi)存。

HTTP

我們已經(jīng)有一個簡單的Channel實現(xiàn),現(xiàn)在我們需要一個WebSocket連接才能使用。

注意:如果您不知道WebSocket如何工作??蛻舳送ㄟ^稱為升級的特殊HTTP機(jī)制切換到WebSocket協(xié)議。  在成功處理升級請求后,服務(wù)器和客戶端使用TCP連接來交換二進(jìn)制WebSocket幀。 這是連接中的框架結(jié)構(gòu)的描述。

import (     "net/http"     "some/websocket" )  http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {     conn, _ := websocket.Upgrade(r, w)     ch := NewChannel(conn)     //... })

請注意, http.ResponseWriter為bufio.Reader和bufio.Writer (使用4  KB緩沖區(qū))進(jìn)行內(nèi)存分配,用于*http.Request初始化和進(jìn)一步的響應(yīng)寫入。

無論使用什么WebSocket庫,在成功響應(yīng)升級請求后, 服務(wù)器在responseWriter.Hijack()調(diào)用之后,連同TCP連接一起接收  I/O緩沖區(qū)。

提示:在某些情況下, go:linkname 可用于 通過調(diào)用 net/http.putBufio{Reader,Writer} 將緩沖區(qū)返回到  net/http 內(nèi) 的 sync.Pool 。

因此,我們需要另外24 GB的內(nèi)存來維持300萬個鏈接。

所以,我們的程序即使什么都沒做,也需要72G內(nèi)存。

優(yōu)化

我們來回顧介紹部分中談到的內(nèi)容,并記住用戶連接的行為。 切換到WebSocket之后,客戶端發(fā)送一個包含相關(guān)事件的數(shù)據(jù)包,換句話說就是訂閱事件。  然后(不考慮諸如ping/pong等技術(shù)信息),客戶端可能在整個連接壽***不發(fā)送任何其他信息。

連接壽命可能是幾秒到幾天。

所以在最多的時候,我們的Channel.reader()和Channel.writer()正在等待接收或發(fā)送數(shù)據(jù)的處理。 每個都有4  KB的I/O緩沖區(qū)。

現(xiàn)在很明顯,某些事情可以做得更好,不是嗎?

Netpoll

你還記得bufio.Reader.Read()內(nèi)部,Channel.reader()實現(xiàn)了在沒有新數(shù)據(jù)的時候conn.read()會被鎖。如果連接中有數(shù)據(jù),Go運(yùn)行時“喚醒”我們的goroutine并允許它讀取下一個數(shù)據(jù)包。  之后,goroutine再次鎖定,期待新的數(shù)據(jù)。 讓我們看看Go運(yùn)行時如何理解goroutine必須被“喚醒”。 如果我們看看conn.Read()實現(xiàn)  ,我們將在其中看到net.netFD.Read()調(diào)用 :

// net/fd_unix.go  func (fd *netFD) Read(p []byte) (n int, err error) {     //...     for {         n, err = syscall.Read(fd.sysfd, p)         if err != nil {             n = 0             if err == syscall.EAGAIN {                 if err = fd.pd.waitRead(); err == nil {                     continue                 }             }         }         //...         break     }     //... }

Go在非阻塞模式下使用套接字。 EAGAIN表示,套接字中沒有數(shù)據(jù),并且在從空套接字讀取時不會被鎖定,操作系統(tǒng)將控制權(quán)返還給我們。

我們從連接文件描述符中看到一個read()系統(tǒng)調(diào)用。 如果讀取返回EAGAIN錯誤 ,則運(yùn)行時會使pollDesc.waitRead()調(diào)用 :

// net/fd_poll_runtime.go  func (pd *pollDesc) waitRead() error {    return pd.wait('r') }  func (pd *pollDesc) wait(mode int) error {    res := runtime_pollWait(pd.runtimeCtx, mode)    //... }

如果我們深入挖掘 ,我們將看到netpoll是使用Linux中的epoll和BSD中的kqueue來實現(xiàn)的。 為什么不使用相同的方法來進(jìn)行連接?  我們可以分配一個讀緩沖區(qū),只有在真正有必要時才使用goroutine:當(dāng)套接字中有真實可讀的數(shù)據(jù)時。

在github.com/golang/go上, 導(dǎo)出netpoll函數(shù)有問題 。

擺脫goroutines

假設(shè)我們有Go的netpoll實現(xiàn) 。 現(xiàn)在我們可以避免使用內(nèi)部緩沖區(qū)啟動Channel.reader()  goroutine,并在連接中訂閱可讀數(shù)據(jù)的事件:

ch := NewChannel(conn)  // Make conn to be observed by netpoll instance. poller.Start(conn, netpoll.EventRead, func() {     // We spawn goroutine here to prevent poller wait loop     // to become locked during receiving packet from ch.     go Receive(ch) })  // Receive reads a packet from conn and handles it somehow. func (ch *Channel) Receive() {     buf := bufio.NewReader(ch.conn)     pkt := readPacket(buf)     c.handle(pkt) }

使用Channel.writer()更容易,因為只有當(dāng)我們要發(fā)送數(shù)據(jù)包時,我們才能運(yùn)行g(shù)oroutine并分配緩沖區(qū):

func (ch *Channel) Send(p Packet) {     if c.noWriterYet() {         go ch.writer()     }     ch.send <- p }

請注意,當(dāng)操作系統(tǒng)在 write() 系統(tǒng)調(diào)用時返回 EAGAIN 時,我們不處理這種情況 。 對于這種情況,我們傾向于Go運(yùn)行時那樣處理。  如果需要,它可以以相同的方式來處理。

從ch.send (一個或幾個)讀出傳出的數(shù)據(jù)包后,writer將完成其操作并釋放goroutine棧和發(fā)送緩沖區(qū)。

***! 通過擺脫兩個連續(xù)運(yùn)行的goroutine中的堆棧和I/O緩沖區(qū),我們節(jié)省了48 GB 。

資源控制

大量的連接不僅涉及高內(nèi)存消耗。  在開發(fā)服務(wù)器時,我們會經(jīng)歷重復(fù)的競爭條件和死鎖,常常是所謂的自動DDoS,這種情況是當(dāng)應(yīng)用程序客戶端肆意嘗試連接到服務(wù)器,從而破壞服務(wù)器。

例如,如果由于某些原因我們突然無法處理ping/pong消息,但是空閑連接的處理程序會關(guān)閉這樣的連接(假設(shè)連接斷開,因此沒有提供數(shù)據(jù)),客戶端會不斷嘗試連接,而不是等待事件。

如果鎖定或超載的服務(wù)器剛剛停止接受新連接,并且負(fù)載均衡器(例如,nginx)將請求都傳遞給下一個服務(wù)器實例,那壓力將是巨大的。

此外,無論服務(wù)器負(fù)載如何,如果所有客戶端突然想要以任何原因發(fā)送數(shù)據(jù)包(大概是由于錯誤原因),則先前節(jié)省的48  GB將再次使用,因為我們將實際恢復(fù)到初始狀態(tài)goroutine和并對每個連接分配緩沖區(qū)。

Goroutine池

我們可以使用goroutine池來限制同時處理的數(shù)據(jù)包數(shù)量。 這是一個go routine池的簡單實現(xiàn):

package gopool  func New(size int) *Pool {     return &Pool{         work: make(chan func()),         sem:  make(chan struct{}, size),     } }  func (p *Pool) Schedule(task func()) error {     select {     case p.work <- task:     case p.sem <- struct{}{}:         go p.worker(task)     } }  func (p *Pool) worker(task func()) {     defer func() { <-p.sem }     for {         task()         task = <-p.work     } }

現(xiàn)在我們的netpoll代碼如下:

pool := gopool.New(128)  poller.Start(conn, netpoll.EventRead, func() {     // We will block poller wait loop when     // all pool workers are busy.     pool.Schedule(func() {         Receive(ch)     }) })

所以現(xiàn)在我們讀取數(shù)據(jù)包可以在池中使用了空閑的goroutine。

同樣,我們將更改Send() :

pool := gopool.New(128)  func (ch *Channel) Send(p Packet) {     if c.noWriterYet() {         pool.Schedule(ch.writer)     }     ch.send <- p }

而不是go ch.writer() ,我們想寫一個重用的goroutine。 因此,對于N goroutines池,我們可以保證在N請求同時處理并且到達(dá)N  + 1我們不會分配N + 1緩沖區(qū)進(jìn)行讀取。 goroutine池還允許我們限制新連接的Accept()和Upgrade()  ,并避免大多數(shù)情況下被DDoS打垮。

零拷貝升級

讓我們從WebSocket協(xié)議中偏離一點(diǎn)。 如前所述,客戶端使用HTTP升級請求切換到WebSocket協(xié)議。 協(xié)議是樣子:

GET /ws HTTP/1.1 Host: mail.ru Connection: Upgrade Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA== Sec-Websocket-Version: 13 Upgrade: websocket  HTTP/1.1 101 Switching Protocols Connection: Upgrade Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4= Upgrade: websocket

也就是說,在我們的例子中,我們需要HTTP請求和header才能切換到WebSocket協(xié)議。  這個知識點(diǎn)和http.Request的內(nèi)部實現(xiàn)表明我們可以做優(yōu)化。我們會在處理HTTP請求時拋棄不必要的內(nèi)存分配和復(fù)制,并放棄標(biāo)準(zhǔn)的net/http服務(wù)器。

例如, http.Request 包含一個具有相同名稱的頭文件類型的字段,它通過將數(shù)據(jù)從連接復(fù)制到值字符串而無條件填充所有請求頭。  想像一下這個字段中可以保留多少額外的數(shù)據(jù),例如大型Cookie頭。

但是要做什么呢?

WebSocket實現(xiàn)

不幸的是,在我們的服務(wù)器優(yōu)化時存在的所有庫都允許我們對標(biāo)準(zhǔn)的net/http服務(wù)器進(jìn)行升級。 此外,所有庫都不能使用所有上述讀寫優(yōu)化。  為使這些優(yōu)化能夠正常工作,我們必須使用一個相當(dāng)?shù)图墑e的API來處理WebSocket。 要重用緩沖區(qū),我們需要procotol函數(shù)看起來像這樣:

func ReadFrame(io.Reader) (Frame, error)  func WriteFrame(io.Writer, Frame) error

如果我們有一個這樣的API的庫,我們可以從連接中讀取數(shù)據(jù)包,如下所示(數(shù)據(jù)包寫入看起來差不多):

// getReadBuf, putReadBuf are intended to  // reuse *bufio.Reader (with sync.Pool for example). func getReadBuf(io.Reader) *bufio.Reader func putReadBuf(*bufio.Reader)  // readPacket must be called when data could be read from conn. func readPacket(conn io.Reader) error {     buf := getReadBuf()     defer putReadBuf(buf)      buf.Reset(conn)     frame, _ := ReadFrame(buf)     parsePacket(frame.Payload)     //... }

簡而言之,現(xiàn)在是制作我們自己庫的時候了。

github.com/gobwas/ws

為了避免將協(xié)議操作邏輯強(qiáng)加給用戶,我們編寫了WS庫。  所有讀寫方法都接受標(biāo)準(zhǔn)的io.Reader和io.Writer接口,可以使用或不使用緩沖或任何其他I/O包裝器。

除了來自標(biāo)準(zhǔn)net/http升級請求之外, ws支持零拷貝升級 ,升級請求的處理和切換到WebSocket,而無需內(nèi)存分配或復(fù)制。  ws.Upgrade()接受io.ReadWriter ( net.Conn實現(xiàn)了這個接口)。  換句話說,我們可以使用標(biāo)準(zhǔn)的net.Listen()并將接收到的連接從ln.Accept()立即傳遞給ws.Upgrade() 。  該庫可以復(fù)制任何請求數(shù)據(jù)以供將來在應(yīng)用程序中使用(例如, Cookie以驗證會話)。

以下是升級請求處理的基準(zhǔn) :標(biāo)準(zhǔn)net/http服務(wù)器與net.Listen()加零拷貝升級:

BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op  BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op

切換到ws和零拷貝升級節(jié)省了另外24 GB內(nèi)存 - 這是由net/http處理程序請求處理時為I/O緩沖區(qū)分配的空間。

概要

讓我們結(jié)合代碼告訴你我們做的優(yōu)化。

  • 讀取內(nèi)部緩沖區(qū)的goroutine是非常昂貴的。 解決方案 :netpoll(epoll,kqueue); 重用緩沖區(qū)。

  • 寫入內(nèi)部緩沖區(qū)的goroutine是非常昂貴的。 解決方案 :必要時啟動goroutine; 重用緩沖區(qū)。

  • DDOS,netpoll將無法工作。 解決方案 :重新使用數(shù)量限制的goroutines。

  • net/http不是處理升級到WebSocket的最快方法。 解決方案 :在連接上使用零拷貝升級。

這就是服務(wù)器代碼的樣子:

import (     "net"     "github.com/gobwas/ws" )  ln, _ := net.Listen("tcp", ":8080")  for {     // Try to accept incoming connection inside free pool worker.     // If there no free workers for 1ms, do not accept anything and try later.     // This will help us to prevent many self-ddos or out of resource limit cases.     err := pool.ScheduleTimeout(time.Millisecond, func() {         conn := ln.Accept()         _ = ws.Upgrade(conn)          // Wrap WebSocket connection with our Channel struct.         // This will help us to handle/send our app's packets.         ch := NewChannel(conn)          // Wait for incoming bytes from connection.         poller.Start(conn, netpoll.EventRead, func() {             // Do not cross the resource limits.             pool.Schedule(func() {                 // Read and handle incoming packet(s).                 ch.Recevie()             })         })     })     if err != nil {            time.Sleep(time.Millisecond)     } }

到此,關(guān)于“怎么使用Go建開發(fā)高負(fù)載WebSocket服務(wù)器”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI