Entry points where the HLog threads are started:
HRegionServer starts the service threads

    private void startServiceThreads() throws IOException {
      .....
      // logRoller daemon thread; rolls over to a new hlog once an hour
      Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
          uncaughtExceptionHandler);
      this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
      splitLogWorker.start(); // start the split worker
      .....
    }

HRegionServer instantiates FSHLog

    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
        throws IOException {
      .....
      this.hlog = setupWALAndReplication(); // the RegionServer's hlog
      .....
    }
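The `Threads.setDaemonThreadRunning(thread, name, handler)` call above names a thread, marks it as a daemon, attaches an uncaught-exception handler, and starts it. The sketch below shows roughly what such a helper does using only the JDK. `DaemonStarter` and `startDaemon` are hypothetical names chosen for illustration; this is not the HBase implementation.

```java
// Hypothetical helper, for illustration only; not HBase's Threads class.
class DaemonStarter {

    // Names the thread, marks it as a daemon, installs the handler, starts it.
    static Thread startDaemon(Runnable body, String name,
                              Thread.UncaughtExceptionHandler handler) {
        Thread t = new Thread(body, name);
        t.setDaemon(true); // a daemon thread does not keep the JVM alive
        if (handler != null) {
            t.setUncaughtExceptionHandler(handler);
        }
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread roller = startDaemon(
            () -> System.out.println("log roller running"),
            "rs1.logRoller",
            (thread, error) -> System.err.println(thread.getName() + " died: " + error));
        roller.join();
        System.out.println(roller.getName() + " daemon=" + roller.isDaemon());
    }
}
```

Daemon status matters here because a background roller should not prevent the process from exiting on shutdown.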
FSHLog starts the individual hlog threads

    asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
    asyncWriter.start(); // thread that writes log edits from the queue to the stream

    int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
    asyncSyncers = new AsyncSyncer[syncerNums];
    for (int i = 0; i < asyncSyncers.length; ++i) {
      asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
      // syncs the log edits in the stream to HDFS and synchronizes the image
      asyncSyncers[i].start();
    }

    asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
    // dedicated notifier thread (presumably so that the syncer threads
    // do not have to notify the waiters themselves)
    asyncNotifier.start();

    coprocessorHost = new WALCoprocessorHost(this, conf);
    this.metrics = new MetricsWAL();
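The HLog WAL write path:
In the HBase put source analysis (http://blackproof.iteye.com/blog/2197710), step 7 appends the hlog edit to the log edit queue.
FSHLog's AsyncWriter thread then writes that queue to HDFS:
1. Wait for new writes to enter the queue.
2. Take the queue lock, swap the pending queue out, and leave an empty one in its place.
3. Write the log edits to HDFS (into the output stream).
4. Update the latest written txid.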
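Step 2 of the list above is a buffer swap: the writer holds the lock only long enough to take the current list and install an empty one, so producers are blocked for as short a time as possible. A minimal sketch of that pattern follows. `PendingEditBuffer`, `Batch`, and the use of `String` for an edit are assumptions made for illustration; only the field names `pendingWrites`, `pendingWritesLock`, and `unflushedEntries` come from the excerpt.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the pending-edit buffer.
class PendingEditBuffer {

    // One drained batch together with the txid it reaches.
    static final class Batch {
        final long txidToWrite;
        final List<String> edits;

        Batch(long txidToWrite, List<String> edits) {
            this.txidToWrite = txidToWrite;
            this.edits = edits;
        }
    }

    private final Object pendingWritesLock = new Object();
    private final AtomicLong unflushedEntries = new AtomicLong(0);
    private List<String> pendingWrites = new LinkedList<>();

    // Producer side: queue an edit and return its transaction id.
    long append(String edit) {
        synchronized (pendingWritesLock) {
            pendingWrites.add(edit);
            return unflushedEntries.incrementAndGet();
        }
    }

    // Writer side: take everything queued so far and leave an empty list.
    Batch swap() {
        synchronized (pendingWritesLock) {
            long txid = unflushedEntries.get();
            List<String> drained = pendingWrites;
            pendingWrites = new LinkedList<>();
            return new Batch(txid, drained);
        }
    }

    public static void main(String[] args) {
        PendingEditBuffer buffer = new PendingEditBuffer();
        buffer.append("put row1");
        buffer.append("put row2");
        Batch batch = buffer.swap();
        System.out.println(batch.edits.size() + " edits up to txid " + batch.txidToWrite);
    }
}
```

The slow I/O (appending each edit to the stream) then happens on the drained list, outside the lock.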

    public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. wait until there is new writes in local buffer
          synchronized (this.writeLock) { // wait under the write lock
            // proceed only once the queued txid is greater than the last written txid
            while (this.pendingTxid <= this.lastWrittenTxid) {
              this.writeLock.wait(); // otherwise wait for a new write edit
            }
          }

          // 2. get all buffered writes and update 'real' pendingTxid
          //    since maybe newer writes enter buffer as AsyncWriter wakes
          //    up and holds the lock
          // NOTE! can't hold 'updateLock' here since rollWriter will pend
          // on 'sync()' with 'updateLock', but 'sync()' will wait for
          // AsyncWriter/AsyncSyncer/AsyncNotifier series. without updateLock
          // can leads to pendWrites more than pendingTxid, but not problem
          List<Entry> pendWrites = null;
          synchronized (pendingWritesLock) { // lock the queue, swap it out, leave an empty one
            this.txidToWrite = unflushedEntries.get();
            pendWrites = pendingWrites;
            pendingWrites = new LinkedList<Entry>();
          }

          // 3. write all buffered writes to HDFS(append, without sync)
          try {
            for (Entry e : pendWrites) {
              writer.append(e); // write the hlog edit to HDFS
            }
          } catch (IOException e) {
            LOG.error("Error while AsyncWriter write, request close of hlog ", e);
            requestLogRoll();
            asyncIOE = e;
            failedTxid.set(this.txidToWrite);
          }

          // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
          this.lastWrittenTxid = this.txidToWrite; // update the latest written txid
          boolean hasIdleSyncer = false;
          for (int i = 0; i < asyncSyncers.length; ++i) {
            if (!asyncSyncers[i].isSyncing()) {
              hasIdleSyncer = true;
              asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
              break;
            }
          }
          if (!hasIdleSyncer) {
            int idx = (int)this.lastWrittenTxid % asyncSyncers.length;
            asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "newer writes added to local buffer");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
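Step 4 of `run()` hands the new txid to the first idle syncer and otherwise falls back to `(int)this.lastWrittenTxid % asyncSyncers.length`. In Java the cast binds tighter than `%`, so the `long` is narrowed to `int` before the remainder is taken. The sketch below isolates that selection logic and contrasts the cast-then-mod expression with `Math.floorMod`. `SyncerPicker` and its method names are hypothetical, and the `floorMod` variant is an alternative shown for comparison, not what the excerpt does; whether a txid ever grows large enough for the difference to matter in practice is not something the excerpt establishes.

```java
// Hypothetical illustration of the syncer selection in step 4.
class SyncerPicker {

    // Same expression shape as the excerpt: narrow to int first, then take the remainder.
    static int castThenMod(long txid, int syncerCount) {
        return (int) txid % syncerCount;
    }

    // First idle syncer wins; otherwise choose by txid with a non-negative modulus.
    static int pick(boolean[] syncing, long lastWrittenTxid) {
        for (int i = 0; i < syncing.length; i++) {
            if (!syncing[i]) {
                return i;
            }
        }
        return (int) Math.floorMod(lastWrittenTxid, (long) syncing.length);
    }

    public static void main(String[] args) {
        boolean[] allBusy = {true, true, true, true, true};
        long largeTxid = Integer.MAX_VALUE + 1L;
        System.out.println("cast then mod: " + castThenMod(largeTxid, allBusy.length));
        System.out.println("floorMod pick: " + pick(allBusy, largeTxid));
    }
}
```

For small txids the two forms agree; they diverge only once the value no longer fits in an `int`.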
FSHLog's AsyncSyncer thread flushes the data in the output stream to disk and synchronizes the HDFS image

    public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. wait until AsyncWriter has written data to HDFS and
          //    called setWrittenTxid to wake up us
          synchronized (this.syncLock) {
            while (this.writtenTxid <= this.lastSyncedTxid) {
              this.syncLock.wait();
            }
            this.txidToSync = this.writtenTxid;
          }

          // if this syncer's writes have been synced by other syncer:
          // 1. just set lastSyncedTxid
          // 2. don't do real sync, don't notify AsyncNotifier, don't logroll check
          // regardless of whether the writer is null or not
          if (this.txidToSync <= syncedTillHere.get()) {
            this.lastSyncedTxid = this.txidToSync;
            continue;
          }

          // 2. do 'sync' to HDFS to provide durability
          long now = EnvironmentEdgeManager.currentTimeMillis();
          try {
            if (writer == null) {
              // the only possible case where writer == null is as below:
              // 1. t1: AsyncWriter append writes to hdfs,
              //        envokes AsyncSyncer 1 with writtenTxid==100
              // 2. t2: AsyncWriter append writes to hdfs,
              //        envokes AsyncSyncer 2 with writtenTxid==200
              // 3. t3: rollWriter starts, it grabs the updateLock which
              //        prevents further writes entering pendingWrites and
              //        wait for all items(200) in pendingWrites to append/sync
              //        to hdfs
              // 4. t4: AsyncSyncer 2 finishes, now syncedTillHere==200
              // 5. t5: rollWriter close writer, set writer=null...
              // 6. t6: AsyncSyncer 1 starts to use writer to do sync... before
              //        rollWriter set writer to the newly created Writer
              //
              // Now writer == null and txidToSync > syncedTillHere here:
              // we need fail all the writes with txid <= txidToSync to avoid
              // 'data loss' where user get successful write response but can't
              // read the writes!
              LOG.fatal("should never happen: has unsynced writes but writer is null!");
              asyncIOE = new IOException("has unsynced writes but writer is null!");
              failedTxid.set(this.txidToSync);
            } else {
              this.isSyncing = true;
              // the sync call. Open points: a retry should be added here, the file
              // lease expiry should be extended, and whether the hlog gets rewritten
              writer.sync();
              this.isSyncing = false;
            }
            postSync();
          } catch (IOException e) {
            LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
            requestLogRoll();
            asyncIOE = e;
            failedTxid.set(this.txidToSync);
            this.isSyncing = false;
          }
          metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);

          // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put'
          //    handler threads on 'sync()'
          this.lastSyncedTxid = this.txidToSync;
          asyncNotifier.setFlushedTxid(this.lastSyncedTxid);

          // 4. check and do logRoll if needed
          boolean logRollNeeded = false;
          if (rollWriterLock.tryLock()) {
            try {
              logRollNeeded = checkLowReplication();
            } finally {
              rollWriterLock.unlock();
            }
            try {
              if (logRollNeeded || writer != null && writer.getLength() > logrollsize) {
                requestLogRoll();
              }
            } catch (IOException e) {
              LOG.warn("writer.getLength() failed,this failure won't block here");
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "notification from AsyncWriter thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
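Step 3 of the syncer loop passes the synced txid to AsyncNotifier, which wakes the `put` handler threads blocked in `sync()`. The AsyncNotifier code itself is not shown in this post, so the following is only a sketch of the general wait/notify pattern such a handoff can use: a handler waits until the published txid reaches its own, and the publisher never moves the txid backwards. `SyncBarrier`, `markSynced`, and `awaitSynced` are hypothetical names, and the timeout parameter is an addition to keep the example terminating.

```java
// Hypothetical sketch of a "synced up to txid N" handoff; not FSHLog code.
class SyncBarrier {
    private final Object lock = new Object();
    private long syncedTillHere = 0; // highest txid known to be durable

    // Publisher side: record progress and wake every waiting handler.
    void markSynced(long txid) {
        synchronized (lock) {
            if (txid > syncedTillHere) {
                syncedTillHere = txid; // never move backwards
            }
            lock.notifyAll();
        }
    }

    // Handler side: block until txid is durable or the timeout expires.
    // Returns true if the txid was synced, false on timeout or interrupt.
    boolean awaitSynced(long txid, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (lock) {
            while (syncedTillHere < txid) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false;
                }
                try {
                    // wait(0) would block forever, so wait at least 1 ms
                    lock.wait(Math.max(1L, remainingNanos / 1_000_000L));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        SyncBarrier barrier = new SyncBarrier();
        Thread syncer = new Thread(() -> barrier.markSynced(42), "demo-syncer");
        syncer.start();
        System.out.println("txid 42 synced: " + barrier.awaitSynced(42, 1000));
    }
}
```

The `while` loop around `wait` mirrors the loops in the excerpt: the condition is rechecked after every wake-up, which also covers spurious wake-ups.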
LogRoller periodically calls FSHLog's rollWriter method, which creates a new hlog and pushes the data held in the stream out to HDFS

    @Override
    public byte [][] rollWriter(boolean force)
        throws FailedLogCloseException, IOException {
      rollWriterLock.lock();
      try {
        // Return if nothing to flush.
        if (!force && this.writer != null && this.numEntries.get() <= 0) {
          return null;
        }
        byte [][] regionsToFlush = null;
        if (closed) {
          LOG.debug("HLog closed. Skipping rolling of writer");
          return null;
        }
        try {
          if (!closeBarrier.beginOp()) {
            LOG.debug("HLog closing. Skipping rolling of writer");
            return regionsToFlush;
          }
          // Do all the preparation outside of the updateLock to block
          // as less as possible the incoming writes
          long currentFilenum = this.filenum;
          Path oldPath = null;
          if (currentFilenum > 0) {
            //computeFilename will take care of meta hlog filename
            oldPath = computeFilename(currentFilenum);
          }
          this.filenum = System.currentTimeMillis();
          Path newPath = computeFilename();
          while (fs.exists(newPath)) {
            this.filenum++;
            newPath = computeFilename();
          }

          // Tell our listeners that a new log is about to be created
          if (!this.listeners.isEmpty()) {
            for (WALActionsListener i : this.listeners) {
              i.preLogRoll(oldPath, newPath);
            }
          }
          FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
          // Can we get at the dfsclient outputstream?
          FSDataOutputStream nextHdfsOut = null;
          if (nextWriter instanceof ProtobufLogWriter) {
            nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
            // perform the costly sync before we get the lock to roll writers.
            try {
              // flush the protobuf writer to HDFS and sync the sequential write,
              // so that the block and the image are consistent
              nextWriter.sync();
            } catch (IOException e) {
              // optimization failed, no need to abort here.
              LOG.warn("pre-sync failed", e);
            }
          }

          Path oldFile = null;
          int oldNumEntries = 0;
          synchronized (updateLock) {
            // Clean up current writer.
            oldNumEntries = this.numEntries.get();
            oldFile = cleanupCurrentWriter(currentFilenum);
            this.writer = nextWriter;
            this.hdfs_out = nextHdfsOut;
            this.numEntries.set(0);
            if (oldFile != null) {
              this.hlogSequenceNums.put(oldFile, this.latestSequenceNums);
              this.latestSequenceNums = new HashMap<byte[], Long>();
            }
          }
          if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
          else {
            long oldFileLen = this.fs.getFileStatus(oldFile).getLen();
            this.totalLogSize.addAndGet(oldFileLen);
            LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries="
                + oldNumEntries + ", filesize="
                + StringUtils.humanReadableInt(oldFileLen) + "; new WAL "
                + FSUtils.getPath(newPath));
          }

          // Tell our listeners that a new log was created
          if (!this.listeners.isEmpty()) {
            for (WALActionsListener i : this.listeners) {
              i.postLogRoll(oldPath, newPath);
            }
          }

          // Can we delete any of the old log files?
          if (getNumRolledLogFiles() > 0) {
            cleanOldLogs();
            regionsToFlush = findRegionsToForceFlush();
          }
        } finally {
          closeBarrier.endOp();
        }
        return regionsToFlush;
      } finally {
        rollWriterLock.unlock();
      }
    }
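In `rollWriter`, the new file number starts from `System.currentTimeMillis()` and is incremented until the computed path does not exist yet. The sketch below isolates that probing loop, with an in-memory `Set<Long>` standing in for `fs.exists(...)`. `WalFilenumChooser` and `nextFreeFilenum` are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the file-number probing loop in rollWriter.
class WalFilenumChooser {

    // Starts at startFilenum and increments until the number is unused.
    // existingFilenums stands in for the filesystem existence check.
    static long nextFreeFilenum(long startFilenum, Set<Long> existingFilenums) {
        long filenum = startFilenum;
        while (existingFilenums.contains(filenum)) {
            filenum++;
        }
        return filenum;
    }

    public static void main(String[] args) {
        Set<Long> existing = new HashSet<>(Arrays.asList(1000L, 1001L));
        System.out.println("chosen filenum: " + nextFreeFilenum(1000L, existing));
    }
}
```

Because the starting point is a millisecond timestamp, a collision is only expected when two rolls land in the same millisecond or a file with that number already exists for another reason, so the loop normally exits on its first check.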