千家信息网

HDFS2.X中NameNode块报告处理的示例分析

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要介绍了HDFS2.X中NameNode块报告处理的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。NameNode会
千家信息网最后更新 2025年12月02日HDFS2.X中NameNode块报告处理的示例分析

这篇文章主要介绍了HDFS2.X中NameNode块报告处理的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

NameNode会接收两种情况的块报告,DataNode全部块报告与增量块报告。

4.1全量报告分析

目前全量报告以周期性进行报告,既然已经有启动时候的全量数据块报告,错误块报告,增量块报告(包括删除块报告),为什么还需要周期性全量块报告呢?比如某DataNode接受到数据块但是增量报告失败,那就需要周期性报告来解决了,或者NameNode给DN发送了删除块的命令,但是由于网络等异常,DN没收收到删除命令,这样DN再把这些数据块报告上来就是无效块,需要再次放入无效队列,下次心跳再命令DN删除;同时比如每次块报告会清理DatanodeDescriptor对象维护的块列表还有某个块的信息,但是DN节点再也没有报告上来,定时清除这些无效信息,有助于提高块列表的操作性能,从而提供NameNode的性能。同时我们可以考虑分析是否还有其他原因可能影响NameNode的性能。

为了提高HDFS启动速度,在Hadoop2.0版本中全量块报告分为了两种:启动时候块报告与非启动的时候块报告,即是否是第一次块报告。那么具体又是如何来提高启动速度的呢?在启动的时候,不计算哪些文件元数据需要删除,不计算无效快,这些处理都推迟到下一次块报告进行处理

对于第一次块报告,代码调用流程为:NameNodeRpcServer.blockReport()->BlockManager. processReport()->BlockManager.processFirstBlockReport().对Standby节点,如果报告的数据块所相关元数据日志从节点还没有加载完毕,则会将报告的块信息加入一个队列,当Standby节点加载元数据后,再处理该消息队列,第一次块报告处理详细代码如下,可以看到,为了提高报告速度,只有简单的几步进行块报告处理,仅有验证块是否损坏,然后直接判断块状态是否为FINALIZED状态,如果是,就直接建立块与DN节点的映射。

[java] view plain copy

  1. private void processFirstBlockReport(final DatanodeDescriptor node,

  2. final BlockListAsLongs report) throws IOException {

  3. if (report == null) return;

  4. assert (namesystem.hasWriteLock());

  5. assert (node.numBlocks() == 0);

  6. BlockReportIterator itBR = report.getBlockReportIterator();

  7. while(itBR.hasNext()) {

  8. Block iblk = itBR.next();

  9. ReplicaState reportedState = itBR.getCurrentReplicaState();

  10. //对于从节点shouldPostponeBlocksFromFuture为true;判断块时间戳//是否大于目前时间

  11. if (shouldPostponeBlocksFromFuture&&

  12. namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {

  13. //将块信息加入队列,从节点消化完相关日志,会处理该队列

  14. queueReportedBlock(node, iblk, reportedState,

  15. QUEUE_REASON_FUTURE_GENSTAMP);

  16. continue;

  17. }

  18. BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);

  19. // If block does not belong to any file, we are done.

  20. if (storedBlock == null) continue;

  21. // If block is corrupt, mark it and continue to next block.

  22. BlockUCState ucState = storedBlock.getBlockUCState();

  23. BlockToMarkCorrupt c = checkReplicaCorrupt(

  24. iblk, reportedState, storedBlock, ucState, node);

  25. if (c != null) {

  26. //对于从节点,先将块信息加入pendingDNMessages队列

  27. //将块信息加入队列,从节点消化完相关日志,会处理该队列,如果该块还是被损坏,就真的是损坏了

  28. if (shouldPostponeBlocksFromFuture) {

  29. // In the Standby, we may receive a block report for a file that we

  30. // just have an out-of-date gen-stamp or state for, for example.

  31. queueReportedBlock(node, iblk, reportedState,

  32. QUEUE_REASON_CORRUPT_STATE);

  33. } else {

  34. //对于主节点,有块损坏,直接标记为损坏

  35. markBlockAsCorrupt(c, node);

  36. }

  37. continue;

  38. }

  39. // If block is under construction, add this replica to its list

  40. if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {

  41. ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(

  42. node, iblk, reportedState);

  43. //and fall through to next clause

  44. }

  45. //add replica if appropriate

  46. if (reportedState == ReplicaState.FINALIZED) {

  47. addStoredBlockImmediate(storedBlock, node);

  48. }

  49. }

  50. }

而对于非第一次块报告,情况就要复杂一些了,对于报告的每个块信息,不仅会建立块与DN的映射,而且均会检查块是否损坏,块是是否无效,元数据是否已经无效应该删除,是否为UC状态的块等,该过程主要由方法processReport来完成

[java] view plain copy

  1. private void processReport(final DatanodeDescriptor node,

  2. final BlockListAsLongs report) throws IOException {

  3. // Normal case:

  4. // Modify the (block-->datanode) map, according to the difference

  5. // between the old and new block report.

  6. //

  7. Collection toAdd = new LinkedList();

  8. Collection toRemove = new LinkedList();

  9. Collection toInvalidate = new LinkedList();

  10. Collection toCorrupt = new LinkedList();

  11. Collection toUC = new LinkedList();

  12. //统计块,并且判断块是否应该删除,是否应该添加到blocksMap列表等

  13. reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);

  14. // Process the blocks on each queue

  15. for (StatefulBlockInfo b : toUC) {

  16. addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);

  17. }

  18. for (Block b : toRemove) {

  19. removeStoredBlock(b, node);

  20. }

  21. for (BlockInfo b : toAdd) {

  22. addStoredBlock(b, node, null, true);

  23. }

  24. for (Block b : toInvalidate) {

  25. NameNode.stateChangeLog.info("BLOCK* processReport: block "

  26. + b + " on " + node + " size " + b.getNumBytes()

  27. + " does not belong to any file.");

  28. addToInvalidates(b, node);

  29. }

  30. for (BlockToMarkCorrupt b : toCorrupt) {

  31. markBlockAsCorrupt(b, node);

  32. }

  33. }

在reportDiff方法内,实现如下:

[java] view plain copy

  1. private void reportDiff(DatanodeDescriptor dn,

  2. BlockListAsLongs newReport,

  3. Collection toAdd, // add to DatanodeDescriptor

  4. Collection toRemove, // remove from DatanodeDescriptor

  5. Collection toInvalidate, // should be removed from DN

  6. Collection toCorrupt, // add to corrupt replicas list

  7. Collection toUC) { // add to under-construction list

  8. // place a delimiter分隔符 in the list which separates blocks

  9. // that have been reported from those that have not

  10. BlockInfo delimiter = new BlockInfo(new Block(), 1);

  11. boolean added = dn.addBlock(delimiter);

  12. assert added : "Delimiting block cannot be present in the node";

  13. int headIndex = 0; //currently the delimiter is in the head of the list

  14. int curIndex;

  15. if (newReport == null)

  16. newReport = new BlockListAsLongs();

  17. // scan the report and process newly reported blocks

  18. BlockReportIterator itBR = newReport.getBlockReportIterator();

  19. while(itBR.hasNext()) {

  20. Block iblk = itBR.next();

  21. ReplicaState iState = itBR.getCurrentReplicaState();

  22. BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,

  23. toAdd, toInvalidate, toCorrupt, toUC);

  24. // move block to the head of the list

  25. if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {

  26. headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);

  27. }

  28. }

  29. // collect blocks that have not been reported

  30. // all of them are next to the delimiter

  31. //收集DN对象中所有没有被DN节点报告上来的块,将这些块信息从DN对象维护的列表中删除,这样可以有效控制DN块列表中存在大量的无效块,

  32. //影响NameNode的操作性能

  33. Iterator it = new DatanodeDescriptor.BlockIterator(

  34. delimiter.getNext(0), dn);

  35. while(it.hasNext())

  36. toRemove.add(it.next());

  37. dn.removeBlock(delimiter);

  38. }

4.2增量报告分析

相比于全量块报告方式,增量报告报告DN节点很短时间内已经接收完成,或者正在接受或者删除的块,而且为了提高文件上传的效率, DN节点应该尽快将接受到的块报告给NameNode,现在引入了RECEIVING_BLOCK这个一个块状态,有可能就是为了提高写入速度。


增量块报告流程图

正在接收的块与已经接收完的块,除了在数据块状态不一样外,其他基本相同,其接收块代码调用流程如下:NameNodeRpcServer.blockReceivedAndDeleted()->BlockManager.processIncrementalBlockReport()->BlockManager. addBlock()->BlockManager.processAndHandleReportedBlock()->BlockManager.processReportedBlock(),在方法processReportedBlock中,首先判断报告的块是否元数据已经从主节点读取到,如果没有加入消息列表

[java] view plain copy

  1. //postpone延期

  2. //如果是从节点,可能虽然DN节点将块信息报告上来,但是元数据还没有从日志中消化到

  3. if (shouldPostponeBlocksFromFuture &&

  4. namesystem.isGenStampInFuture(block.getGenerationStamp())) {

  5. queueReportedBlock(dn, block, reportedState,

  6. QUEUE_REASON_FUTURE_GENSTAMP);

  7. return null;

  8. }


然后从blocksMap中查询到数据块对于文件inode,判断文件是否存在;如果判断块属于损害块,冗余分数是否不够等情况,如果块一切正常,且状态为完成,将将其加入blocksMap等集合列表。具体代码如下:

[java] view plain copy

  1. //检查块是否已经被损害

  2. BlockToMarkCorrupt c = checkReplicaCorrupt(

  3. block, reportedState, storedBlock, ucState, dn);

  4. if (c != null) {

  5. if (shouldPostponeBlocksFromFuture) {

  6. // If the block is an out-of-date generation stamp or state,

  7. // but we're the standby, we shouldn't treat it as corrupt,

  8. // but instead just queue it for later processing.

  9. queueReportedBlock(dn, storedBlock, reportedState,

  10. QUEUE_REASON_CORRUPT_STATE);

  11. } else {

  12. //将其加入损害列表

  13. toCorrupt.add(c);

  14. }

  15. return storedBlock;

  16. }

  17. //如果该数据块正在被构建,加入构建列表

  18. if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {

  19. toUC.add(new StatefulBlockInfo(

  20. (BlockInfoUnderConstruction)storedBlock, reportedState));

  21. return storedBlock;

  22. }

  23. //add replica if appropriate

  24. //如果报告的块状态为FINALIZED且该DN没有报告该块,则加入添加队列

  25. if (reportedState == ReplicaState.FINALIZED

  26. && storedBlock.findDatanode(dn) < 0) {

  27. toAdd.add(storedBlock);

  28. }

  29. return storedBlock;


感谢你能够认真阅读完这篇文章,希望小编分享的"HDFS2.X中NameNode块报告处理的示例分析"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

0