溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

HDFS2.X中NameNode塊報(bào)告處理的示例分析

發(fā)布時(shí)間:2021-12-13 10:58:30 來源:億速云 閱讀:150 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要介紹了HDFS2.X中NameNode塊報(bào)告處理的示例分析,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

NameNode會(huì)接收兩種情況的塊報(bào)告,DataNode全部塊報(bào)告與增量塊報(bào)告。

4.1全量報(bào)告分析

       目前全量報(bào)告以周期性進(jìn)行報(bào)告,既然已經(jīng)有啟動(dòng)時(shí)候的全量數(shù)據(jù)塊報(bào)告,錯(cuò)誤塊報(bào)告,增量塊報(bào)告(包括刪除塊報(bào)告),為什么還需要周期性全量塊報(bào)告呢?比如某DataNode接受到數(shù)據(jù)塊但是增量報(bào)告失敗,那就需要周期性報(bào)告來解決了,或者NameNode給DN發(fā)送了刪除塊的命令,但是由于網(wǎng)絡(luò)等異常,DN沒收收到刪除命令,這樣DN再把這些數(shù)據(jù)塊報(bào)告上來就是無效塊,需要再次放入無效隊(duì)列,下次心跳再命令DN刪除;同時(shí)比如每次塊報(bào)告會(huì)清理DatanodeDescriptor對(duì)象維護(hù)的塊列表還有某個(gè)塊的信息,但是DN節(jié)點(diǎn)再也沒有報(bào)告上來,定時(shí)清除這些無效信息,有助于提高塊列表的操作性能,從而提供NameNode的性能。同時(shí)我們可以考慮分析是否還有其他原因可能影響NameNode的性能。

為了提高HDFS啟動(dòng)速度,在Hadoop2.0版本中全量塊報(bào)告分為了兩種:啟動(dòng)時(shí)候塊報(bào)告與非啟動(dòng)的時(shí)候塊報(bào)告,即是否是第一次塊報(bào)告。那么具體又是如何來提高啟動(dòng)速度的呢?在啟動(dòng)的時(shí)候,不計(jì)算哪些文件元數(shù)據(jù)需要?jiǎng)h除,不計(jì)算無效快,這些處理都推遲到下一次塊報(bào)告進(jìn)行處理

對(duì)于第一次塊報(bào)告,代碼調(diào)用流程為:NameNodeRpcServer.blockReport()->BlockManager. processReport()->BlockManager.processFirstBlockReport().對(duì)Standby節(jié)點(diǎn),如果報(bào)告的數(shù)據(jù)塊所相關(guān)元數(shù)據(jù)日志從節(jié)點(diǎn)還沒有加載完畢,則會(huì)將報(bào)告的塊信息加入一個(gè)隊(duì)列,當(dāng)Standby節(jié)點(diǎn)加載元數(shù)據(jù)后,再處理該消息隊(duì)列,第一次塊報(bào)告處理詳細(xì)代碼如下,可以看到,為了提高報(bào)告速度,只有簡單的幾步進(jìn)行塊報(bào)告處理,僅有驗(yàn)證塊是否損壞,然后直接判斷塊狀態(tài)是否為FINALIZED狀態(tài),如果是,就直接建立塊與DN節(jié)點(diǎn)的映射。

[java] view plain copy

  1. private void processFirstBlockReport(final DatanodeDescriptor node,  

  2.       final BlockListAsLongs report) throws IOException {  

  3.     if (report == null) return;  

  4.     assert (namesystem.hasWriteLock());  

  5.     assert (node.numBlocks() == 0);  

  6.     BlockReportIterator itBR = report.getBlockReportIterator();  

  7.    

  8.     while(itBR.hasNext()) {  

  9.       Block iblk = itBR.next();  

  10.       ReplicaState reportedState = itBR.getCurrentReplicaState();  

  11.       //對(duì)于從節(jié)點(diǎn)shouldPostponeBlocksFromFuture為true;判斷塊時(shí)間戳//是否大于目前時(shí)間  

  12.       if (shouldPostponeBlocksFromFuture&&  

  13.           namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {  

  14. //將塊信息加入隊(duì)列,從節(jié)點(diǎn)消化完相關(guān)日志,會(huì)處理該隊(duì)列  

  15.         queueReportedBlock(node, iblk, reportedState,  

  16.             QUEUE_REASON_FUTURE_GENSTAMP);  

  17.         continue;  

  18.       }  

  19.        

  20.       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);  

  21.       // If block does not belong to any file, we are done.  

  22.       if (storedBlock == null) continue;  

  23.        

  24.       // If block is corrupt, mark it and continue to next block.  

  25.       BlockUCState ucState = storedBlock.getBlockUCState();  

  26.       BlockToMarkCorrupt c = checkReplicaCorrupt(  

  27.           iblk, reportedState, storedBlock, ucState, node);  

  28.       if (c != null) {  

  29. //對(duì)于從節(jié)點(diǎn),先將塊信息加入pendingDNMessages隊(duì)列  

  30. //將塊信息加入隊(duì)列,從節(jié)點(diǎn)消化完相關(guān)日志,會(huì)處理該隊(duì)列,如果該塊還是被損壞,就真的是損壞了  

  31.         if (shouldPostponeBlocksFromFuture) {  

  32.           // In the Standby, we may receive a block report for a file that we  

  33.           // just have an out-of-date gen-stamp or state for, for example.  

  34.           queueReportedBlock(node, iblk, reportedState,  

  35.               QUEUE_REASON_CORRUPT_STATE);  

  36.         } else {  

  37. //對(duì)于主節(jié)點(diǎn),有塊損壞,直接標(biāo)記為損壞  

  38.           markBlockAsCorrupt(c, node);  

  39.         }  

  40.         continue;  

  41.       }  

  42.        

  43.       // If block is under construction, add this replica to its list  

  44.       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {  

  45.         ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(  

  46.             node, iblk, reportedState);  

  47.         //and fall through to next clause  

  48.       }       

  49.       //add replica if appropriate  

  50.       if (reportedState == ReplicaState.FINALIZED) {  

  51.         addStoredBlockImmediate(storedBlock, node);  

  52.       }  

  53.     }  

  54.   }  

而對(duì)于非第一次塊報(bào)告,情況就要復(fù)雜一些了,對(duì)于報(bào)告的每個(gè)塊信息,不僅會(huì)建立塊與DN的映射,而且均會(huì)檢查塊是否損壞,塊是是否無效,元數(shù)據(jù)是否已經(jīng)無效應(yīng)該刪除,是否為UC狀態(tài)的塊等,該過程主要由方法processReport來完成

[java] view plain copy

  1. private void processReport(final DatanodeDescriptor node,  

  2.       final BlockListAsLongs report) throws IOException {  

  3.     // Normal case:  

  4.     // Modify the (block-->datanode) map, according to the difference  

  5.     // between the old and new block report.  

  6.     //  

  7.     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();  

  8.     Collection<Block> toRemove = new LinkedList<Block>();  

  9.     Collection<Block> toInvalidate = new LinkedList<Block>();  

  10.     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();  

  11. Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();  

  12. //統(tǒng)計(jì)塊,并且判斷塊是否應(yīng)該刪除,是否應(yīng)該添加到blocksMap列表等  

  13.     reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);  

  14.    

  15.     // Process the blocks on each queue  

  16.     for (StatefulBlockInfo b : toUC) {  

  17.       addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);  

  18.     }  

  19.     for (Block b : toRemove) {  

  20.       removeStoredBlock(b, node);  

  21.     }  

  22.     for (BlockInfo b : toAdd) {  

  23.       addStoredBlock(b, node, null, true);  

  24.     }  

  25.     for (Block b : toInvalidate) {  

  26.       NameNode.stateChangeLog.info("BLOCK* processReport: block "  

  27.           + b + " on " + node + " size " + b.getNumBytes()  

  28.           + " does not belong to any file.");  

  29.       addToInvalidates(b, node);  

  30.     }  

  31.     for (BlockToMarkCorrupt b : toCorrupt) {  

  32.       markBlockAsCorrupt(b, node);  

  33.     }  

  34.   }  

在reportDiff方法內(nèi),實(shí)現(xiàn)如下:

[java] view plain copy

  1. private void reportDiff(DatanodeDescriptor dn,  

  2.       BlockListAsLongs newReport,  

  3.       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor  

  4.       Collection<Block> toRemove,           // remove from DatanodeDescriptor  

  5.       Collection<Block> toInvalidate,       // should be removed from DN  

  6.       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list  

  7.       Collection<StatefulBlockInfo> toUC) { // add to under-construction list  

  8.     // place a delimiter分隔符 in the list which separates blocks  

  9.     // that have been reported from those that have not  

  10.     BlockInfo delimiter = new BlockInfo(new Block(), 1);  

  11.     boolean added = dn.addBlock(delimiter);  

  12.     assert added : "Delimiting block cannot be present in the node";  

  13.     int headIndex = 0; //currently the delimiter is in the head of the list  

  14.     int curIndex;  

  15.    

  16.     if (newReport == null)  

  17.       newReport = new BlockListAsLongs();  

  18.     // scan the report and process newly reported blocks  

  19.     BlockReportIterator itBR = newReport.getBlockReportIterator();  

  20.     while(itBR.hasNext()) {  

  21.       Block iblk = itBR.next();  

  22.       ReplicaState iState = itBR.getCurrentReplicaState();  

  23.       BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,  

  24.                                   toAdd, toInvalidate, toCorrupt, toUC);  

  25.       // move block to the head of the list  

  26.       if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {  

  27.         headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);  

  28.       }  

  29.     }  

  30.     // collect blocks that have not been reported  

  31.     // all of them are next to the delimiter  

  32.     //收集DN對(duì)象中所有沒有被DN節(jié)點(diǎn)報(bào)告上來的塊,將這些塊信息從DN對(duì)象維護(hù)的列表中刪除,這樣可以有效控制DN塊列表中存在大量的無效塊,  

  33.     //影響NameNode的操作性能  

  34.     Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(  

  35.         delimiter.getNext(0), dn);  

  36.     while(it.hasNext())  

  37.       toRemove.add(it.next());  

  38.     dn.removeBlock(delimiter);  

  39.   }  

4.2增量報(bào)告分析

     相比于全量塊報(bào)告方式,增量報(bào)告報(bào)告DN節(jié)點(diǎn)很短時(shí)間內(nèi)已經(jīng)接收完成,或者正在接受或者刪除的塊,而且為了提高文件上傳的效率, DN節(jié)點(diǎn)應(yīng)該盡快將接受到的塊報(bào)告給NameNode,現(xiàn)在引入了RECEIVING_BLOCK這個(gè)一個(gè)塊狀態(tài),有可能就是為了提高寫入速度。

HDFS2.X中NameNode塊報(bào)告處理的示例分析

                                                                     增量塊報(bào)告流程圖

正在接收的塊與已經(jīng)接收完的塊,除了在數(shù)據(jù)塊狀態(tài)不一樣外,其他基本相同,其接收塊代碼調(diào)用流程如下:NameNodeRpcServer.blockReceivedAndDeleted()->BlockManager.processIncrementalBlockReport()->BlockManager. addBlock()->BlockManager.processAndHandleReportedBlock()->BlockManager.processReportedBlock(),在方法processReportedBlock中,首先判斷報(bào)告的塊是否元數(shù)據(jù)已經(jīng)從主節(jié)點(diǎn)讀取到,如果沒有加入消息列表

[java] view plain copy

  1. //postpone延期  

  2.     //如果是從節(jié)點(diǎn),可能雖然DN節(jié)點(diǎn)將塊信息報(bào)告上來,但是元數(shù)據(jù)還沒有從日志中消化到  

  3.     if (shouldPostponeBlocksFromFuture &&  

  4.         namesystem.isGenStampInFuture(block.getGenerationStamp())) {  

  5.       queueReportedBlock(dn, block, reportedState,  

  6.           QUEUE_REASON_FUTURE_GENSTAMP);  

  7.       return null;  

  8.     }  

<br font-size:16px;white-space:normal;background-color:#FFFFFF;" />然后從blocksMap中查詢到數(shù)據(jù)塊對(duì)于文件inode,判斷文件是否存在;如果判斷塊屬于損害塊,冗余分?jǐn)?shù)是否不夠等情況,如果塊一切正常,且狀態(tài)為完成,將將其加入blocksMap等集合列表。具體代碼如下:

[java] view plain copy

  1. //檢查塊是否已經(jīng)被損害  

  2.     BlockToMarkCorrupt c = checkReplicaCorrupt(  

  3.         block, reportedState, storedBlock, ucState, dn);  

  4.     if (c != null) {  

  5.       if (shouldPostponeBlocksFromFuture) {  

  6.         // If the block is an out-of-date generation stamp or state,  

  7.         // but we're the standby, we shouldn't treat it as corrupt,  

  8.         // but instead just queue it for later processing.  

  9.         queueReportedBlock(dn, storedBlock, reportedState,  

  10.             QUEUE_REASON_CORRUPT_STATE);  

  11.       } else {  

  12.           //將其加入損害列表  

  13.         toCorrupt.add(c);  

  14.       }  

  15.       return storedBlock;  

  16.     }  

  17.      //如果該數(shù)據(jù)塊正在被構(gòu)建,加入構(gòu)建列表  

  18.     if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {  

  19.       toUC.add(new StatefulBlockInfo(  

  20.           (BlockInfoUnderConstruction)storedBlock, reportedState));  

  21.       return storedBlock;  

  22.     }  

  23.    

  24.     //add replica if appropriate  

  25.     //如果報(bào)告的塊狀態(tài)為FINALIZED且該DN沒有報(bào)告該塊,則加入添加隊(duì)列  

  26.     if (reportedState == ReplicaState.FINALIZED  

  27.         && storedBlock.findDatanode(dn) < 0) {  

  28.       toAdd.add(storedBlock);  

  29.     }  

  30.     return storedBlock;  


感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“HDFS2.X中NameNode塊報(bào)告處理的示例分析”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI