溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

golang實(shí)現(xiàn)WebSocket推送服務(wù)的方法

發(fā)布時(shí)間:2020-06-24 11:22:29 來源:億速云 閱讀:251 作者:Leah 欄目:編程語言

這篇文章運(yùn)用簡單易懂的例子給大家介紹golang實(shí)現(xiàn)WebSocket推送服務(wù)的方法,代碼非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

推送服務(wù)實(shí)現(xiàn)

基本原理

server 啟動(dòng)以后會(huì)注冊(cè)兩個(gè) Handler。

websocketHandler 用于提供瀏覽器端發(fā)送 Upgrade 請(qǐng)求并升級(jí)為 WebSocket 連接。

pushHandler 用于提供外部推送端發(fā)送推送數(shù)據(jù)的請(qǐng)求。

瀏覽器首先連接 websocketHandler (默認(rèn)地址為 ws://ip:port/ws)升級(jí)請(qǐng)求為 WebSocket 連接,當(dāng)連接建立之后需要發(fā)送注冊(cè)信息進(jìn)行注冊(cè)。這里注冊(cè)信息中包含一個(gè) token 信息。

server 會(huì)對(duì)提供的 token 進(jìn)行驗(yàn)證并獲取到相應(yīng)的 userId(通常來說,一個(gè) userId 可能同時(shí)關(guān)聯(lián)許多 token),并保存維護(hù)好 token, userId 和 conn(連接)之間的關(guān)系。

推送端發(fā)送推送數(shù)據(jù)的請(qǐng)求到 pushHandler(默認(rèn)地址為 ws://ip:port/push),請(qǐng)求中包含了 userId 字段和 message 字段。server 會(huì)根據(jù) userId 獲取到所有此時(shí)連接到該 server 的 conn,然后將 message 一一進(jìn)行推送。

由于推送服務(wù)的實(shí)時(shí)性,推送的數(shù)據(jù)并沒有也不需要進(jìn)行緩存。

代碼詳解

我在此處會(huì)稍微講述一下代碼的基本構(gòu)成,也順便說說 Go 語言中一些常用的寫法和模式(本人也是從其他語言轉(zhuǎn)向 Go 語言,畢竟 Go 語言也相當(dāng)年輕。所以有建議的話,敬請(qǐng)?zhí)岢?。)?/p>

由于Go語言的發(fā)明人和一些主要維護(hù)者大都來自于 C/C++ 語言,所以 Go 語言的代碼也更偏向于 C/C++ 系。

首先先看一下 Server 的結(jié)構(gòu):

// Server defines parameters for running websocket server.
type Server struct {
    // Address for server to listen on
    Addr string

    // Path for websocket request, default "/ws".
    WSPath string

    // Path for push message, default "/push".
    PushPath string

    // Upgrader is for upgrade connection to websocket connection using
    // "github.com/gorilla/websocket".
    //
    // If Upgrader is nil, default upgrader will be used. Default upgrader is
    // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always
    // returns true.
    Upgrader *websocket.Upgrader

    // Check token if it's valid and return userID. If token is valid, userID
    // must be returned and ok should be true. Otherwise ok should be false.
    AuthToken func(token string) (userID string, ok bool)

    // Authorize push request. Message will be sent if it returns true,
    // otherwise the request will be discarded. Default nil and push request
    // will always be accepted.
    PushAuth func(r *http.Request) bool

    wh *websocketHandler
    ph *pushHandler
}

這里說一下 Upgrader *websocket.Upgrader,這是 gorilla/websocket 包的對(duì)象,它用來升級(jí) HTTP 請(qǐng)求。

如果一個(gè)結(jié)構(gòu)體參數(shù)過多,通常不建議直接初始化,而是使用它提供的 New 方法。這里是:

// NewServer creates a new Server.func NewServer(addr string) *Server {    return &Server{
        Addr:     addr,
        WSPath:   serverDefaultWSPath,
        PushPath: serverDefaultPushPath,
    }
}

這也是 Go 語言對(duì)外提供初始化方法的一種常見用法。

然后 Server 使用 ListenAndServe 方法啟動(dòng)并監(jiān)聽端口,與 http 包的使用類似:

// ListenAndServe listens on the TCP network address and handle websocket
// request.
func (s *Server) ListenAndServe() error {
    b := &binder{
        userID2EventConnMap: make(map[string]*[]eventConn),
        connID2UserIDMap:    make(map[string]string),
    }

    // websocket request handler
    wh := websocketHandler{
        upgrader: defaultUpgrader,
        binder:   b,
    }
    if s.Upgrader != nil {
        wh.upgrader = s.Upgrader
    }
    if s.AuthToken != nil {
        wh.calcUserIDFunc = s.AuthToken
    }
    s.wh = &wh
    http.Handle(s.WSPath, s.wh)

    // push request handler
    ph := pushHandler{
        binder: b,
    }
    if s.PushAuth != nil {
        ph.authFunc = s.PushAuth
    }
    s.ph = &ph
    http.Handle(s.PushPath, s.ph)

    return http.ListenAndServe(s.Addr, nil)
}

這里我們生成了兩個(gè) Handler,分別為 websocketHandler 和 pushHandler。websocketHandler 負(fù)責(zé)與瀏覽器建立連接并傳輸數(shù)據(jù),而 pushHandler 則處理推送端的請(qǐng)求。

可以看到,這里兩個(gè) Handler 都封裝了一個(gè) binder 對(duì)象。這個(gè) binder 用于維護(hù) token <-> userID <-> Conn 的關(guān)系:

// binder is defined to store the relation of userID and eventConn
type binder struct {
    mu sync.RWMutex

    // map stores key: userID and value of related slice of eventConn
    userID2EventConnMap map[string]*[]eventConn

    // map stores key: connID and value: userID
    connID2UserIDMap map[string]string
}

websocketHandler

具體看一下 websocketHandler 的實(shí)現(xiàn)。

// websocketHandler defines to handle websocket upgrade request.
type websocketHandler struct {
    // upgrader is used to upgrade request.
    upgrader *websocket.Upgrader

    // binder stores relations about websocket connection and userID.
    binder *binder

    // calcUserIDFunc defines to calculate userID by token. The userID will
    // be equal to token if this function is nil.
    calcUserIDFunc func(token string) (userID string, ok bool)
}

很簡單的結(jié)構(gòu)。websocketHandler 實(shí)現(xiàn)了 http.Handler 接口:

// First try to upgrade connection to websocket. If success, connection will
// be kept until client send close message or server drop them.
func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    wsConn, err := wh.upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }
    defer wsConn.Close()

    // handle Websocket request
    conn := NewConn(wsConn)
    conn.AfterReadFunc = func(messageType int, r io.Reader) {
        var rm RegisterMessage
        decoder := json.NewDecoder(r)
        if err := decoder.Decode(&rm); err != nil {
            return
        }

        // calculate userID by token
        userID := rm.Token
        if wh.calcUserIDFunc != nil {
            uID, ok := wh.calcUserIDFunc(rm.Token)
            if !ok {
                return
            }
            userID = uID
        }

        // bind
        wh.binder.Bind(userID, rm.Event, conn)
    }
    conn.BeforeCloseFunc = func() {
        // unbind
        wh.binder.Unbind(conn)
    }

    conn.Listen()
}

首先將傳入的 http.Request 轉(zhuǎn)換為 websocket.Conn,再將其分裝為我們自定義的一個(gè) wserver.Conn(封裝,或者說是組合,是 Go 語言的典型用法。記住,Go 語言沒有繼承,只有組合)。

然后設(shè)置了 Conn 的 AfterReadFunc 和 BeforeCloseFunc 方法,接著啟動(dòng)了 conn.Listen()。AfterReadFunc 意思是當(dāng) Conn 讀取到數(shù)據(jù)后,嘗試驗(yàn)證并根據(jù) token 計(jì)算 userID,然乎 bind 注冊(cè)綁定。BeforeCloseFunc 則為 Conn 關(guān)閉前進(jìn)行解綁操作。

pushHandler

pushHandler 則容易理解。它解析請(qǐng)求然后推送數(shù)據(jù):

// Authorize if needed. Then decode the request and push message to each
// realted websocket connection.
func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // authorize
    if s.authFunc != nil {
        if ok := s.authFunc(r); !ok {
            w.WriteHeader(http.StatusUnauthorized)
            return
        }
    }

    // read request
    var pm PushMessage
    decoder := json.NewDecoder(r.Body)
    if err := decoder.Decode(&pm); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        w.Write([]byte(ErrRequestIllegal.Error()))
        return
    }

    // validate the data
    if pm.UserID == "" || pm.Event == "" || pm.Message == "" {
        w.WriteHeader(http.StatusBadRequest)
        w.Write([]byte(ErrRequestIllegal.Error()))
        return
    }

    cnt, err := s.push(pm.UserID, pm.Event, pm.Message)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        w.Write([]byte(err.Error()))
        return
    }

    result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt))
    io.Copy(w, result)
}
Conn
Conn (此處指 wserver.Conn) 為 websocket.Conn 的包裝。

// Conn wraps websocket.Conn with Conn. It defines to listen and read
// data from Conn.
type Conn struct {
    Conn *websocket.Conn

    AfterReadFunc   func(messageType int, r io.Reader)
    BeforeCloseFunc func()

    once   sync.Once
    id     string
    stopCh chan struct{}
}

最主要的方法為 Listen():

// Listen listens for receive data from websocket connection. It blocks
// until websocket connection is closed.
func (c *Conn) Listen() {
    c.Conn.SetCloseHandler(func(code int, text string) error {
        if c.BeforeCloseFunc != nil {
            c.BeforeCloseFunc()
        }

        if err := c.Close(); err != nil {
            log.Println(err)
        }

        message := websocket.FormatCloseMessage(code, "")
        c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
        return nil
    })

    // Keeps reading from Conn util get error.
ReadLoop:
    for {
        select {
        case <-c.stopCh:
            break ReadLoop
        default:
            messageType, r, err := c.Conn.NextReader()
            if err != nil {
                // TODO: handle read error maybe
                break ReadLoop
            }

            if c.AfterReadFunc != nil {
                c.AfterReadFunc(messageType, r)
            }
        }
    }
}

主要設(shè)置了當(dāng) websocket 連接關(guān)閉時(shí)的處理和不停地讀取數(shù)據(jù)。

關(guān)于golang實(shí)現(xiàn)WebSocket推送服務(wù)的方法就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI