源码为0.98.1
HRegionServer中起线程MemStoreFlusher
private void initializeThreads() throws IOException { // Cache flushing thread. this.cacheFlusher = new MemStoreFlusher(conf, this); // Compaction thread this.compactSplitThread = new CompactSplitThread(this); .......
private void startServiceThreads() throws IOException { String n = Thread.currentThread().getName(); ...... this.cacheFlusher.start(uncaughtExceptionHandler); Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker", uncaughtExceptionHandler); .....
/* * Run init. Sets up hlog and starts up all server threads. * * @param c Extra configuration. */ protected void handleReportForDutyResponse(final RegionServerStartupResponse c) throws IOException { .... startServiceThreads(); .....
public void run() { try { // Do pre-registration initializations; zookeeper, lease threads, etc. preRegistrationInitialization(); } catch (Throwable e) { abort("Fatal exception during initialization", e); } try { // Try and register with the Master; tell it we are here. Break if // server is stopped or the clusterup flag is down or hdfs went wacky. while (keepLooping()) { RegionServerStartupResponse w = reportForDuty(); if (w == null) { LOG.warn("reportForDuty failed; sleeping and then retrying."); this.sleeper.sleep(); } else { handleReportForDutyResponse(w);//启动所有hregionserver线程服务 break; } } ....
主要的类,方法:memStoreFlusher的flushRegion
private boolean flushRegion(final HRegion region, final boolean emergencyFlush) { synchronized (this.regionsInQueue) { FlushRegionEntry fqe = this.regionsInQueue.remove(region); if (fqe != null && emergencyFlush) { // Need to remove from region from delay queue. When NOT an // emergencyFlush, then item was removed via a flushQueue.poll. flushQueue.remove(fqe); } } lock.readLock().lock(); try { boolean shouldCompact = region.flushcache(); // We just want to check the size boolean shouldSplit = region.checkSplit() != null; if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { server.compactSplitThread.requestSystemCompaction( region, Thread.currentThread().getName()); } ......
从flushQueue中取出FlushRegionEntry进行flush
获取读锁
- 调用HRegion进行flush,并返回是否需要compact
- 调用HRegion查看是否需要split
- if(split) spliting elif(compact) compacting
以下是具体操作:
--------------------------------------------------------------------------------------------------------------------
1.HRegion
protected boolean internalFlushcache( final HLog wal, final long myseqid, MonitoredTask status) throws IOException { if (this.rsServices != null && this.rsServices.isAborted()) { // Don't flush when server aborting, it's unsafe throw new IOException("Aborting flush because server is abortted..."); } final long startTime = EnvironmentEdgeManager.currentTimeMillis(); // Clear flush flag. // If nothing to flush, return and avoid logging start/stop flush. if (this.memstoreSize.get() <= 0) { if(LOG.isDebugEnabled()) { LOG.debug("Empty memstore size for the current region "+this); } return false; } if (LOG.isDebugEnabled()) { LOG.debug("Started memstore flush for " + this + ", current region memstore size " + StringUtils.humanReadableInt(this.memstoreSize.get()) + ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid)); } // Stop updates while we snapshot the memstore of all stores. We only have // to do this for a moment. Its quick. The subsequent sequence id that // goes into the HLog after we've flushed all these snapshots also goes // into the info file that sits beside the flushed files. // We also set the memstore size to zero here before we allow updates // again so its value will represent the size of the updates received // during the flush MultiVersionConsistencyControl.WriteEntry w = null; // We have to take a write lock during snapshot, or else a write could // end up in both snapshot and memstore (makes it difficult to do atomic // rows then) status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); long totalFlushableSize = 0; status.setStatus("Preparing to flush by snapshotting stores"); List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size()); long flushSeqId = -1L; try { // Record the mvcc for all transactions in progress. w = mvcc.beginMemstoreInsert(); mvcc.advanceMemstore(w); // check if it is not closing. if (wal != null) { if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { status.setStatus("Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."); return false; } flushSeqId = this.sequenceId.incrementAndGet(); } else { // use the provided sequence Id as WAL is not being used for this flush. flushSeqId = myseqid; } for (Store s : stores.values()) { totalFlushableSize += s.getFlushableSize(); storeFlushCtxs.add(s.createFlushContext(flushSeqId)); } // prepare flush (take a snapshot) for (StoreFlushContext flush : storeFlushCtxs) { //步骤1 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ flush.prepare(); } } finally { this.updatesLock.writeLock().unlock(); } String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); // sync unflushed WAL changes when deferred log sync is enabled // see HBASE-8208 for details if (wal != null && !shouldSyncLog()) { //步骤2 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ wal.sync(); } // wait for all in-progress transactions to commit to HLog before // we can start the flush. This prevents // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. mvcc.waitForRead(w); s = "Flushing stores of " + this; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); // Any failure from here on out will be catastrophic requiring server // restart so hlog content can be replayed and put back into the memstore. // Otherwise, the snapshot content while backed up in the hlog, it will not // be part of the current running servers state. boolean compactionRequested = false; try { // A. Flush memstore to all the HStores. // Keep running vector of all store files that includes both old and the // just-made new flush store file. The new flushed file is still in the // tmp directory. for (StoreFlushContext flush : storeFlushCtxs) { //步骤3 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ flush.flushCache(status); } // Switch snapshot (in memstore) -> new hfile (thus causing // all the store scanners to reset/reseek). for (StoreFlushContext flush : storeFlushCtxs) { //步骤4 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ boolean needsCompaction = flush.commit(status); if (needsCompaction) { compactionRequested = true; } } storeFlushCtxs.clear(); // Set down the memstore size by amount of flush. this.addAndGetGlobalMemstoreSize(-totalFlushableSize); } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. // Currently, only a server restart will do this. // We used to only catch IOEs but its possible that we'd get other // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch // all and sundry. if (wal != null) { wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + Bytes.toStringBinary(getRegionName())); dse.initCause(t); status.abort("Flush failed: " + StringUtils.stringifyException(t)); throw dse; } // If we get to here, the HStores have been written. if (wal != null) { wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } // Record latest flush time this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); // Update the last flushed sequence id for region completeSequenceId = flushSeqId; // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources(). synchronized (this) { notifyAll(); // FindBugs NN_NAKED_NOTIFY } long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; long memstoresize = this.memstoreSize.get(); String msg = "Finished memstore flush of ~" + StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize + ", currentsize=" + StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize + " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId + ", compaction requested=" + compactionRequested + ((wal == null)? "; wal=null": ""); LOG.info(msg); status.setStatus(msg); this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize)); return compactionRequested; }
调用HRegion的internalFlushcache方法
1.HRegion 1661 HStore 1941 prepare (获取写锁) 用memStore类复制kvset生成snapshot作为本次mem flush的内存
(每次flush会触发region内的所有store的flush,所以flush的最小单位是region,不是store,这也是不太建议多个cf理由的一个原因)
2.HRegion 1674 调用wal 等待wal完成
3.HRegion 1700 HStore flushCache生成tmpfile(一个HStore一个tmpfile,虽然用的tmpfiles是个List)
在
4.HRegion 1706 HStore将新生成的tmpfiles封装为HStorefile,
HStore调用updateStorefiles方法,获得写锁添加到StoreFileManager的List中,提供服务,清空snapshot
HStore 951 needsCompaction方法, 调用RatioBasedCompactionPolicy.needsCompaction方法,判断storm是否需要compact
(判断方法hfile数量大于hbase.hstore.compaction.min 和 hbase.hstore.compactionThreshold的最大值数(默认值为3))
--------------------------------------------------------------------------------------------------------------------
2. hregion查看是否split,实现类为split策略类:IncreasingToUpperBoundRegionSplitPolicy
@Override protected boolean shouldSplit() { if (region.shouldForceSplit()) return true; boolean foundABigStore = false; // Get count of regions that have the same common table as this.region int tableRegionsCount = getCountOfCommonTableRegions(); // Get size to check long sizeToCheck = getSizeToCheck(tableRegionsCount); for (Store store : region.getStores().values()) { // If any of the stores is unable to split (eg they contain reference files) // then don't split if ((!store.canSplit())) { return false; } // Mark if any store is big enough long size = store.getSize(); if (size > sizeToCheck) { LOG.debug("ShouldSplit because " + store.getColumnFamilyName() + " size=" + size + ", sizeToCheck=" + sizeToCheck + ", regionsWithCommonTable=" + tableRegionsCount); foundABigStore = true; } } return foundABigStore; }
调用IncreasingToUpperBoundRegionSplitPolicy 65 shouldSplit方法,判断,这个region是否需要split
(又是以一个region查看是否需要split的,所以多个cf真的不好)
((init)initialSize = hbase.increasing.policy.initial.size(预先设置初始值大小) 或hbase.hregion.memstore.flush.size (memflush大小))
获取this.region所在表的所有region数 getCountOfCommonTableRegions 为regioncount
当regioncount在0到100之间,取配置hbase.hregion.max.filesize(默认10G)和initialSize*(regioncount^3)的最小值 否则取配置hbase.hregion.max.filesize(默认10G)
如,只有一个region,128*1^3=128M
128*2^3=1024M
128*3^3=3456M
128*4^3=8192M
128*5^3=16000M(15G) => 10G 当有5个region就可以用配置了
--------------------------------------------------------------------------------------------------------------------
3.if(split) spliting elif(compact) compacting
http://blackproof.iteye.com/blog/2037159
之前做过笔记,自己都快忘了
又写了一份region split的
public PairOfSameType<HRegion> stepsBeforePONR(final Server server, final RegionServerServices services, boolean testing) throws IOException { // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't // have zookeeper so don't do zk stuff if server or zookeeper is null if (server != null && server.getZooKeeper() != null) { try { //步骤1@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b); } catch (KeeperException e) { throw new IOException("Failed creating PENDING_SPLIT znode on " + this.parent.getRegionNameAsString(), e); } } this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK); if (server != null && server.getZooKeeper() != null) { // After creating the split node, wait for master to transition it // from PENDING_SPLIT to SPLITTING so that we can move on. We want master // knows about it and won't transition any region which is splitting. //步骤2@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ znodeVersion = getZKNode(server, services); } //步骤3@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ this.parent.getRegionFileSystem().createSplitsDir(); this.journal.add(JournalEntry.CREATE_SPLIT_DIR); Map<byte[], List<StoreFile>> hstoreFilesToSplit = null; Exception exceptionToThrow = null; try{ //步骤4@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ hstoreFilesToSplit = this.parent.close(false); } catch (Exception e) { exceptionToThrow = e; } if (exceptionToThrow == null && hstoreFilesToSplit == null) { // The region was closed by a concurrent thread. We can't continue // with the split, instead we must just abandon the split. If we // reopen or split this could cause problems because the region has // probably already been moved to a different server, or is in the // process of moving to a different server. exceptionToThrow = closedByOtherException; } if (exceptionToThrow != closedByOtherException) { this.journal.add(JournalEntry.CLOSED_PARENT_REGION); } if (exceptionToThrow != null) { if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; throw new IOException(exceptionToThrow); } if (!testing) { //步骤5@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ services.removeFromOnlineRegions(this.parent, null); } this.journal.add(JournalEntry.OFFLINED_PARENT); // TODO: If splitStoreFiles were multithreaded would we complete steps in // less elapsed time? St.Ack 20100920 // // splitStoreFiles creates daughter region dirs under the parent splits dir // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will // clean this up. //步骤6@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ splitStoreFiles(hstoreFilesToSplit); // Log to the journal that we are creating region A, the first daughter // region. We could fail halfway through. If we do, we could have left // stuff in fs that needs cleanup -- a storefile or two. Thats why we // add entry to journal BEFORE rather than AFTER the change. //步骤7@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ this.journal.add(JournalEntry.STARTED_REGION_A_CREATION); HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a); // Ditto this.journal.add(JournalEntry.STARTED_REGION_B_CREATION); HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b); return new PairOfSameType<HRegion>(a, b); }
1.RegionSplitPolicy.getSplitPoint()获得region split的split point ,最大store的中间点midpoint最为split point
2.SplitRequest.run()
实例化SplitTransaction
st.prepare():split前准备:region是否关闭,所有hfile是否被引用
st.execute:执行split操作
1.createDaughters 创建两个region,获得parent region的写锁
1在zk上创建一个临时的node splitting point,
2等待master直到这个region转为splitting状态
3之后建立splitting的文件夹,
4等待region的flush和compact都完成后,关闭这个region
5从HRegionServer上移除,加入到下线region中
6进行regionsplit操作,创建线程池,用StoreFileSplitter类将region下的所有Hfile(StoreFile)进行split,
(split row在hfile中的不管,其他的都进行引用,把引用文件分别写到region下边)
7.生成左右两个子region,删除meta上parent,根据引用文件生成子region的regioninfo,写到hdfs上
2.stepsAfterPONR 调用DaughterOpener类run打开两个子region,调用initilize
a)向hdfs上写入.regionInfo文件以便meta挂掉以便恢复
b)初始化其下的HStore,主要是LoadStoreFiles函数:
对于该store函数会构造storefile对象,从hdfs上获取路径和文件,每个文件一个
storefile对象,对每个storefile对象会读取文件上的内容创建一个
HalfStoreFileReader读对象来操作该region的父region上的相应的文件,及该
region上目前存储的是引用文件,其指向的是其父region上的相应的文件,对该
region的所有读或写都将关联到父region上
将子Region添加到rs的online region列表上,并添加到meta表上
相关推荐
HBase源码分析与开发实战视频技术讲解高阶视频教程以及课件,内部讲解资料 内容非常详细 值得想要提高薪水的人去学习了解
HBase源码分析揭示了HBase在RPC通信机制方面的一些关键技术点,这包括了角色分配、通信信道建立、通信接口协议定义、对象序列化、传输控制和会话管理,以及在传输过程中可能出现的错误处理和重试机制。 HBase中的...
源码分析是理解HBase工作原理和技术细节的重要途径。HBase在大数据领域扮演着关键角色,它能够处理海量数据并提供实时访问。下面,我们将深入探讨HBase的核心概念和源码中的关键组件。 1. **HBase架构**:HBase基于...
该项目为基于Java开发的分布式NoSQL数据库HBase的设计源码,包含5289个文件,涵盖各类编程语言和文件类型,其中Java源文件4465个,Ruby脚本221个,XML配置112个,Protobuf定义66个,以及少量Shell、Python、...
HBase的源码分析有助于理解其内部工作原理。例如,`HRegionServer`是数据服务的主要组件,负责Region的管理和数据操作;`HMaster`负责Region的分配和负载均衡;`HStore`管理Column Family,包含一系列的`HStoreFile...
HBase 1.2.0是该数据库的一个稳定版本,包含了众多优化和改进,对于想要深入理解HBase工作原理或者进行大数据分析的学习者来说,研究其源码是非常有价值的。 一、HBase架构与核心概念 1. 表与Region:HBase中的...
源码包“hbase-0.98.1-src.tar.gz”提供了HBase 0.98.1版本的完整源代码,对于理解其内部工作原理、进行二次开发或调试是非常有价值的。 HBase的核心概念包括: 1. 表:HBase中的表由行和列族组成,表名全局唯一。...
本使用kafka,spark,hbase开发日志分析系统。 ![architecture](/docs/images/architecture.png "architecture") ### 软件模块 * Kafka:作为日志事件的消息系统,具有分布式,可分区,可冗余的消息服务功能。...
通过分析和实践《HBase权威指南》的源码,读者不仅可以深化理论知识,还能掌握实际操作技巧,为解决实际项目中的问题提供有力支持。对于想深入理解HBase工作原理和优化技巧的开发者来说,这份源码是一份宝贵的资源。
HBase源码(hbase-2.4.9-src.tar.gz)是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File ...
### HBase源码分析 #### 一、HBase性能测试要点与分析 ##### 1.1 测试环境 - **硬件配置**: - 客户端:1台 - RegionServer:5台 - Master:1台 - ZooKeeper:3台 - **软件配置**: - CPU:每台服务器配备8...
HBase 0.94.4的源码分析有助于我们深入了解其内部机制,从而更好地进行系统设计和优化。无论是对于开发者还是管理员,掌握HBase的核心原理都将极大地提升在大数据领域的实践能力。通过不断学习和实践,我们可以更好...
### HBase性能深度分析 HBase,作为BigTable的一个开源实现,因其卓越的分布式数据库特性在大数据处理领域占据了重要地位。然而,随着HBase在各行业的广泛应用,用户对其性能表现的关注日益增强,尤其是实时数据...
### HBase源码解析与开发实战 #### 一、HBase简介 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了...
HBase是Apache软件基金会的一个开源项目,它是基于Google的Bigtable设计思想...通过分析HBase 1.3.1的源码,开发者可以深入理解HBase的工作原理,从而更好地优化应用、解决性能问题,甚至进行功能扩展和定制化开发。
该系统是一款基于Java语言的HBase数据库设计源码,共计26个文件,其中包含18个Java源文件、5个XML配置文件、2个Git忽略文件和1个属性文件。
通过分析"HBaseTest"源码,我们可以更直观地理解HBase的工作原理和使用方式,为实际项目中的数据存储和处理提供有力的支持。在实际应用中,还需要根据具体需求进行性能调优,以充分利用HBase的优势,解决大数据场景...
本文将对HBase的源码进行深度分析,帮助读者理解其内部工作原理和设计思路。 首先,我们来看看HBase的核心架构。HBase基于Google的Bigtable模型,采用分布式存储方式,由RegionServer负责数据存储,Master服务器...