溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使用p-limit限制并發(fā)數(shù)

發(fā)布時(shí)間:2022-12-27 16:51:13 來源:億速云 閱讀:147 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要講解了“如何使用p-limit限制并發(fā)數(shù)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何使用p-limit限制并發(fā)數(shù)”吧!

使用

下面是官方提供的使用示例:

import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
	limit(() => fetchSomething('foo')),
	limit(() => fetchSomething('bar')),
	limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);

在代碼的第一行,使用了 pLimit(1) 來創(chuàng)建一個(gè) p-limit 實(shí)例,并將并發(fā)限制設(shè)為 1。這意味著,在任意時(shí)刻,只能有一個(gè) Promise 在運(yùn)行。

在第四行,使用了 limit(() => fetchSomething('foo')) 來包裝一個(gè)異步函數(shù),并返回一個(gè) Promise。同樣的方式,在第五、六行也包裝了其他兩個(gè)異步函數(shù)。

最后,使用 Promise.all 方法來等待所有 Promise 的完成,并在完成后將結(jié)果輸出到控制臺(tái)。由于 p-limit 的限制,這些 Promise 只會(huì)按順序一個(gè)一個(gè)地運(yùn)行,保證了并發(fā)的數(shù)量不會(huì)超過 1。

源碼分析

import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
	if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
		throw new TypeError('Expected `concurrency` to be a number from 1 and up');
	}
	const queue = new Queue();
	let activeCount = 0;
}

yocto-queue 是一種允許高效存儲(chǔ)和檢索數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)。前邊的章節(jié)分析過它的源碼,詳情參見: 源碼共讀|yocto-queue 隊(duì)列 鏈表

pLimit 函數(shù)接受一個(gè)參數(shù),并發(fā)數(shù),首先函數(shù)判斷參數(shù)是否是數(shù)組類型,或者是否能夠轉(zhuǎn)換成數(shù)字類型,如果不能,拋出一個(gè)錯(cuò)誤。

之后定義了一個(gè)隊(duì)列來存儲(chǔ)待執(zhí)行的函數(shù),并使用一個(gè)計(jì)數(shù)器來記錄當(dāng)前正在運(yùn)行的函數(shù)的數(shù)量。

const next = () => {
		activeCount--;
		if (queue.size > 0) {
			queue.dequeue()();
		}
	};
	const run = async (fn, resolve, args) => {
		activeCount++;
		const result = (async () => fn(...args))();
		resolve(result);
		try {
			await result;
		} catch {}
		next();
	};

在代碼的 next 函數(shù)中,如果隊(duì)列不為空,則從隊(duì)列中取出一個(gè)函數(shù)并執(zhí)行。這個(gè)函數(shù)的執(zhí)行會(huì)導(dǎo)致計(jì)數(shù)器的值減 1。

在代碼的 run 函數(shù)中,使用了 async/await 語法來執(zhí)行傳入的函數(shù) fn。它還使用了 resolve 函數(shù)將函數(shù)的返回值包裝成一個(gè) Promise,并將這個(gè) Promise 返回給調(diào)用者。在函數(shù)執(zhí)行完成后,調(diào)用 next 函數(shù)來執(zhí)行下一個(gè)函數(shù)。

	const enqueue = (fn, resolve, args) => {
		queue.enqueue(run.bind(undefined, fn, resolve, args));
		(async () => {
			// This function needs to wait until the next microtask before comparing
			// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
			// when the run function is dequeued and called. The comparison in the if-statement
			// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
			await Promise.resolve();
			if (activeCount < concurrency && queue.size > 0) {
				queue.dequeue()();
			}
		})();
	};

在代碼的 enqueue 函數(shù)中,使用了 queue.enqueue 方法將傳入的函數(shù) fn 加入隊(duì)列。然后,它使用了 async/await 語法來在下一個(gè)微任務(wù)中檢查當(dāng)前的并發(fā)數(shù)量是否小于設(shè)定的并發(fā)限制。如果是,則從隊(duì)列中取出一個(gè)函數(shù)并執(zhí)行。

	const generator = (fn, ...args) => new Promise(resolve => {
		enqueue(fn, resolve, args);
	});
	Object.defineProperties(generator, {
		activeCount: {
			get: () => activeCount,
		},
		pendingCount: {
			get: () => queue.size,
		},
		clearQueue: {
			value: () => {
				queue.clear();
			},
		},
	});
	return generator;

在代碼的 generator 函數(shù)中,使用了 new Promise 語法來生成一個(gè)新的 Promise,并在其中調(diào)用了 enqueue 函數(shù)。這樣,每次調(diào)用生成的函數(shù)時(shí),都會(huì)生成一個(gè)新的 Promise,并將函數(shù)加入隊(duì)列。

最后,使用了 Object.defineProperties 方法來為生成的函數(shù)添加屬性。這些屬性可以用來查詢當(dāng)前的并發(fā)數(shù)量和等待隊(duì)列的大小,以及清空等待隊(duì)列。

感謝各位的閱讀,以上就是“如何使用p-limit限制并發(fā)數(shù)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)如何使用p-limit限制并發(fā)數(shù)這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI