HMaster的整体结构
一个master包含如下部分:
1.对外的接口
RPC服务
jetty web服务
Master MBean
其中RPC服务包括了若干listener,reader,以及handler线程(IPC Handler和 用于replication的IPC Handler)
2.执行服务
都是一些线程池,当有任务出现时就就会交给这些类来处理
这些线程有
MASTER_SERVER_OPERATIONS
MASTER_META_SERVER_OPERATIONS
MASTER_CLOSE_REGION
MASTER_OPEN_REGION
MASTER_TABLE_OPERATIONS
相关的hanlder有:
OpenRegionHandler
ClosedRegionHandler
ServerShutdownHandler
MetaServerShutdownHandler
DeleteTableHandler
DisableTableHandler
EnableTableHandler
ModifyTableHandler
CreateTableHandler
3.和zookeeper相关的线程
1.ActiveMasterManager 会在ZK中创建/hbase/master短暂节点,master将其信息记录到这个节点下 如果是备份的master会在这里阻塞,直到这个节点为空 2.RegionServerTracker 用于监控region server,通过监控ZK的/hbase/rs节点,获取region server的状态 当region server上线或者下线,ZK都会触发通知事件 3.DrainingServerTracker 没太明白,貌似是处理RS增加和删除事件用的 4.CatalogTracker 用来监控META表和ROOT表 5.ClusterStatusTracker 用于监控ZK的/shutdown节点,监控是否有机器宕机了 6.AssignmentManager 用于管理和分配region的 7.RootRegionTracker 用于管理和监控/root-region-server 节点的 8.LoadBalancer 用于平衡各个regoin server上的region 9.MetaNodeTracker 监控/unassigned 节点,分配那些未在META表中存在的region 此外在 org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher类中还负责管理一些ZK节点 baseZNode /hbase assignmentZNode /unassigned rsZNode /rs drainingZNode /draining masterTableZNode /table masterTableZNode92 /table92 (用于hbase0.92版本) splitLogZNode /splitlog backupMasterAddressesZNode /backup-masters clusterStateZNode /shutdown masterAddressZNode /master clusterIdZNode /hbaseid
ZK监听相关的类图
4.文件接口和其他
MasterFileSystem
用于创建META表和ROOT表,.oldlog目录,hbase.version文件等
LogCleaner
用于定期的清理.oldlog目录中的内容
HFileCleaner
用于定期清理归档目录下的内容
其他包括后台线程如LogCleaner和HFileCleaner等
ServerManager 维护一个在线和下线的RS列表
Balancer 用于执行region均衡的后台线程
HMaster的相关配置
参数名称 | 默认值 | 含义 |
hbase.master.handler.count | 25 | 工作线程大小 |
hbase.master.buffer.for.rs.fatals | 1M | |
mapred.task.id | ||
hbase.master.wait.for.log.splitting | false | |
zookeeper.session.timeout | 180秒 | |
hbase.master.backup | ||
hbase.master.impl | ||
hbase.master.event.waiting.time | 1000 |
HMaster的启动入口类
org.apache.hadoop.hbase.master.HMaster
hbase-site.xml中可以配置参数 hbase.master.impl来自定自己的实现,但必须继承HMaster
之后调用HMasterCommandLine (这个类继承自ServerCommandLine)
HMasterCommandLine使用hadoop提供的ToolRunner去运行
ToolRunner#run(Configuration,Tool,String[])
ToolRunner会调用GenericOptionsParser,解析一些固定的参数,如-conf,-D,-fs,-files 这样的参数
解析好之后,配置configuration对象,然后将启动参数传给Tool接口的实现
所以ToolRunner 就是一个启动参数解析,配置configuration对象的工具类,然后将这些信息交给Tool实现类
调用顺序是
1.HMaster#main()
2.HMasterCommandLine#doMain()
3.ToolRunner#run()
4.HMasterCommandLine#run()
5.HMasterCommandLine#startMaster()
6.HMaster#constructMaster()
7.反射调用HMaster的构造函数
初始化-调用HRgionServer构造函数
1.配置host,NDS相关
2.配置RPC连接,创建RPC连接
3.初始化ZK认证
4.创建ZooKeeperWatcher(和ZK相关的线程),RPC服务,metrics
5.创建HealthCheckChore
6.配置splitlog相关
启动,HMaster#run (在新线程中启动)
//将当前的master变成active状态(如果是备份master则一直等待) //完成初始化 HMaster#run() { becomeActiveMaster(startupStatus); finishInitialization(startupStatus, false); } //如果当前的master不是活跃的则一直等待 HMaster#becomeActiveMaster() { this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,this); this.zooKeeper.registerListener(activeMasterManager); while (!amm.isActiveMaster()) { Thread.sleep(c.getInt("zookeeper.session.timeout", 180 * 1000)); } this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); this.clusterStatusTracker.start(); return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus,this.clusterStatusTracker); } //初始化master组件,文件系统,ServerManager //AssignmentManager,RegionServerTracker,CatalogTracker等 //设置Zookeeper的集群状态 //等待RegionServer的检查完毕 //如果.log目录下有文件,则执行split log任务 //分配ROOT和META的region //处理可以运行的RegionServer和宕机的RegionServer HMaster#finishInitialization() { //检查ROOT和META表是否存在,不存在则创建,还会创建tmp目录,oldlog目录 fileSystemManager = new MasterFileSystem(); tableDescriptors = new FSTableDescriptors(fileSystemManager.getFileSystem(),fileSystemManager.getRootDir()); //创建CatalogTracker,LoadBalancer,AssignmentManager //RegionServerTracker,DrainingServerTracker //ClusterStatusTracker,SnapshotManager initializeZKBasedSystemTrackers(); //开启service线程,如openregion线程,closeregion线程,serveroptions线程等 //再开启jetty服务和RPC服务 startServiceThreads(); //将所有的RegionServer加入到ServerManager中,ServerManager负责管理 //所有在线宕机的server,并负责启动和关闭 for (ServerName sn: regionServerTracker.getOnlineServers()) { ServerManager.recordNewServer(sn, HServerLoad.EMPTY_HSERVERLOAD); } //如果有log日志则进行预处理然后挂到ZK上,再由所有RS处理 if (waitingOnLogSplitting) { fileSystemManager.splitAllLogs(servers); } //如果ROOT表和META为分配则先分配 assignRoot(); assignMeta(); enableServerShutdownHandler(); //处理所有宕机的server for (ServerName curServer : failedServers) { serverManager.expireServer(curServer); } DefaultLoadBalancer.setMasterServices(); startCatalogJanitorChore(); registerMBean(); } HMaster#assignRoot() { //先看一下分区正在转换状态当中, //如果处于转换状态当中则先处理相关的状态,并等待体处理结束后再往下进行 processRegionInTransitionAndBlockUntilAssigned(); verifyRootRegionLocation(); getRootLocation(); expireIfOnline(); //先删掉"/hbase/root-region-server",不管它存不存在 //KeeperException.NoNodeException被忽略了 //写入EventType.M_ZK_REGION_OFFLINE、当前时间戳、跟分区名(-ROOT-,,0) //master的版本化ServerName //到/hbase/unassigned/70236052, payload为null,所以不写入 }
HMaster#run的时序图如下
HMaster包含的一些变量
InfoServer
ZooKeeperWatcher
ActiveMasterManager
RegionServerTracker
DrainingServerTracker
RPCServer
MasterMetrics
MasterFileSystem
ServerManager
AssignmentManager
CatalogTracker
ClusterStatusTracker
CatalogJanitor
LogCleaner
HFileCleaner
TableDescriptors
SnapshotManager
HealthCheckChore
HMaster的线程
RPC相关的的listener线程,reader线程,handler线程
Daemon Thread [IPC Server listener on 60000] (Suspended)
Daemon Thread [IPC Reader 3 on port 60000] (Suspended)
Daemon Thread [IPC Server handler 0 on 60000] (Suspended)
Daemon Thread [REPL IPC Server handler 2 on 60000] (Running)
Daemon Thread [IPC Server Responder] (Running)
ZK相关线程
Daemon Thread [main-EventThread] (Suspended)
Daemon Thread [main-SendThread(myhost:2181)] (Suspended)
后台线程
Daemon Thread [myhost,60000,1427458363875-BalancerChore] (Running)
Daemon Thread [myhost,60000,1427458363875-CatalogJanitor] (Running)
Daemon Thread [master-myhost,60000,1427458363875.archivedHFileCleaner] (Running)
Daemon Thread [master-myhost,60000,1427458363875.oldLogCleaner] (Running)
Daemon Thread [myhost,60000,1427458363875.splitLogManagerTimeoutMonitor] (Running)
Daemon Thread [myhost,60000,1427458363875.timerUpdater] (Running)
监控线程
Daemon Thread [Timer thread for monitoring hbase] (Running)
Daemon Thread [Timer thread for monitoring jvm] (Running)
Daemon Thread [Timer thread for monitoring rpc] (Running)
Daemon Thread [myhost,60000,1427458363875.timeoutMonitor] (Running)
jetty相关线程
Thread [1008881877@qtp-314160763-0] (Running)
timeoutMonitor(用于分配region)线程执行原理(AssignmentManager$TimeoutMonitor)
执行逻辑如下:
//在独立的线程中运行 //从Chore#run()函数调到这里的 AssignmentManager$TimeoutMonitor#chore() { for (RegionState regionState : regionsInTransition.values()) { if (regionState.getStamp() + timeout <= now) { //decide on action upon timeout actOnTimeOut(regionState); } else if (this.allRegionServersOffline && !allRSsOffline) { RegionPlan existingPlan = regionPlans.get(regionState.getRegion().getEncodedName()); if (existingPlan == null || !this.serverManager.isServerOnline(existingPlan.getDestination())) { actOnTimeOut(regionState); } } } } //判断当前region的状态,如果下线了则分配 AssignmentManager$TimeoutMonitor#actOnTimeOut() { HRegionInfo regionInfo = regionState.getRegion(); switch (regionState.getState()) { case CLOSED: regionState.updateTimestampToNow(); break; case OFFLINE: invokeAssign(regionInfo); break; case PENDING_OPEN: invokeAssign(regionInfo); break; case OPENING: processOpeningState(regionInfo); break; case OPEN: regionState.updateTimestampToNow(); break; case PENDING_CLOSE: invokeUnassign(regionInfo); break; case CLOSING: invokeUnassign(regionInfo); break; } //通过AssignCallable#call()调用 //分配region,先修改ZK的znode信息 //然后调用sendRegionOpen(),这里会触发HRegionServer#openRegion()函数 //最后创建OpenRegionHandler放到线程池中执行, //再调用HRegion#openRegion()函数 AssignmentManager#assign() { for (int i = 0; i < this.maximumAssignmentAttempts; i++) { String tableName = region.getTableNameAsString(); if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) { setEnabledTable(region); } RegionOpeningState regionOpenState = ServerManager.sendRegionOpen(); if (regionOpenState == RegionOpeningState.OPENED) { return; } else if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName); } } } //处理未分配的region,将其关闭 AssignmentManager#unassign() { state = regionsInTransition.get(encodedName); if (state == null) { ZKAssign.createNodeClosing(master.getZooKeeper(), region, master.getServerName()); } else if (force && (state.isPendingClose() || state.isClosing())) { state.update(state.getState()); } else { return; } ServerName server = regions.get(region); if (server == null) { deleteClosingOrClosedNode(region); } ServerManager.sendRegionClose(); }
CatalogJanitor线程(CatalogJanitor)
这个线程用于扫描split后残留的部分,比如split之后父region的META信息可以删除了
同样split之后,info:splitA和info:splitB这两个META表中的信息也可以删除了
主要逻辑如下:
//在独立的线程中运行 //从Chore#run()函数调到这里的 CatalogJanitor#scan() { Pair<Integer, Map<HRegionInfo, Result>> pair = getSplitParents(); Map<HRegionInfo, Result> splitParents = pair.getSecond(); int cleaned = 0; for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) { if (!parentNotCleaned.contains(e.getKey().getEncodedName())) { cleanParent(e.getKey(), e.getValue()); cleaned++; } else { //info:splitA 和 info:splitB 列 parentNotCleaned.add(getDaughterRegionInfo("splitA"); parentNotCleaned.add(getDaughterRegionInfo("splitB"); } } } //如果分割之后的splitA和splitB两个新region不再引用 //父region,则将父region删除 //最后创建Delete对象删除父对象,再将其从META表中删除 CatalogJanitor#cleanParent() { HRegionInfo a_region = getDaughterRegionInfo(rowContent, "splitA"); HRegionInfo b_region = getDaughterRegionInfo(rowContent, "splitB"); Pair<Boolean, Boolean> a = checkDaughterInFs(parent, a_region, "splitA"); Pair<Boolean, Boolean> b = checkDaughterInFs(parent, b_region, "splitB"); removeDaughtersFromParent(parent); FileSystem fs = this.services.getMasterFileSystem().getFileSystem(); HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, parent); Delete delete = new Delete(regionInfo.getRegionName()); deleteFromMetaTable(catalogTracker, delete); } //检查splitA和splitB两个新region是否还引用父region CatalogJanitor#checkDaughterInFs() { FileSystem fs = this.services.getMasterFileSystem().getFileSystem(); Path rootdir = this.services.getMasterFileSystem().getRootDir(); Path tabledir = new Path(rootdir, split.getTableNameAsString()); Path regiondir = new Path(tabledir, split.getEncodedName()); exists = fs.exists(regiondir); HTableDescriptor parentDescriptor = getTableDescriptor(parent.getTableName()); for (HColumnDescriptor family: parentDescriptor.getFamilies()) { Path p = Store.getStoreHomedir(tabledir, split.getEncodedName(),family.getName()); if (!fs.exists(p)) { continue; } // Look for reference files. Call listStatus with anonymous instance of PathFilter. FileStatus [] ps = FSUtils.listStatus(fs, p, new PathFilter () { public boolean accept(Path path) { return StoreFile.isReference(path); } }); } } //创建Delete对象,将META表中的splitA和splitB //这些在split时候创建的已经无用的列删除 CatalogJanitor#removeDaughtersFromParent() [ Delete delete = new Delete(parent.getRegionName()); delete.deleteColumns("info","splitA"); delete.deleteColumns("info","splitB"); deleteFromMetaTable(catalogTracker, delete); }
BalancerChore线程(HMaster#balance)
这个类负责执行balance过程,具体逻辑如下:
//在单独线程中执行,通过HMaster$2#run()调用到这里的 //收集所有的region然后执行balance() //具体细节没看明白 HMaster#balance() { Map<String, Map<ServerName, List<HRegionInfo>>> assignmentsByTable = this.assignmentManager.getAssignmentsByTable(); List<RegionPlan> plans = new ArrayList<RegionPlan>(); for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) { List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments); if (partialPlans != null) { plans.addAll(partialPlans); } } for (RegionPlan plan: plans) { AssignmentManager.balance(plan); } } //执行balance过程,将待执行的region放到map中 //最后执行unassign()函数没看懂 AssignmentManager#balance() { synchronized (this.regionPlans) { this.regionPlans.put(plan.getRegionName(), plan); } unassign(plan.getRegionInfo()); }
archivedHFileCleaner线程(HFileCleaner#chore)
这个类用于删除archive目录下的归档文件,具体逻辑如下:
//这里是调用父类CleanerChore#chore()函数 //用来清理.archive目录下的归档文件 HFileCleaner#chore() { FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir, null); for (FileStatus file : files) { if (file.isDir()) { checkAndDeleteDirectory(file.getPath()); } else { checkAndDelete(file.getPath()); } } } //检查并删除目录 CleanerChore#checkAndDeleteDirectory() { FileStatus[] children = FSUtils.listStatus(fs, toCheck, null); HBaseFileSystem.deleteFileFromFileSystem(fs, toCheck); } //检查并删除文件 CleanerChore#checkAndDelete() { HBaseFileSystem.deleteDirFromFileSystem(fs, filePath); }
oldLogCleaner线程(LogCleaner)
这个类用于oldlog目录下文件
具体执行逻辑和archivedHFileCleaner线程一样
都是调用父类CleanerChore#chore()函数去执行的
timerUpdater线程(AssignmentManager$TimerUpdater#chore)
这个类用于更新region的时间戳,这些region都是出于事务中的region
主要逻辑如下:
//在单独线程中执行,通过Chore#run()调用到这里的 AssignmentManager$TimerUpdater#chore() { while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) { if (serverToUpdateTimer == null) { serverToUpdateTimer = serversInUpdatingTimer.first(); } else { serverToUpdateTimer = serversInUpdatingTimer.higher(serverToUpdateTimer); } updateTimers(serverToUpdateTimer); } } //更新处于事务中的region的时间戳 //这里会迭代所有机器,然后更新每个机器上的region AssignmentManager#updateTimers() { for (Map.Entry<String, RegionPlan> e: copy.entrySet()) { rs = this.regionsInTransition.get(e.getKey()); rs.updateTimestampToNow(); } }
splitLogManagerTimeoutMonitor线程(SplitLogManager$TimeoutMonitor#chore)
这个类用于周期性的检查是否有执行超时的任务(获取ZK的split节点的任务,然后执行切分日志工作),如果有则
需要重新提交这个任务,如果出现region下线,server宕机等情况也需要重新提交,最后删除失败的任务
具体逻辑如下:
//在单独线程中执行,通过Chore#run()调用到这里的 //周期性的检查是否有处理splitlog超时的region,或者 //出现某些region下线了,这时候需要重新提交splitlog //最后将失败的任务删除掉 SplitLogManager$TimeoutMonitor#chore() { for (Map.Entry<String, Task> e : tasks.entrySet()) { if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { if (resubmit(path, task, FORCE)) { resubmitted++; } else { //将死掉的工作regoin server放入列表中 handleDeadWorker(cur_worker); } } else if (resubmit(path, task, CHECK)) { resubmitted++; } } for (Map.Entry<String, Task> e : tasks.entrySet()) { String path = e.getKey(); Task task = e.getValue(); if (task.isUnassigned() && (task.status != FAILURE)) { // We just touch the znode to make sure its still there tryGetDataSetWatch(path); } } createRescanNode(Long.MAX_VALUE); // Retry previously failed deletes if (failedDeletions.size() > 0) { for (String tmpPath : tmpPaths) { // deleteNode is an async call deleteNode(tmpPath, zkretries); } } } //异步删除节点 SplitLogManager#deleteNode() { ZooKeeper.delete(path, -1, new DeleteAsyncCallback(),retries); }
参考
相关推荐
4. 分布式架构:HBase能够自动分区和复制数据,以实现水平扩展和高可用性。 5. 弹性扩展:根据数据量的增长,可以通过增加节点来扩展集群。 6. MapReduce支持:HBase与Hadoop生态系统紧密集成,支持MapReduce进行...
HBase的架构基于Hadoop的Master/Slave模型,由一个HMaster节点和多个RegionServer节点组成。HMaster负责元数据管理、 Region分配和故障恢复,而RegionServer负责实际的数据存储和服务。 4. **列族和表** HBase中...
9. **数据读写流程**:HBase使用Master-Slave架构,客户端通过HMaster找到数据所在的RegionServer,然后直接与RegionServer交互。读写操作是原子性的,保证数据一致性。 10. **监控与调优**:HBase提供了丰富的监控...
本文将以hbase-0.98.7-hadoop2-bin.tar这个压缩包为例,深入探讨HBase的核心概念、架构以及如何进行安装与配置。 首先,让我们来看看HBase的版本号。"0.98.7-hadoop2"表明这是HBase的0.98系列的一个版本,该系列是...
3. **分布式架构**:HBase的数据分布在多台机器上,每台机器都是一个RegionServer,负责一部分数据的存储和检索。RegionServer之间的数据分布由一个单独的HMaster进程管理。 4. **行键(Row Key)**:HBase中的数据...
1. **分布式架构**:HBase采用Master-Slave架构,由一个HMaster主节点负责区域服务器(RegionServer)的管理、故障检测和负载均衡,多个RegionServer存储实际的数据,提供读写服务。 2. **表和列族**:HBase的表是...
本文将围绕"Hbase-1.2.6-bin+src.tar.rar"这一资源,深入探讨HBase的核心概念、架构、功能以及源码解析。 一、HBase概述 1.1 核心概念 - 表(Table):HBase中的数据组织方式,由多个行组成。 - 行(Row):通过...
5. 高可用性:通过HMaster和RegionServer的架构,HBase提供了故障切换和数据复制功能,确保服务的连续性和数据的可靠性。 6. 数据版本:HBase支持多版本数据,可以保存一定数量的历史版本,方便审计和回溯。 在...
在HBase的分布式架构中,`org.apache.hadoop.hbase.client.HConnectionManager`负责管理客户端与HBase服务器之间的连接,而`org.apache.hadoop.hbase.regionserver.HRegionServer`是处理Region服务的主要组件,它...
3. **HBase架构**: - Master节点:管理RegionServer分配,处理元数据变更,监控RegionServer状态。 - RegionServer:实际存储数据,处理客户端请求,负责Region的分裂和合并。 - Zookeeper:协调集群,存储元...
1. **HBase架构**:HBase采用Master-Slave架构,由HMaster和HRegionServer两部分组成。HMaster负责元数据管理、负载均衡、 Region迁移等全局任务,而HRegionServer则实际存储和处理数据,是数据处理的主要角色。 2....
1. HMaster:管理Region Server,处理Region分配和故障恢复。 2. HRegionServer:实际存储和处理数据的节点。 3. HLog:日志系统,用于数据持久化和故障恢复。 4. HFile:HBase的数据存储格式,优化了列存取。 5. ...
2. **分布式架构**:HBase通过RegionServer进行分布式存储和处理。每个RegionServer负责一部分行键范围内的数据,随着数据增长,Region会自动分裂以保持性能。ZooKeeper被用来协调集群中的各种操作,如选举主节点,...
【HBase优化-系统架构】主要探讨了针对HBase性能的优化策略,特别是从系统架构层面进行调优的方法。HBase作为一个分布式、列族式的NoSQL数据库,性能优化至关重要,尤其是对于大数据处理场景。 首先,关注到配置...
在HBase的架构中,Client是用户与系统交互的接口,它通过远程过程调用(RPC)机制与HMaster和HRegionServer通信。对于数据读写操作,Client直接与HRegionServer交互,而对于表管理和元数据操作,Client则与HMaster...
在理解HBase的体系架构之前,首先需要了解Hadoop的基础知识,因为HBase是构建在Hadoop之上的。 Hadoop是大数据处理的一个核心框架,由HDFS(Hadoop Distributed File System)和MapReduce两大部分组成。HDFS提供了...
为了搭建一个完全分布式的 HBase 集群,我们需要了解 HBase 的架构、组件和配置。 一、HBase 架构 HBase 的架构主要由以下几个组件组成: * HMaster:负责管理 HBase 集群的节点,处理客户端的请求,并将数据分配...
总之,HBase的架构和组件设计体现了它作为一个分布式NoSQL数据库的优势和特点,通过合理的数据划分、负载均衡和故障转移机制,保证了数据存储的高可靠性和系统的高性能。HBase特别适用于处理大量数据的实时读写操作...
### HBase体系架构 HBase是建立在Hadoop之上的分布式、可扩展的列族数据库。它能够处理PB级别的大规模数据,并提供实时读写能力。HBase的核心设计思想源自Google的Bigtable论文。 #### HBase架构组件 1. **...
在架构上,HBase采用了Master-Slave结构,主要由HMaster和HRegionServer两部分组成: 1. HMaster:负责全局协调,包括 Region 分配、Region 服务器的监控与故障恢复、元数据的更新等。HMaster并不参与数据存储和...