您好,登錄后才能下訂單哦!
這篇文章主要介紹了Redis+Node.js如何實現(xiàn)一個能處理海量數(shù)據(jù)的異步任務隊列系統(tǒng),具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
在最近的業(yè)務中,接到了一個需要處理約十萬條數(shù)據(jù)的需求。這些數(shù)據(jù)都以字符串的形式給到,并且處理它們的步驟是異步且耗時的(平均處理一條數(shù)據(jù)需要 25s 的時間)。如果以串行的方式實現(xiàn),其耗時是相當長的:
總耗時時間 = 數(shù)據(jù)量 × 單條數(shù)據(jù)處理時間
T = N * t (N = 100,000; t = 25s)
總耗時時間 = 2,500,000 秒 ≈ 695 小時 ≈ 29 天
顯然,我們不能簡單地把數(shù)據(jù)一條一條地處理。那么有沒有辦法能夠減少處理的時間呢?經(jīng)過調(diào)研后發(fā)現(xiàn),使用異步任務隊列是個不錯的辦法。
一、異步任務隊列原理
我們可以把“處理單條數(shù)據(jù)”理解為一個異步任務,因此對這十萬條數(shù)據(jù)的處理,就可以轉化成有十萬個異步任務等待進行。我們可以把這十萬條數(shù)據(jù)塞到一個隊列里面,讓任務處理器自發(fā)地從隊列里面去取得并完成。
任務處理器可以有多個,它們同時從隊列里面把任務取走并處理。當任務隊列為空,表示所有任務已經(jīng)被認領完;當所有任務處理器完成任務,則表示所有任務已經(jīng)被處理完。
其基本原理如下圖所示:
首先來解決任務隊列的問題。在這個需求中,任務隊列里面的每一個任務,都包含了待處理的數(shù)據(jù),數(shù)據(jù)以字符串的形式存在。為了方便起見,我們可以使用 Redis 的 List 數(shù)據(jù)格式來存放這些任務。
由于項目是基于 NodeJS 的,我們可以利用 PM2 的 Cluster 模式來啟動多個任務處理器,并行地處理任務。以一個 8 核的 CPU 為例,如果完全開啟了多進程,其理論處理時間將提升 8 倍,從 29 天縮短到 3.6 天。
接下來,我們會從實際編碼的角度來講解上述內(nèi)容的實現(xiàn)過程。
二、使用 NodeJS 操作 Redis
異步任務隊列使用 Redis 來實現(xiàn),因此我們需要部署一個單獨的 Redis 服務。在本地開發(fā)中為了快速完成 Redis 的安裝,我使用了 Docker 的辦法(默認機器已經(jīng)安裝了 Docker)。
Docker 拉取 Redis 鏡像
docker pull redis:latest
Docker 啟動 Redis
docker run -itd --name redis-local-p 6379:6379 redis
此時我們已經(jīng)使用 Docker 啟動了一個 Redis 服務,其對外的 IP 及端口為 127.0.0.1:6379。此外,我們還可以在本地安裝一個名為 Another Redis DeskTop Manager的 Redis 可視化工具,來實時查看、修改 Redis 的內(nèi)容。
在 NodeJS 中,我們可以使用 node-redis 來操作 Redis。新建一個 mqclient.ts 文件并寫入如下內(nèi)容:
import* asRedisfrom'redis' const client = Redis.createClient({ host: '127.0.0.1', port: 6379 }) exportdefault client
Redis 本質上是一個數(shù)據(jù)庫,而我們對數(shù)據(jù)庫的操作無非就是增刪改查。node-redis 支持 Redis 的所有交互操作方式,但是操作結果默認是以回調(diào)函數(shù)的形式返回。為了能夠使用 async/await,我們可以新建一個 utils.ts 文件,把 node-redis 操作 Redis 的各種操作都封裝成 Promise 的形式,方便我們后續(xù)使用。
import client from'./mqClient' // 獲取 Redis 中某個 key 的內(nèi)容 exportconst getRedisValue = (key: string): Promise<string| null> => newPromise(resolve => client.get(key, (err, reply) => resolve(reply))) // 設置 Redis 中某個 key 的內(nèi)容 exportconst setRedisValue = (key: string, value: string) => newPromise(resolve => client.set(key, value, resolve)) // 刪除 Redis 中某個 key 及其內(nèi)容 exportconst delRedisKey = (key: string) => newPromise(resolve => client.del(key, resolve))
除此之外,還能在 utils.ts 中放置其他常用的工具方法,以實現(xiàn)代碼的復用、保證代碼的整潔。
為了在 Redis 中創(chuàng)建任務隊列,我們可以單獨寫一個 createTasks.ts 的腳本,用于往隊列中塞入自定義的任務。
import{ TASK_NAME, TASK_AMOUNT, setRedisValue, delRedisKey } from'./utils' import client from'./mqClient' client.on('ready', async() => { await delRedisKey(TASK_NAME) for(let i = TASK_AMOUNT; i > 0; i--) { client.lpush(TASK_NAME, `task-${i}`) } client.lrange(TASK_NAME, 0, TASK_AMOUNT, async(err, reply) => { if(err) { console.error(err) return } console.log(reply) process.exit() }) })
在這段腳本中,我們從 utils.ts 中獲取了各個 Redis 操作的方法,以及任務的名稱 TASKNAME (此處為 localtasks)和任務的總數(shù) TASKAMOUNT(此處為 20 個)。通過 LPUSH 方法往 TASKNAME 的 List 當中塞入內(nèi)容為 task-1 到 task-20 的任務,如圖所示:
三、異步任務處理
首先新建一個 index.ts 文件,作為整個異步任務隊列處理系統(tǒng)的入口文件。
import taskHandler from'./tasksHandler' import client from'./mqClient' client.on('connect', () => { console.log('Redis is connected!') }) client.on('ready', async() => { console.log('Redis is ready!') await taskHandler() }) client.on('error', (e) => { console.log('Redis error! '+ e) })
在運行該文件時,會自動連接 Redis,并且在 ready 狀態(tài)時執(zhí)行任務處理器 taskHandler()。
在上一節(jié)的操作中,我們往任務隊列里面添加了 20 個任務,每個任務都是形如 task-n 的字符串。為了驗證異步任務的實現(xiàn),我們可以在任務處理器 taskHandler.ts 中寫一段 demo 函數(shù),來模擬真正的異步任務:
function handleTask(task: string) { returnnewPromise((resolve) => { setTimeout(async() => { console.log(`Handling task: ${task}...`) resolve() }, 2000) }) }
上面這個 handleTask() 函數(shù),將會在執(zhí)行的 2 秒后打印出當前任務的內(nèi)容,并返回一個 Promise,很好地模擬了異步函數(shù)的實現(xiàn)方式。接下來我們將會圍繞這個函數(shù),來處理隊列中的任務。
其實到了這一步為止,整個異步任務隊列處理系統(tǒng)已經(jīng)基本完成了,只需要在 taskHandler.ts 中補充一點點代碼即可:
import{ popTask } from'./utils' import client from'./mqClient' function handleTask(task: string) { /* ... */} exportdefaultasyncfunction tasksHandler() { // 從隊列中取出一個任務 const task = await popTask() // 處理任務 await handleTask(task) // 遞歸運行 await tasksHandler() }
最后,我們使用 PM2 啟動 4 個進程,來試著跑一下整個項目:
pm2 start ./dist/index.js -i 4&& pm2 logs
可以看到,4 個任務處理器分別處理完了隊列中的所有任務,相互之前互不影響。
事到如今已經(jīng)大功告成了嗎?未必。為了測試我們的這套系統(tǒng)到底提升了多少的效率,還需要統(tǒng)計完成隊列里面所有任務的總耗時。
四、統(tǒng)計任務完成耗時
要統(tǒng)計任務完成的耗時,只需要實現(xiàn)下列的公式即可:
總耗時 = 最后一個任務的完成時間 - 首個任務被取得的時間
首先來解決“獲取首個任務被取得的時間”這個問題。
由于我們是通過 PM2 的 Cluster 模式來啟動應用的,且從 Redis 隊列中讀取任務是個異步操作,因此在多進程運行的情況下無法直接保證從隊列中讀取任務的先后順序,必須通過一個額外的標記來判斷。其原理如下圖:
如圖所示,綠色的 worker 由于無法保證運行的先后順序,所以編號用問號來表示。當?shù)谝粋€任務被取得時,把黃色的標記值從 false 設置成 true。當且僅當黃色的標記值為 false 時才會設置時間。這樣一來,當其他任務被取得時,由于黃色的標記值已經(jīng)是 true 了,因此無法設置時間,所以我們便能得到首個任務被取得的時間。
在本文的例子中,黃色的標記值和首個任務被取得的時間也被存放在 Redis 中,分別被命名為 localtasksSETFIRST 和 localtasksBEGINTIME。
原理已經(jīng)弄懂,但是在實踐中還有一個地方值得注意。我們知道,從 Redis 中讀寫數(shù)據(jù)也是一個異步操作。由于我們有多個 worker 但只有一個 Redis,那么在讀取黃色標記值的時候很可能會出現(xiàn)“沖突”的問題。舉個例子,當 worker-1 修改標記值為 true 的同時, worker-2 正好在讀取標記值。由于時間的關系,可能 worker-2 讀到的標記值依然是 false,那么這就沖突了。為了解決這個問題,我們可以使用 node-redlock 這個工具來實現(xiàn)“鎖”的操作。
顧名思義,“鎖”的操作可以理解為當 worker-1 讀取并修改標記值的時候,不允許其他 worker 讀取該值,也就是把標記值給鎖住了。當 worker-1 完成標記值的修改時會釋放鎖,此時才允許其他的 worker 去讀取該標記值。
node-redlock 是 Redis 分布式鎖 Redlock 算法的 JavaScript 實現(xiàn),關于該算法的講解可參考 https://redis.io/topics/distlock。值得注意的是,在 node-redlock 在使用的過程中,如果要鎖一個已存在的 key,就必須為該 key 添加一個前綴 locks:,否則會報錯。
回到 utils.ts,編寫一個 setBeginTime() 的工具函數(shù):
exportconst setBeginTime = async(redlock: Redlock) => { // 讀取標記值前先把它鎖住 constlock= await redlock.lock(`lock:${TASK_NAME}_SET_FIRST`, 1000) const setFirst = await getRedisValue(`${TASK_NAME}_SET_FIRST`) // 當且僅當標記值不等于 true 時,才設置起始時間 if(setFirst !== 'true') { console.log(`${pm2tips} Get the first task!`) await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'true') await setRedisValue(`${TASK_NAME}_BEGIN_TIME`, `${new Date().getTime()}`) } // 完成標記值的讀寫操作后,釋放鎖 awaitlock.unlock().catch(e => e) }
然后把它添加到 taskHandler() 函數(shù)里面即可:
exportdefaultasyncfunction tasksHandler() { + // 獲取第一個任務被取得的時間 + await setBeginTime(redlock) // 從隊列中取出一個任務 const task = await popTask() // 處理任務 await handleTask(task) // 遞歸運行 await tasksHandler() }
接下來解決“最后一個任務的完成時間”這個問題。
類似上一個問題,由于任務執(zhí)行的先后順序無法保證,異步操作的完成時間也無法保證,因此我們也需要一個額外的標識來記錄任務的完成情況。在 Redis 中創(chuàng)建一個初始值為 0 的標識 localtasksCURINDEX,當 worker 完成一個任務就讓標識加。由于任務隊列的初始長度是已知的(為 TASKAMOUNT 常量,也寫入了 Redis 的 localtasksTOTAL 中),因此當標識的值等于隊列初始長度的值時,即可表明所有任務都已經(jīng)完成。
如圖所示,被完成的任務都會讓黃色的標識加一,任何時候只要判斷到標識的值等于隊列的初始長度值,即可表明任務已經(jīng)全部完成。
回到 taskHandler() 函數(shù),加入下列內(nèi)容:
exportdefaultasyncfunction tasksHandler() { + // 獲取標識值和隊列初始長度 + let curIndex = Number(await getRedisValue(`${TASK_NAME}_CUR_INDEX`)) + const taskAmount = Number(await getRedisValue(`${TASK_NAME}_TOTAL`)) + // 等待新任務 + if(taskAmount === 0) { + console.log(`${pm2tips} Wating new tasks...`) + await sleep(2000) + await tasksHandler() + return + } + // 判斷所有任務已經(jīng)完成 + if(curIndex === taskAmount) { + const beginTime = await getRedisValue(`${TASK_NAME}_BEGIN_TIME`) + // 獲取總耗時 + const cost = newDate().getTime() - Number(beginTime) + console.log(`${pm2tips} All tasks were completed! Time cost: ${cost}ms. ${beginTime}`) + // 初始化 Redis 的一些標識值 + await setRedisValue(`${TASK_NAME}_TOTAL`, '0') + await setRedisValue(`${TASK_NAME}_CUR_INDEX`, '0') + await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'false') + await delRedisKey(`${TASK_NAME}_BEGIN_TIME`) + await sleep(2000) + await tasksHandler() } // 獲取第一個任務被取得的時間 await setBeginTime(redlock) // 從隊列中取出一個任務 const task = await popTask() // 處理任務 await handleTask(task) + // 任務完成后需要為標識位加一 + try{ + constlock= await redlock.lock(`lock:${TASK_NAME}_CUR_INDEX`, 1000) + curIndex = await getCurIndex() + await setCurIndex(curIndex + 1) + awaitlock.unlock().catch((e) => e) + } catch(e) { + console.log(e) + } + // recursion + await tasksHandler() +} // 遞歸運行 await tasksHandler() }
到這一步為止,我們已經(jīng)解決了獲取“最后一個任務的完成時間”的問題,再結合前面的首個任務被取得的時間,便能得出運行的總耗時。
最后來看一下實際的運行效果。我們循例往隊列里面添加了 task-1 到 task-20 這 20 個任務,然后啟動 4 個進程來跑:
運行狀況良好。從運行結果來看,4 個進程處理 20 個平均耗時 2 秒的任務,只需要 10 秒的時間,完全符合設想。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Redis+Node.js如何實現(xiàn)一個能處理海量數(shù)據(jù)的異步任務隊列系統(tǒng)”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業(yè)資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。