接上文,我们创建表t1,列族c1,hbase.root目录为/new。当创建空表时,系统会自动生成一个空region,我们以这个region分配过程看下Region是如何在HMaster和Region server(以下简称rs)中创建的。大致过程如下:
1.HMaster指定分配计划,一个region只会分配给一个rs,多个rs均匀分配
2.多个rs并发执行assiagnment操作
3.先在zk的/hbase/assiangment目录下创建region节点,状态为‘offline’
4.RPC对应rs,请求分配region
5.master端开始等待所有region都被分配,通过zk的节点状态通信
6.rs端收到请求,执行异步OpenRegion操作
7.rs先把zk节点状态改为'opening'
8.rs执行open region操作,并初始化region,主要是创建region的HDFS目录,初始化Store
9.rs修改meta表中region对应的记录信息
10.rs修改zk节点中的状态为'opened'
11.master收到'opened'信息,认为该region已经assiagnment成功
12.所有region都成功后,master认为region批量创建成功
大概类图
在HMaster端提供了BulkAssigner,用来批量分配region,默认采用随即均匀分配,分配过程是一个rpc调用
public boolean bulkAssign(boolean sync) throws InterruptedException, IOException { boolean result = false; ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setDaemon(true); builder.setNameFormat(getThreadNamePrefix() + "-%1$d"); builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler()); int threadCount = getThreadCount(); java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(threadCount, builder.build()); try { //提交任务,任务为SingleServerBulkAssigner populatePool(pool); // How long to wait on empty regions-in-transition. If we timeout, the // RIT monitor should do fixup. //等待 if (sync) result = waitUntilDone(getTimeoutOnRIT()); } finally { // We're done with the pool. It'll exit when its done all in queue. pool.shutdown(); } return result; }
等待过程
boolean waitUntilNoRegionsInTransition(final long timeout, Set<HRegionInfo> regions) throws InterruptedException { // Blocks until there are no regions in transition. //如果带处理的region有一个还在事务列表中,则继续等 //超时时间由hbase.bulk.assignment.waiton.empty.rit设置,默认5分钟 long startTime = System.currentTimeMillis(); long remaining = timeout; boolean stillInTransition = true; synchronized (regionsInTransition) { while (regionsInTransition.size() > 0 && !this.master.isStopped() && remaining > 0 && stillInTransition) { int count = 0; for (RegionState rs : regionsInTransition.values()) { if (regions.contains(rs.getRegion())) { count++; break; } } if (count == 0) { stillInTransition = false; break; } regionsInTransition.wait(remaining); remaining = timeout - (System.currentTimeMillis() - startTime); } } return stillInTransition; }
AssignmentManager提供了assign(final ServerName destination,final List<HRegionInfo> regions)给每个rs批量assign region
void assign(final ServerName destination, final List<HRegionInfo> regions) { .... //强制初始化状态为offline List<RegionState> states = new ArrayList<RegionState>(regions.size()); synchronized (this.regionsInTransition) { for (HRegionInfo region: regions) { states.add(forceRegionStateToOffline(region)); } } ..... // Presumption is that only this thread will be updating the state at this // time; i.e. handlers on backend won't be trying to set it to OPEN, etc. //给每个带分配的region创建zk的节点,目录为/hbase/unassigned,并初始化状态为offline。 //节点创建成功后,在callback中调用zk的exist,设置watcher,在exist操作的callback中将region的状态设为‘PENDING_OPEN’,递增counter //所有region都需要设置成功 AtomicInteger counter = new AtomicInteger(0); CreateUnassignedAsyncCallback cb = new CreateUnassignedAsyncCallback(this.watcher, destination, counter); for (RegionState state: states) { if (!asyncSetOfflineInZooKeeper(state, destination, cb, state)) { return; } } // Wait until all unassigned nodes have been put up and watchers set. int total = regions.size(); for (int oldCounter = 0; true;) { int count = counter.get(); if (oldCounter != count) { LOG.info(destination.toString() + " unassigned znodes=" + count + " of total=" + total); oldCounter = count; } if (count == total) break; Threads.sleep(1); } // Move on to open regions. try { // Send OPEN RPC. If it fails on a IOE or RemoteException, the // TimeoutMonitor will pick up the pieces. //发送RPC请求给rs,如果rpc失败,可重试,最大超时时间60s long maxWaitTime = System.currentTimeMillis() + this.master.getConfiguration(). getLong("hbase.regionserver.rpc.startup.waittime", 60000); while (!this.master.isStopped()) { try { this.serverManager.sendRegionOpen(destination, regions); break; } catch (RemoteException e) { IOException decodedException = e.unwrapRemoteException(); if (decodedException instanceof RegionServerStoppedException) { LOG.warn("The region server was shut down, ", decodedException); // No need to retry, the region server is a goner. return; } else if (decodedException instanceof ServerNotRunningYetException) { // This is the one exception to retry. For all else we should just fail // the startup. long now = System.currentTimeMillis(); if (now > maxWaitTime) throw e; LOG.debug("Server is not yet up; waiting up to " + (maxWaitTime - now) + "ms", e); Thread.sleep(1000); } throw decodedException; } } } ....... }
rs的RPC接口HRegionInterface.openRegions(final List<HRegionInfo> regions),rs初始化region,并通过zk状态告知master是否成功,这是一个异步过程。
用户表open region为OpenRegionHandler,处理
public void process() throws IOException { try { ..... // If fails, just return. Someone stole the region from under us. // Calling transitionZookeeperOfflineToOpening initalizes this.version. //将/hbase/unassigned下的节点状态从‘offline’改成‘opening’ if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) { LOG.warn("Region was hijacked? It no longer exists, encodedName=" + encodedName); return; } // Open region. After a successful open, failures in subsequent // processing needs to do a close as part of cleanup. //执行open操作 region = openRegion(); if (region == null) { tryTransitionToFailedOpen(regionInfo); return; } boolean failed = true; //open成功后,先更新下zk中的节点时间,再修改meta表中的region记录 //主要是修改meta表中的serverstartcode和server列 if (tickleOpening("post_region_open")) { if (updateMeta(region)) { failed = false; } } //如果修改失败,或者进入stop阶段,关闭region,将zk节点状态设为‘FAILED_OPEN’ if (failed || this.server.isStopped() || this.rsServices.isStopping()) { cleanupFailedOpen(region); tryTransitionToFailedOpen(regionInfo); return; } //将zk节点状态设为‘OPENED’,如果失败,关闭region if (!transitionToOpened(region)) { // If we fail to transition to opened, it's because of one of two cases: // (a) we lost our ZK lease // OR (b) someone else opened the region before us // In either case, we don't need to transition to FAILED_OPEN state. // In case (a), the Master will process us as a dead server. In case // (b) the region is already being handled elsewhere anyway. cleanupFailedOpen(region); return; } // Successful region open, and add it to OnlineRegions //添加到online列表 this.rsServices.addToOnlineRegions(region); ..... }
Region初始化
private long initializeRegionInternals(final CancelableProgressable reporter, MonitoredTask status) throws IOException, UnsupportedEncodingException { ..... // Write HRI to a file in case we need to recover .META. status.setStatus("Writing region info on filesystem"); //写入.regioninfo文件,内容是HRegionInfo序列化的内容,region的元信息 checkRegioninfoOnFilesystem(); // Remove temporary data left over from old regions status.setStatus("Cleaning up temporary data from old regions"); //.tmp目录删除 cleanupTmpDir(); // Load in all the HStores. // // Context: During replay we want to ensure that we do not lose any data. So, we // have to be conservative in how we replay logs. For each store, we calculate // the maxSeqId up to which the store was flushed. And, skip the edits which // is equal to or lower than maxSeqId for each store. //每个family启动一个线程加载store //等全部store都加载后,取最大的seqId和memstoreTS Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>( Bytes.BYTES_COMPARATOR); long maxSeqId = -1; // initialized to -1 so that we pick up MemstoreTS from column families long maxMemstoreTS = -1; if (this.htableDescriptor != null && !htableDescriptor.getFamilies().isEmpty()) { // initialize the thread pool for opening stores in parallel. ThreadPoolExecutor storeOpenerThreadPool = getStoreOpenAndCloseThreadPool( "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString()); CompletionService<Store> completionService = new ExecutorCompletionService<Store>(storeOpenerThreadPool); // initialize each store in parallel for (final HColumnDescriptor family : htableDescriptor.getFamilies()) { status.setStatus("Instantiating store for column family " + family); completionService.submit(new Callable<Store>() { public Store call() throws IOException { return instantiateHStore(tableDir, family); } }); } try { for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) { Future<Store> future = completionService.take(); Store store = future.get(); this.stores.put(store.getColumnFamilyName().getBytes(), store); long storeSeqId = store.getMaxSequenceId(); maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), storeSeqId); if (maxSeqId == -1 || storeSeqId > maxSeqId) { maxSeqId = storeSeqId; } long maxStoreMemstoreTS = store.getMaxMemstoreTS(); if (maxStoreMemstoreTS > maxMemstoreTS) { maxMemstoreTS = maxStoreMemstoreTS; } } ...... } mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.regiondir, maxSeqIdInStores, reporter, status)); ....... this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); // Use maximum of log sequenceid or that which was found in stores // (particularly if no recovered edits, seqid will be -1). //递增seqid long nextSeqid = maxSeqId + 1; ...... return nextSeqid; }
rs端的处理就是这些,master端通过zk的watcher监听rs端的region状态修改,AssignmentManager的nodeDataChanged方法就是用来处理这个的。
public void nodeDataChanged(String path) { if(path.startsWith(watcher.assignmentZNode)) { try { Stat stat = new Stat(); //当data变化时,获取data,然后再设置watcher,下次继续处理 RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, path, stat); if (data == null) { return; } handleRegion(data, stat.getVersion()); } catch (KeeperException e) { master.abort("Unexpected ZK exception reading unassigned node data", e); } } }
当rs把region状态设为opening时
case RS_ZK_REGION_OPENING: ..... // Transition to OPENING (or update stamp if already OPENING) //更新时间 regionState.update(RegionState.State.OPENING, data.getStamp(), data.getOrigin()); break;
当rs把region状态设为‘opened‘时
case RS_ZK_REGION_OPENED: ...... // Handle OPENED by removing from transition and deleted zk node //内存状态改为open regionState.update(RegionState.State.OPEN, data.getStamp(), data.getOrigin()); this.executorService.submit( new OpenedRegionHandler(master, this, regionState.getRegion(), data.getOrigin(), expectedVersion)); break;
OpenedRegionHandler主要是删除之前创建的/hbase/unassigned下的region节点
public void process() { // Code to defend against case where we get SPLIT before region open // processing completes; temporary till we make SPLITs go via zk -- 0.92. RegionState regionState = this.assignmentManager.isRegionInTransition(regionInfo); boolean openedNodeDeleted = false; if (regionState != null && regionState.getState().equals(RegionState.State.OPEN)) { openedNodeDeleted = deleteOpenedNode(expectedVersion); if (!openedNodeDeleted) { LOG.error("The znode of region " + regionInfo.getRegionNameAsString() + " could not be deleted."); } } ...... }
节点删除后,又有zk通知,AssignmentManager的nodeDeleted方法
public void nodeDeleted(final String path) { if (path.startsWith(this.watcher.assignmentZNode)) { String regionName = ZKAssign.getRegionName(this.master.getZooKeeper(), path); RegionState rs = this.regionsInTransition.get(regionName); if (rs != null) { HRegionInfo regionInfo = rs.getRegion(); if (rs.isSplit()) { LOG.debug("Ephemeral node deleted, regionserver crashed?, " + "clearing from RIT; rs=" + rs); regionOffline(rs.getRegion()); } else { LOG.debug("The znode of region " + regionInfo.getRegionNameAsString() + " has been deleted."); if (rs.isOpened()) { makeRegionOnline(rs, regionInfo); } } } } }
region上线,将region从transition列表中删除,并更新servers和regions列表
void regionOnline(HRegionInfo regionInfo, ServerName sn) { synchronized (this.regionsInTransition) { RegionState rs = this.regionsInTransition.remove(regionInfo.getEncodedName()); if (rs != null) { this.regionsInTransition.notifyAll(); } } synchronized (this.regions) { // Add check ServerName oldSn = this.regions.get(regionInfo); if (oldSn != null && serverManager.isServerOnline(oldSn)) { LOG.warn("Overwriting " + regionInfo.getEncodedName() + " on old:" + oldSn + " with new:" + sn); // remove region from old server Set<HRegionInfo> hris = servers.get(oldSn); if (hris != null) { hris.remove(regionInfo); } } if (isServerOnline(sn)) { this.regions.put(regionInfo, sn); addToServers(sn, regionInfo); this.regions.notifyAll(); } else { LOG.info("The server is not in online servers, ServerName=" + sn.getServerName() + ", region=" + regionInfo.getEncodedName()); } } // Remove plan if one. clearRegionPlan(regionInfo); // Add the server to serversInUpdatingTimer addToServersInUpdatingTimer(sn); }
小节
region assignment主要关键点
1.region load balance,默认是随即均匀分配
2.master在/hbase/unassigned下建立region节点,方便后续和rs交互
3.rs初始化region在HDFS上的文件目录,包括.regioninfo文件和family目录
4.rs open region之后,将状态设为’opened‘,master认为region assignment成功,删除节点,并将region保存到online列表
相关推荐
【HBASERegion数量增多问题描述及解决方案】 在HBase分布式数据库中,Region是表数据的基本存储单元,它将表的数据按照ROWKEY的范围进行分割。随着数据的增长,一个Region会分裂成两个,以此来确保数据的均衡分布。...
1、region 拆分机制 ...当region大小大于某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。 但是在生产线上这种切分策略却有相当大的弊端:切分策略对于大表和小表没有
hbase-region-inspector, HBase区域统计信息的可视化仪表板 hbase-region-inspectorHBase区域统计信息的可视化仪表板。 用法下载与HBase集群版本匹配的可执行二进制插件,添加execute权限,并使用以下命令行参数启动...
hbase-packet-inspector hbase-packet-inspector (HPI)是用于分析HBase RegionServers网络流量的命令行工具。 HPI读取tcpdump文件或捕获网络接口的实时数据包流,以提取有关客户端请求和响应的信息。 您可以对其...
在HBase中,Region自动切分是其可扩展性的重要机制,它确保了系统的水平扩展性和数据分布的均匀性。Region切分的关键在于如何高效、平衡地管理数据,避免单个Region过大导致性能下降,同时也要防止过度切分造成资源...
在HBase这个分布式列式数据库中,Region是其核心的数据存储和管理单元,它负责存储表中的行数据。随着数据量的增长,一个Region可能会变得过大,导致读写性能下降。这时,就需要对Region进行数据切割(Split),以...
在IT行业中,尤其是在大数据处理领域,HBase是一个广泛使用的分布式、高性能、列式存储的NoSQL数据库。HBase是建立在Hadoop文件系统(HDFS)之上,为处理大规模数据提供了一个高效的数据存储解决方案。而Spring Data...
1. Region服务器:存储HBase表的分区,负责处理表的读写请求。 2. Master节点:管理Region服务器,处理表和Region的分配,监控服务器健康状态,进行Region分裂和合并操作。 3. ZooKeeper:协调HBase集群,提供服务...
HBASE的主要原理解读:包括HBase 读写逻辑、HBase region拆分和合并
HBase 元数据修复工具包。 ①修改 jar 包中的application.properties,重点是 zookeeper.address、zookeeper.nodeParent、hdfs.root.dir配置项,hdfs 最好写 ip; ②将core-site.xml、hdfs-site.xml添加到BOOT-INF/...
HBase中的Region分割(Region Split)是一个关键特性,它允许HBase在表数据量增大时,自动将一个Region分割成两个,从而保证每个Region的大小都保持在一个合理的范围。这是实现HBase高扩展性和高性能的关键机制之一...
分区,或者说Region,是HBase存储数据的基本单位。每个Region包含一个或多个表的行键范围,确保数据的分散存储,从而提高查询效率。当我们谈论"Hbase分区merge和split操作"时,我们指的是管理员或开发人员对Region...
HBase,全称为Hadoop Distributed File System上的基础结构(HBase on Hadoop Distributed File System),是一种...理解HBase的Region分裂和合并机制、RegionServer负载均衡以及故障处理策略也是深入使用HBase的关键。
7. **Region Server**:Region Server是HBase的主要工作节点,负责Region的存储和管理,包括读写操作。 8. **Zookeeper**:Zookeeper是HBase的重要组件,用于协调集群中的节点,如Region Server的位置信息。 ### ...
在HBase的分布式架构中,`org.apache.hadoop.hbase.client.HConnectionManager`负责管理客户端与HBase服务器之间的连接,而`org.apache.hadoop.hbase.regionserver.HRegionServer`是处理Region服务的主要组件,它...
HBase是Apache Hadoop生态系统中的一个分布式、版本化、列族式存储系统,设计用于处理大规模数据集。这个“hbase-2.4.17-bin”安装包提供了HBase的最新稳定版本2.4.17,适用于大数据处理和分析场景。下面将详细介绍...
- **自动分片**(Auto-Sharding):HBase通过自动将表分割成多个区域(Region),每个区域可以被分配到不同的节点上,从而实现水平扩展。 - **存储API**(Storage API):HBase提供了一套用于数据存储和访问的API...
2. **Region服务器通信**:开发包内部实现了与HBase Region服务器的通信协议,使得Java应用可以透明地与分布在全球的数据进行交互。 3. **数据模型**:HBase的数据模型基于行、列族、列和时间戳。Java-HBase开发包...