溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Websocket庫Ws的原理是什么

發(fā)布時(shí)間:2021-07-06 11:17:32 來源:億速云 閱讀:152 作者:chen 欄目:web開發(fā)

本篇內(nèi)容介紹了“Websocket庫Ws的原理是什么”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

ws服務(wù)器邏輯由websocket-server.js的WebSocketServer類實(shí)現(xiàn)。該類初始化了一些參數(shù)后就執(zhí)行以下代碼

if (this._server) {       // 給server注冊下面事件,返回一個(gè)注銷函數(shù)(用于注銷下面注冊的事件)       this._removeListeners = addListeners(this._server, {         // listen成功的回調(diào)         listening: this.emit.bind(this, 'listening'),         error: this.emit.bind(this, 'error'),         // 收到協(xié)議升級請求的回調(diào)         upgrade: (req, socket, head) => {           this.handleUpgrade(req, socket, head, (ws) => {             // 處理成功,觸發(fā)鏈接成功事件             this.emit('connection', ws, req);           });         }       });

我們看到ws監(jiān)聽了upgrade事件,當(dāng)有websocket請求到來時(shí)就會執(zhí)行handleUpgrade處理升級請求,升級成功后觸發(fā)connection事件。我們先看handleUpgrade。handleUpgrade邏輯不多,主要是處理和校驗(yàn)升級請求的一些http頭。ws提供了一個(gè)校驗(yàn)的鉤子。處理完http頭后,會調(diào)verifyClient校驗(yàn)是否允許升級請求。如果成功則執(zhí)行completeUpgrade。顧名思義,completeUpgrade是完成升級請求的函數(shù),該函數(shù)返回同意協(xié)議升級并且設(shè)置一些http響應(yīng)頭。另外還有一些重要的邏輯處理。

const ws = new WebSocket(null); // 設(shè)置管理socket的數(shù)據(jù) ws.setSocket(socket, head, this.options.maxPayload); // cb就是this.emit('connection', ws, req); cb(ws);

我們看到這里新建了一個(gè)WebSocket對象并且調(diào)用了他的setSocket函數(shù)。我們來看看他做了什么。setSocket的邏輯非常多,我們慢慢分析。

數(shù)據(jù)接收者

class Receiver extends Writable {}

我們看到數(shù)據(jù)接收者是一個(gè)可寫流。這就意味著我們可以往里面寫數(shù)據(jù)。

const receiver = new Receiver(); receiver.write('hello');

我們看一下這時(shí)候Receiver的邏輯。

_write(chunk, encoding, cb) {     if (this._opcode === 0x08 && this._state == GET_INFO) return cb();     this._bufferedBytes += chunk.length;     this._buffers.push(chunk);     this.startLoop(cb);   }

首先記錄當(dāng)前數(shù)據(jù)的大小,然后把數(shù)據(jù)存起來,最后執(zhí)行startLoop。

startLoop(cb) {     let err;     this._loop = true;      do {       switch (this._state) {         // 忽略其他case         case GET_DATA:           err = this.getData(cb);           break;         default:           // `INFLATING`           this._loop = false;           return;       }     } while (this._loop);      cb(err);   }

我們知道websocket是基于tcp上層的應(yīng)用層協(xié)議,所以我們收到數(shù)據(jù)時(shí),需要解析出一個(gè)個(gè)數(shù)據(jù)包(粘包問題),所以Receiver其實(shí)就是一個(gè)狀態(tài)機(jī),每次收到數(shù)據(jù)的時(shí)候,都會根據(jù)當(dāng)前的狀態(tài)進(jìn)行狀態(tài)流轉(zhuǎn)。比如當(dāng)前處于GET_DATA狀態(tài),那么就會進(jìn)行數(shù)據(jù)的處理。我們接著看一下數(shù)據(jù)處理的邏輯。

getData(cb) {     let data = EMPTY_BUFFER;     // 提取數(shù)據(jù)部分     if (this._payloadLength) {       data = this.consume(this._payloadLength);       if (this._masked) unmask(data, this._mask);     }     // 是控制報(bào)文則執(zhí)行controlMessage     if (this._opcode > 0x07) return this.controlMessage(data);     // 做了壓縮,則先解壓     if (this._compressed) {       this._state = INFLATING;       this.decompress(data, cb);       return;     }     // 沒有壓縮則直接處理(先存到_fragments,然后執(zhí)行dataMessage)     if (data.length) {       this._messageLength = this._totalPayloadLength;       this._fragments.push(data);     }      return this.dataMessage();   }

我們執(zhí)行websocket協(xié)議定義了報(bào)文的類型,比如控制報(bào)文,數(shù)據(jù)報(bào)文。我們分別看一下這兩個(gè)的邏輯。

controlMessage(data) {     // 連接關(guān)閉     if (this._opcode === 0x08) {       this._loop = false;       if (data.length === 0) {         this.emit('conclude', 1005, '');         this.end();       }     } else if (this._opcode === 0x09) {       this.emit('ping', data);     } else {       this.emit('pong', data);     }     this._state = GET_INFO;   }

我們看到控制報(bào)文包括三種(conclude、ping、pong)。而數(shù)據(jù)報(bào)文只有this.emit('message',  data);一種。這個(gè)就是接收者的整體邏輯。

2 數(shù)據(jù)發(fā)送者

數(shù)據(jù)發(fā)送者是對websocket協(xié)議的封裝,當(dāng)用戶調(diào)研數(shù)據(jù)發(fā)送者的send接口發(fā)送數(shù)據(jù)時(shí),數(shù)據(jù)發(fā)送者會組裝成一個(gè)websocket協(xié)議的包再發(fā)送出去。

send(data, options, cb) {     const buf = toBuffer(data);     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];     let opcode = options.binary ? 2 : 1;     let rsv1 = options.compress;      if (this._firstFragment) {       this._firstFragment = false;       if (rsv1 && perMessageDeflate) {         rsv1 = buf.length >= perMessageDeflate._threshold;       }       this._compress = rsv1;     } else {       rsv1 = false;       opcode = 0;     }      if (options.fin) this._firstFragment = true;     // 需要壓縮     if (perMessageDeflate) {       const opts = {         fin: options.fin,         rsv1,         opcode,         mask: options.mask,         readOnly: toBuffer.readOnly       };       // 正在壓縮,則排隊(duì)等待,否則執(zhí)行壓縮       if (this._deflating) {         this.enqueue([this.dispatch, buf, this._compress, opts, cb]);       } else {         this.dispatch(buf, this._compress, opts, cb);       }     } else {       // 不需要壓縮,直接發(fā)送       this.sendFrame(         Sender.frame(buf, {           fin: options.fin,           rsv1: false,           opcode,           mask: options.mask,           readOnly: toBuffer.readOnly         }),         cb       );     }   }

send函數(shù)做了一些參數(shù)的處理后發(fā)送數(shù)據(jù),但是如果需要壓縮的話,要壓縮后才能發(fā)送。數(shù)據(jù)處理完成后調(diào)用真正的發(fā)送函數(shù)

sendFrame(list, cb) {     if (list.length === 2) {       this._socket.cork();       this._socket.write(list[0]);       this._socket.write(list[1], cb);       this._socket.uncork();     } else {       this._socket.write(list[0], cb);     }   }

了解了數(shù)據(jù)接收者和發(fā)送者的邏輯后,我們看一下websocket對象和setSocket函數(shù)做了什么事情,websocket對象本質(zhì)是對TCP  socket的封裝。它接收來自底層的數(shù)據(jù),然后透傳給數(shù)據(jù)接收者,數(shù)據(jù)接收者處理完后,觸發(fā)websocket對應(yīng)的對應(yīng)的事件,比如message事件。發(fā)送數(shù)據(jù)的時(shí)候,websocket會調(diào)用數(shù)據(jù)發(fā)送者的接口,數(shù)據(jù)發(fā)送者組裝成websocket協(xié)議的數(shù)據(jù)包后再發(fā)送出去,架構(gòu)如下圖所示。

Websocket庫Ws的原理是什么

接下來我們看看setSocket的邏輯

setSocket(socket, head, maxPayload) {     // 數(shù)據(jù)接收者,負(fù)責(zé)處理tcp上收到的數(shù)據(jù)(socket是tcp層的socket)     const receiver = new Receiver(...);     // 數(shù)據(jù)發(fā)送者,負(fù)責(zé)發(fā)送數(shù)據(jù)給對端     this._sender = new Sender(socket, this._extensions);     // 數(shù)據(jù)接收者,負(fù)責(zé)解析數(shù)據(jù)     this._receiver = receiver;     // net模塊的tcp socket     this._socket = socket;     // 關(guān)聯(lián)起來     receiver[kWebSocket] = this;     socket[kWebSocket] = this;     // 監(jiān)聽接收者的事件,解析數(shù)據(jù)的時(shí)候會回調(diào)     receiver.on('conclude', receiverOnConclude);     // 下面兩個(gè)事件由Writable觸發(fā)     receiver.on('drain', receiverOnDrain);     receiver.on('error', receiverOnError);     receiver.on('message', receiverOnMessage);     receiver.on('ping', receiverOnPing);     receiver.on('pong', receiverOnPong);     // 清除定時(shí)器     socket.setTimeout(0);     // 關(guān)閉nagle算法     socket.setNoDelay();     // 升級請求中,攜帶的http body,通常是空     if (head.length > 0) socket.unshift(head);     // 監(jiān)聽tcp底層的事件     socket.on('close', socketOnClose);     socket.on('data', socketOnData);     socket.on('end', socketOnEnd);     socket.on('error', socketOnError);      this.readyState = WebSocket.OPEN;     this.emit('open');   }

我們看到里面監(jiān)聽了各種事件,下面以data事件為例,看一下處理過程。當(dāng)tcp socket收到數(shù)據(jù)的時(shí)候會執(zhí)行socketOnData函數(shù)。

function socketOnData(chunk) {   // 會調(diào)用receiver里的_write函數(shù),其實(shí)就是換成到receiver對象上,如果數(shù)據(jù)解析出錯,會觸發(fā)socket error事件   if (!this[kWebSocket]._receiver.write(chunk)) {     this.pause();   } }

socketOnData通過接收者的接口把數(shù)據(jù)傳給接收者,接收者會解析數(shù)據(jù),然后觸發(fā)對應(yīng)的事件,比如message。

receiver.on('message', receiverOnMessage); function receiverOnMessage(data) {   this[kWebSocket].emit('message', data); }

然后ws的socket對象繼續(xù)往上層觸發(fā)message事件。this[kWebSocket]的值是ws提供的socket對象本身。架構(gòu)圖如下。

Websocket庫Ws的原理是什么

這就是ws實(shí)現(xiàn)websocket協(xié)議的基本原理,具體細(xì)節(jié)可以參考源碼。

“Websocket庫Ws的原理是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI