`

Hadoop-balancer执行原理

 
阅读更多

 

核心类在

org.apache.hadoop.hdfs.server.balancer.Balancer

 

均衡算法 伪代码

while(true) {
 1.获取需要迁移的字节数
 if(需要迁移字节数 == 0) {
  return "成功,无需迁移";
 }
 
 2.选择需要迁移的节点
 if(需要移动的数据 == 0) {
  return "没有需要移动的块"
 }
 
 3.开始并行迁移
 4.清空列表
 5.Thread.sleep(2*conf.getLong("dfs.heartbeat.interval", 3));
}

 

获取所有的data node节点,计算

initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));

initNodes()函数如下:

计算平均使用量
    long totalCapacity=0L, totalUsedSpace=0L;
    for (DatanodeInfo datanode : datanodes) {
      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
        continue; // ignore decommissioning or decommissioned nodes
      }
      totalCapacity += datanode.getCapacity();
      totalUsedSpace += datanode.getDfsUsed();
    }

 

当前集群的平均使用率(是当前使用的空间/总空间*100),注意这个是百分比计算后再乘100的值,不是百分比

this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;

 

 

四个队列

1.aboveAvgUtilizedDatanodes(超过集群平均使用率 && 低于集群平均使用率+阀值)

2.overUtilizedDatanodes(超过集群平均使用率+阀值)

3.belowAvgUtilizedDatanodes(低于集群平均使用率 && 超过集群平均使用率-阀值)

4.underUtilizedDatanodes(低于集群平均使用率-阀值)

 

2个参数

overLoadedBytes 超过负载值的字节

underLoadedBytes低于负载值的字节

 
//注意这里的阈值默认是10D,这里不是百分比计算集群平均使用率如果为0.5不是50%,而相当于0.5%
//所以如果是0.5-10D就变成负数了,一般来说肯定是小于当前节点使用率的,除非当前节点使用率特别大
//比如当前节点使用率为20,则用百分比来说就是使用了20%,这肯定就超于阈值了,于是这个节点的数据
//就需要均衡了
for (DatanodeInfo datanode : datanodes) {
    if(当前节点使用率 > 集群平均使用率) {
  if(当前节点使用率 <=(集群平均使用率+阀值) && 当前节点使用率 > 集群平均使用率) {
   创建一个BalancerDatanode
   aboveAvgUtilizedDatanodes.save(当前节点)
  }
  else {
   overUtilizedDatanodes.save(当前节点)
   overLoadedBytes += (当前节点使用率-集群平均使用率-阀值)*当前节点总数据量/100
  }
    }
   
    else {
     创建一个BalancerDatanode
     if(当前节点使用率>=(集群平均使用率-阀值) && 当前节点使用率<集群平均使用率) {
      belowAvgUtilizedDatanodes.save(当前节点)
     }
     else {
      underUtilizedDatanodes.save(当前节点)
      underLoadedBytes += (集群平均使用率-阀值-当前节点使用率)*当前节点总数据量/100
     }
    }
}

均衡器只会执行 overUtilizedDatanodes 和 underUtilizedDatanodes队列中的集群

 

 

BalancerDatanode()构造函数

if(当前节点使用率 >= 集群平均使用率+阀值 || 当前节点使用率 <= 集群平均使用率-阀值) {
    一次移动的数据量 = 阀值*当前节点总容量/100
}
else {
    一次移动的数据量 = (集群平均使用率-当前节点使用率) * 当前节点总容量/100
}
一次移动的数据量 = min(当前节点剩余使用量,一次移动的数据量)
一次移动的数据量 = (一次移动数据量上限10G,一次移动的数据量)

 

chooseNodes()函数

chooseNodes(true);	 //首先在相同机架中迁移
chooseNodes(false);	 //在不同机架中迁移

chooseNodes(boolean onRack) {
 chooseTargets(underUtilizedDatanodes.iterator(), onRack);
 chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
 chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
}

chooseTargets() {
 for(源节点 source : overUtilizedDatanodes列表) {
  选择目标节点(source)
 }
}
 
选择目标节点(source) {
 while() {
  1.从候选队列中找到一个节点
  2.如果这个可转移的数据已经满了continue
  3.if(在相同机架中转移)
  4.if(在不同机架中转移)
  5.创建NodeTask
 }
}

//和chooseTargets函数类似
chooseSources() {
 for(目标节点 target : underUtilizedDatanodes) {
  选择源节点()
 }
}

选择源节点(target) {
 while() {
  1.从候选队列中找到一个节点
  2.如果这个节点可转移的数据已经满了continue
  3.if(在相同机架中转移)
  4.if(在不同机架中转移)
  5.创建NodeTask
 }
}

控制台或者日志上会显示  Decided to move 3.55 GB bytes from source_host:50010 to target_host:50010

 

开始并行迁移数据

    for (Source source : sources) {
      futures[i++] = dispatcherExecutor.submit(source.new ());
    }

 

BlockMoveDispatcher线程

1.选择要迁移的节点 chooseNextBlockToMove()
2.if(要迁移的节点 != null) {
 //启动数据迁移,创建一个新线程发送接收数据
 scheduleBlockMove()
 
}
3.获取block列表,继续下一轮迁移

 

发送和接收数据块的dispatch()函数

//使用阻塞IO的方式发送数据并接收返回的结果
        sock.connect(NetUtils.createSocketAddr(
            target.datanode.getName()), HdfsConstants.READ_TIMEOUT);
        sock.setKeepAlive(true);
        out = new DataOutputStream( new BufferedOutputStream(
            sock.getOutputStream(), FSConstants.BUFFER_SIZE));
        sendRequest(out);
        in = new DataInputStream( new BufferedInputStream(
            sock.getInputStream(), FSConstants.BUFFER_SIZE));
        receiveResponse(in);
        bytesMoved.inc(block.getNumBytes());

 

分享到:
评论

相关推荐

    hadoop-管理

    最后,在 master 节点上运行 `bin/hadoop balancer` 来进行负载均衡,确保集群资源的合理分布。 针对 HBase 集群,RegionServer 宕机时,可以使用 `./hbase-daemon.sh start regionserver` 命令重启。若 ...

    hadoop-utils:hadoop实用程序

    通过阅读源码,可以了解这些工具的工作原理,进行定制化开发,或者为Hadoop生态系统贡献新的功能。 **学习与实践** 对于想要深入理解Hadoop和大数据处理的IT从业者,研究hadoop-utils的源代码是一个很好的起点。你...

    hadoop和hive调优个人总结

    hadoop和hive调优个人总结 Hadoop和Hive调优是当前大数据处理中...Hadoop和Hive的调优是一个复杂的过程,需要对Hadoop和Hive的原理和机理有深入的了解,掌握了调优的技巧和方法,才能提高Hadoop和Hive的性能和可靠性。

    Hadoop命令手册

    ### Hadoop命令手册知识点 #### 概述 Hadoop是一个开源软件框架,主要用于分布式存储与处理大规模数据集。...这对于初学者来说是一份非常宝贵的资源,可以帮助他们快速上手并深入理解Hadoop的工作原理及应用。

    HBase 应用平台 balancer 功能

    通过阅读源码,我们可以深入理解balancer的工作原理,例如如何计算负载、决定迁移哪些Region以及如何执行迁移操作。 此外,`tools`标签可能指的是HBase提供的工具集,其中可能包含了用于手动触发balancer的命令行...

    hadoop大数据实战手册

    3.2.3 WebHdfsFileSystem 执行器调用……··……………………………………………………… 130 3.2.4 WebHDFS 的0Auth2 认证…·………………………………………………………………… 1 日 3.2.5 WebHDFS 的使用……...

    HBase源代码 hbase-0.98.23

    通过分析`org.apache.hadoop.hbase.masterAssignment.RegionStates`和`org.apache.hadoop.hbase.master.LoadBalancer`等类,我们可以了解HBase如何实现集群的负载均衡和容错能力。 在大数据处理中,HBase的性能优化...

    快速学习-DataNode

    【快速学习-DataNode】章节主要讲解了Hadoop分布式文件系统(HDFS)中DataNode的角色、工作机制、数据完整性保障以及...理解和掌握DataNode的工作原理以及如何管理DataNode,对于Hadoop集群的运维和性能优化至关重要。

    hdfs使用方法.rar

    1. **HDFS命令行工具**:通过`hadoop fs`命令,用户可以执行如`put`、`get`、`rm`等操作,对HDFS上的文件进行管理。 2. **HDFS API**:Java API提供了对HDFS的全面操作,包括文件创建、读写、删除等,常用于开发...

    exSpeedUp:第7个学期进行的第一个练习,来自里卡多·德斯特(Ricardo Destro),Centro Universitario FEI的科学计算机课程的分布式系统课程的老师

    6. **分布式数据存储**:了解如何使用分布式数据存储系统(如Hadoop HDFS或Cassandra)来存储和处理大量数据,这在分布式计算中十分关键。 7. **性能优化**:exSpeedUp项目可能会让学生探究如何通过并行化、数据...

Global site tag (gtag.js) - Google Analytics