您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關JavaScript中怎么實現(xiàn)并發(fā)控制的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
在日常開發(fā)過程中,你可能會遇到并發(fā)控制的場景,比如控制請求并發(fā)數(shù)。那么在 JavaScript 中如何實現(xiàn)并發(fā)控制呢?在回答這個問題之前,我們來簡單介紹一下并發(fā)控制。
假設有 6 個待辦任務要執(zhí)行,而我們希望限制同時執(zhí)行的任務個數(shù),即最多只有 2 個任務能同時執(zhí)行。當 正在執(zhí)行任務列表 中的任何 1 個任務完成后,程序會自動從 待辦任務列表 中獲取新的待辦任務并把該任務添加到 正在執(zhí)行任務列表 中。為了讓大家能夠更直觀地理解上述的過程,阿寶哥特意畫了以下 3 張圖:
好的,介紹完并發(fā)控制之后,阿寶哥將以 Github 上 async-pool 這個庫來介紹一下異步任務并發(fā)控制的具體實現(xiàn)。
async-pool:https://github.com/rxaviers/async-pool
Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。
async-pool 這個庫提供了 ES7 和 ES6 兩種不同版本的實現(xiàn),在分析其具體實現(xiàn)之前,我們來看一下它如何使用。
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
在以上代碼中,我們使用 async-pool 這個庫提供的 asyncPool
函數(shù)來實現(xiàn)異步任務的并發(fā)控制。 asyncPool
函數(shù)的簽名如下所示:
function asyncPool(poolLimit, array, iteratorFn){ ... }
該函數(shù)接收 3 個參數(shù):
poolLimit
(數(shù)字類型):表示限制的并發(fā)數(shù);
array
(數(shù)組類型):表示任務數(shù)組;
iteratorFn
(函數(shù)類型):表示迭代函數(shù),用于實現(xiàn)對每個任務項進行處理,該函數(shù)會返回一個 Promise 對象或異步函數(shù)。
對于以上示例來說,在使用了 asyncPool
函數(shù)之后,對應的執(zhí)行過程如下所示:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout); // Call iterator (i = 1000) // Call iterator (i = 5000) // Pool limit of 2 reached, wait for the quicker one to complete... // 1000 finishes // Call iterator (i = 3000) // Pool limit of 2 reached, wait for the quicker one to complete... // 3000 finishes // Call iterator (i = 2000) // Itaration is complete, wait until running ones complete... // 5000 finishes // 2000 finishes // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
通過觀察以上的注釋信息,我們可以大致地了解 asyncPool
函數(shù)內部的控制流程。下面我們先來分析 asyncPool
函數(shù)的 ES7 實現(xiàn)。
關注「全棧修仙之路」閱讀阿寶哥原創(chuàng)的 4 本免費電子書(累計下載 3萬+)及 50 幾篇 TS 系列教程。
async function asyncPool(poolLimit, array, iteratorFn) { const ret = []; // 存儲所有的異步任務 const executing = []; // 存儲正在執(zhí)行的異步任務 for (const item of array) { // 調用iteratorFn函數(shù)創(chuàng)建異步任務 const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p); // 保存新的異步任務 // 當poolLimit值小于或等于總任務個數(shù)時,進行并發(fā)控制 if (poolLimit <= array.length) { // 當任務完成后,從正在執(zhí)行的任務數(shù)組中移除已完成的任務 const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); // 保存正在執(zhí)行的異步任務 if (executing.length >= poolLimit) { await Promise.race(executing); // 等待較快的任務執(zhí)行完成 } } } return Promise.all(ret); }
在以上代碼中,充分利用了 Promise.all
和 Promise.race
函數(shù)特點,再結合 ES7 中提供的 async await
特性,最終實現(xiàn)了并發(fā)控制的功能。利用 await Promise.race(executing);
這行語句,我們會等待 正在執(zhí)行任務列表 中較快的任務執(zhí)行完成之后,才會繼續(xù)執(zhí)行下一次循環(huán)。
asyncPool ES7 實現(xiàn)相對比較簡單,接下來我們來看一下不使用 async await
特性要如何實現(xiàn)同樣的功能。
function asyncPool(poolLimit, array, iteratorFn) { let i = 0; const ret = []; // 存儲所有的異步任務 const executing = []; // 存儲正在執(zhí)行的異步任務 const enqueue = function () { if (i === array.length) { return Promise.resolve(); } const item = array[i++]; // 獲取新的任務項 const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p); let r = Promise.resolve(); // 當poolLimit值小于或等于總任務個數(shù)時,進行并發(fā)控制 if (poolLimit <= array.length) { // 當任務完成后,從正在執(zhí)行的任務數(shù)組中移除已完成的任務 const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); if (executing.length >= poolLimit) { r = Promise.race(executing); } } // 正在執(zhí)行任務列表 中較快的任務執(zhí)行完成之后,才會從array數(shù)組中獲取新的待辦任務 return r.then(() => enqueue()); }; return enqueue().then(() => Promise.all(ret)); }
在 ES6 的實現(xiàn)版本中,通過內部封裝的 enqueue
函數(shù)來實現(xiàn)核心的控制邏輯。當 Promise.race(executing)
返回的 Promise
對象變成已完成狀態(tài)時,才會調用 enqueue
函數(shù),從 array
數(shù)組中獲取新的待辦任務。
在 asyncPool
這個庫的 ES7 和 ES6 的具體實現(xiàn)中,我們都使用到了 Promise.all
和 Promise.race
函數(shù)。其中手寫 Promise.all
是一道常見的面試題。剛好趁著這個機會,阿寶哥跟大家一起來手寫簡易版的 Promise.all
和 Promise.race
函數(shù)。
Promise.all(iterable)
方法會返回一個 promise 對象,當輸入的所有 promise 對象的狀態(tài)都變成 resolved
時,返回的 promise 對象就會以數(shù)組的形式,返回每個 promise 對象 resolve 后的結果。當輸入的任何一個 promise 對象狀態(tài)變成 rejected
時,則返回的 promise 對象會 reject 對應的錯誤信息。
Promise.all = function (iterators) { return new Promise((resolve, reject) => { if (!iterators || iterators.length === 0) { resolve([]); } else { let count = 0; // 計數(shù)器,用于判斷所有任務是否執(zhí)行完成 let result = []; // 結果數(shù)組 for (let i = 0; i < iterators.length; i++) { // 考慮到iterators[i]可能是普通對象,則統(tǒng)一包裝為Promise對象 Promise.resolve(iterators[i]).then( (data) => { result[i] = data; // 按順序保存對應的結果 // 當所有任務都執(zhí)行完成后,再統(tǒng)一返回結果 if (++count === iterators.length) { resolve(result); } }, (err) => { reject(err); // 任何一個Promise對象執(zhí)行失敗,則調用reject()方法 return; } ); } } }); };
需要注意的是對于 Promise.all
的標準實現(xiàn)來說,它的參數(shù)是一個可迭代對象,比如 Array、String 或 Set 等。
Promise.race(iterable)
方法會返回一個 promise 對象,一旦迭代器中的某個 promise 對象 resolved 或 rejected,返回的 promise 對象就會 resolve 或 reject 相應的值。
Promise.race = function (iterators) { return new Promise((resolve, reject) => { for (const iter of iterators) { Promise.resolve(iter) .then((res) => { resolve(res); }) .catch((e) => { reject(e); }); } }); };
本文阿寶哥帶大家詳細分析了 async-pool 異步任務并發(fā)控制的具體實現(xiàn),同時為了讓大家能夠更好地理解 async-pool 的核心代碼。最后阿寶哥還帶大家一起手寫簡易版的 Promise.all
和 Promise.race
函數(shù)。其實除了 Promise.all
函數(shù)之外,還存在另一個函數(shù) —— Promise.allSettled
,該函數(shù)用于解決 Promise.all
存在的問題,感興趣的小伙伴可以自行研究一下。
感謝各位的閱讀!關于“JavaScript中怎么實現(xiàn)并發(fā)控制”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。