`

HBase memflush源码分析

阅读更多

源码为0.98.1 

HRegionServer中起线程MemStoreFlusher

private void initializeThreads() throws IOException {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplitThread(this);

   .......

 

  private void startServiceThreads() throws IOException {
    String n = Thread.currentThread().getName();
......
    this.cacheFlusher.start(uncaughtExceptionHandler);

    Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
      ".compactionChecker", uncaughtExceptionHandler);

.....

 

 /*
   * Run init. Sets up hlog and starts up all server threads.
   *
   * @param c Extra configuration.
   */
  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
  throws IOException {
....

      startServiceThreads();
.....

 

  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      // Try and register with the Master; tell it we are here.  Break if
      // server is stopped or the clusterup flag is down or hdfs went wacky.
      while (keepLooping()) {
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
          handleReportForDutyResponse(w);//启动所有hregionserver线程服务
          break;
        }
      }
....

 

 

主要的类,方法:memStoreFlusher的flushRegion

 private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
     }
    }
    lock.readLock().lock();
    try {
      boolean shouldCompact = region.flushcache();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
......

  从flushQueue中取出FlushRegionEntry进行flush

 获取读锁

  1.  调用HRegion进行flush,并返回是否需要compact
  2.  调用HRegion查看是否需要split
  3.  if(split) spliting  elif(compact) compacting

以下是具体操作:

--------------------------------------------------------------------------------------------------------------------

 

1.HRegion

 protected boolean internalFlushcache(
      final HLog wal, final long myseqid, MonitoredTask status)
  throws IOException {
    if (this.rsServices != null && this.rsServices.isAborted()) {
      // Don't flush when server aborting, it's unsafe
      throw new IOException("Aborting flush because server is abortted...");
    }
    final long startTime = EnvironmentEdgeManager.currentTimeMillis();
    // Clear flush flag.
    // If nothing to flush, return and avoid logging start/stop flush.
    if (this.memstoreSize.get() <= 0) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Empty memstore size for the current region "+this);
      }
      return false;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Started memstore flush for " + this +
        ", current region memstore size " +
        StringUtils.humanReadableInt(this.memstoreSize.get()) +
        ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
    }

    // Stop updates while we snapshot the memstore of all stores. We only have
    // to do this for a moment.  Its quick.  The subsequent sequence id that
    // goes into the HLog after we've flushed all these snapshots also goes
    // into the info file that sits beside the flushed files.
    // We also set the memstore size to zero here before we allow updates
    // again so its value will represent the size of the updates received
    // during the flush
    MultiVersionConsistencyControl.WriteEntry w = null;

    // We have to take a write lock during snapshot, or else a write could
    // end up in both snapshot and memstore (makes it difficult to do atomic
    // rows then)
    status.setStatus("Obtaining lock to block concurrent updates");
    // block waiting for the lock for internal flush
    this.updatesLock.writeLock().lock();
    long totalFlushableSize = 0;
    status.setStatus("Preparing to flush by snapshotting stores");
    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
    long flushSeqId = -1L;
    try {
      // Record the mvcc for all transactions in progress.
      w = mvcc.beginMemstoreInsert();
      mvcc.advanceMemstore(w);
      // check if it is not closing.
      if (wal != null) {
        if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
          status.setStatus("Flush will not be started for ["
              + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.");
          return false;
        }
        flushSeqId = this.sequenceId.incrementAndGet();
      } else {
        // use the provided sequence Id as WAL is not being used for this flush.
        flushSeqId = myseqid;
      }

      for (Store s : stores.values()) {
        totalFlushableSize += s.getFlushableSize();
        storeFlushCtxs.add(s.createFlushContext(flushSeqId));
      }

      // prepare flush (take a snapshot)
      for (StoreFlushContext flush : storeFlushCtxs) {
//步骤1   @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
        flush.prepare(); 
      }
    } finally {
      this.updatesLock.writeLock().unlock();
    }
    String s = "Finished memstore snapshotting " + this +
      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
    status.setStatus(s);
    if (LOG.isTraceEnabled()) LOG.trace(s);

    // sync unflushed WAL changes when deferred log sync is enabled
    // see HBASE-8208 for details
    if (wal != null && !shouldSyncLog()) {
//步骤2  @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
      wal.sync();
    }

    // wait for all in-progress transactions to commit to HLog before
    // we can start the flush. This prevents
    // uncommitted transactions from being written into HFiles.
    // We have to block before we start the flush, otherwise keys that
    // were removed via a rollbackMemstore could be written to Hfiles.
    mvcc.waitForRead(w);

    s = "Flushing stores of " + this;
    status.setStatus(s);
    if (LOG.isTraceEnabled()) LOG.trace(s);

    // Any failure from here on out will be catastrophic requiring server
    // restart so hlog content can be replayed and put back into the memstore.
    // Otherwise, the snapshot content while backed up in the hlog, it will not
    // be part of the current running servers state.
    boolean compactionRequested = false;
    try {
      // A.  Flush memstore to all the HStores.
      // Keep running vector of all store files that includes both old and the
      // just-made new flush store file. The new flushed file is still in the
      // tmp directory.

      for (StoreFlushContext flush : storeFlushCtxs) {
//步骤3   @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
        flush.flushCache(status);
      }

      // Switch snapshot (in memstore) -> new hfile (thus causing
      // all the store scanners to reset/reseek).
      for (StoreFlushContext flush : storeFlushCtxs) {
//步骤4   @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
        boolean needsCompaction = flush.commit(status);
        if (needsCompaction) {
          compactionRequested = true;
        }
      }
      storeFlushCtxs.clear();

      // Set down the memstore size by amount of flush.
      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
    } catch (Throwable t) {
      // An exception here means that the snapshot was not persisted.
      // The hlog needs to be replayed so its content is restored to memstore.
      // Currently, only a server restart will do this.
      // We used to only catch IOEs but its possible that we'd get other
      // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
      // all and sundry.
      if (wal != null) {
        wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
      }
      DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
          Bytes.toStringBinary(getRegionName()));
      dse.initCause(t);
      status.abort("Flush failed: " + StringUtils.stringifyException(t));
      throw dse;
    }

    // If we get to here, the HStores have been written.
    if (wal != null) {
      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
    }

    // Record latest flush time
    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();

    // Update the last flushed sequence id for region
    completeSequenceId = flushSeqId;

    // C. Finally notify anyone waiting on memstore to clear:
    // e.g. checkResources().
    synchronized (this) {
      notifyAll(); // FindBugs NN_NAKED_NOTIFY
    }

    long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
    long memstoresize = this.memstoreSize.get();
    String msg = "Finished memstore flush of ~" +
      StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
      ", currentsize=" +
      StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
      " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
      ", compaction requested=" + compactionRequested +
      ((wal == null)? "; wal=null": "");
    LOG.info(msg);
    status.setStatus(msg);
    this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));

    return compactionRequested;
  }

 调用HRegion的internalFlushcache方法

 1.HRegion 1661 HStore 1941 prepare (获取写锁) 用memStore类复制kvset生成snapshot作为本次mem flush的内存

 (每次flush会触发region内的所有store的flush,所以flush的最小单位是region,不是store,这也是不太建议多个cf理由的一个原因)

 2.HRegion 1674 调用wal  等待wal完成

 3.HRegion 1700 HStore flushCache生成tmpfile(一个HStore一个tmpfile,虽然用的tmpfiles是个List)

   在

 4.HRegion 1706 HStore将新生成的tmpfiles封装为HStorefile,

 HStore调用updateStorefiles方法,获得写锁添加到StoreFileManager的List中,提供服务,清空snapshot

    HStore 951 needsCompaction方法, 调用RatioBasedCompactionPolicy.needsCompaction方法,判断storm是否需要compact

    (判断方法hfile数量大于hbase.hstore.compaction.min 和 hbase.hstore.compactionThreshold的最大值数(默认值为3)) 

 

--------------------------------------------------------------------------------------------------------------------

 

2. hregion查看是否split,实现类为split策略类:IncreasingToUpperBoundRegionSplitPolicy

  @Override
  protected boolean shouldSplit() {
    if (region.shouldForceSplit()) return true;
    boolean foundABigStore = false;
    // Get count of regions that have the same common table as this.region
    int tableRegionsCount = getCountOfCommonTableRegions();
    // Get size to check
    long sizeToCheck = getSizeToCheck(tableRegionsCount);

    for (Store store : region.getStores().values()) {
      // If any of the stores is unable to split (eg they contain reference files)
      // then don't split
      if ((!store.canSplit())) {
        return false;
      }

      // Mark if any store is big enough
      long size = store.getSize();
      if (size > sizeToCheck) {
        LOG.debug("ShouldSplit because " + store.getColumnFamilyName() +
          " size=" + size + ", sizeToCheck=" + sizeToCheck +
          ", regionsWithCommonTable=" + tableRegionsCount);
        foundABigStore = true;
      }
    }

    return foundABigStore;
  }

 调用IncreasingToUpperBoundRegionSplitPolicy 65 shouldSplit方法,判断,这个region是否需要split

(又是以一个region查看是否需要split的,所以多个cf真的不好)

((init)initialSize = hbase.increasing.policy.initial.size(预先设置初始值大小) 或hbase.hregion.memstore.flush.size (memflush大小))

获取this.region所在表的所有region数 getCountOfCommonTableRegions 为regioncount

当regioncount在0到100之间,取配置hbase.hregion.max.filesize(默认10G)和initialSize*(regioncount^3)的最小值  否则取配置hbase.hregion.max.filesize(默认10G)

如,只有一个region,128*1^3=128M

128*2^3=1024M

128*3^3=3456M

128*4^3=8192M

128*5^3=16000M(15G) => 10G 当有5个region就可以用配置了

 

 

--------------------------------------------------------------------------------------------------------------------

 3.if(split) spliting  elif(compact) compacting

 

 http://blackproof.iteye.com/blog/2037159

 

之前做过笔记,自己都快忘了

又写了一份region split的

 

生成两个子region的代码:SplitTransaction.stepsBeforePONR
 public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
      final RegionServerServices services, boolean testing) throws IOException {
    // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
    // have zookeeper so don't do zk stuff if server or zookeeper is null
    if (server != null && server.getZooKeeper() != null) {
      try {
    	    //步骤1@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
        createNodeSplitting(server.getZooKeeper(),
          parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
      } catch (KeeperException e) {
        throw new IOException("Failed creating PENDING_SPLIT znode on " +
          this.parent.getRegionNameAsString(), e);
      }
    }
    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
    if (server != null && server.getZooKeeper() != null) {
      // After creating the split node, wait for master to transition it
      // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
      // knows about it and won't transition any region which is splitting.
	    //步骤2@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
      znodeVersion = getZKNode(server, services);
    }

    //步骤3@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    this.parent.getRegionFileSystem().createSplitsDir();
    this.journal.add(JournalEntry.CREATE_SPLIT_DIR);

    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
    Exception exceptionToThrow = null;
    try{
	    //步骤4@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
      hstoreFilesToSplit = this.parent.close(false);
    } catch (Exception e) {
      exceptionToThrow = e;
    }
    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
      // The region was closed by a concurrent thread.  We can't continue
      // with the split, instead we must just abandon the split.  If we
      // reopen or split this could cause problems because the region has
      // probably already been moved to a different server, or is in the
      // process of moving to a different server.
      exceptionToThrow = closedByOtherException;
    }
    if (exceptionToThrow != closedByOtherException) {
      this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
    }
    if (exceptionToThrow != null) {
      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
      throw new IOException(exceptionToThrow);
    }
    if (!testing) {
	    //步骤5@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
      services.removeFromOnlineRegions(this.parent, null);
    }
    this.journal.add(JournalEntry.OFFLINED_PARENT);

    // TODO: If splitStoreFiles were multithreaded would we complete steps in
    // less elapsed time?  St.Ack 20100920
    //
    // splitStoreFiles creates daughter region dirs under the parent splits dir
    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
    // clean this up.
    //步骤6@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    splitStoreFiles(hstoreFilesToSplit);

    // Log to the journal that we are creating region A, the first daughter
    // region.  We could fail halfway through.  If we do, we could have left
    // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
    // add entry to journal BEFORE rather than AFTER the change.
    //步骤7@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);

    // Ditto
    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
    return new PairOfSameType<HRegion>(a, b);
  }

  1.RegionSplitPolicy.getSplitPoint()获得region split的split point  ,最大store的中间点midpoint最为split point

  2.SplitRequest.run()

            实例化SplitTransaction

            st.prepare():split前准备:region是否关闭,所有hfile是否被引用

            st.execute:执行split操作

 

 1.createDaughters 创建两个region,获得parent region的写锁

      1在zk上创建一个临时的node splitting point,

      2等待master直到这个region转为splitting状态

      3之后建立splitting的文件夹,

      4等待region的flush和compact都完成后,关闭这个region

      5从HRegionServer上移除,加入到下线region中

      6进行regionsplit操作,创建线程池,用StoreFileSplitter类将region下的所有Hfile(StoreFile)进行split,

      (split row在hfile中的不管,其他的都进行引用,把引用文件分别写到region下边)

      7.生成左右两个子region,删除meta上parent,根据引用文件生成子region的regioninfo,写到hdfs上

 2.stepsAfterPONR 调用DaughterOpener类run打开两个子region,调用initilize

     a)向hdfs上写入.regionInfo文件以便meta挂掉以便恢复 

  b)初始化其下的HStore,主要是LoadStoreFiles函数: 

      对于该store函数会构造storefile对象,从hdfs上获取路径和文件,每个文件一个 

     storefile对象,对每个storefile对象会读取文件上的内容创建一个 

      HalfStoreFileReader读对象来操作该region的父region上的相应的文件,及该 

      region上目前存储的是引用文件,其指向的是其父region上的相应的文件,对该 

      region的所有读或写都将关联到父region上 

    将子Region添加到rs的online region列表上,并添加到meta表上 

 

 

 

分享到:
评论

相关推荐

    HBase源码分析与开发实战

    HBase源码分析与开发实战视频技术讲解高阶视频教程以及课件,内部讲解资料 内容非常详细 值得想要提高薪水的人去学习了解

    HBase源码分析

    HBase源码分析揭示了HBase在RPC通信机制方面的一些关键技术点,这包括了角色分配、通信信道建立、通信接口协议定义、对象序列化、传输控制和会话管理,以及在传输过程中可能出现的错误处理和重试机制。 HBase中的...

    HBase实战源码

    源码分析是理解HBase工作原理和技术细节的重要途径。HBase在大数据领域扮演着关键角色,它能够处理海量数据并提供实时访问。下面,我们将深入探讨HBase的核心概念和源码中的关键组件。 1. **HBase架构**:HBase基于...

    基于Java开发的分布式NoSQL数据库HBase设计源码分析

    该项目为基于Java开发的分布式NoSQL数据库HBase的设计源码,包含5289个文件,涵盖各类编程语言和文件类型,其中Java源文件4465个,Ruby脚本221个,XML配置112个,Protobuf定义66个,以及少量Shell、Python、...

    hbase源码包和测试用例

    HBase的源码分析有助于理解其内部工作原理。例如,`HRegionServer`是数据服务的主要组件,负责Region的管理和数据操作;`HMaster`负责Region的分配和负载均衡;`HStore`管理Column Family,包含一系列的`HStoreFile...

    hbase 1.2.0源码

    HBase 1.2.0是该数据库的一个稳定版本,包含了众多优化和改进,对于想要深入理解HBase工作原理或者进行大数据分析的学习者来说,研究其源码是非常有价值的。 一、HBase架构与核心概念 1. 表与Region:HBase中的...

    hbase-0.98.1源码包

    源码包“hbase-0.98.1-src.tar.gz”提供了HBase 0.98.1版本的完整源代码,对于理解其内部工作原理、进行二次开发或调试是非常有价值的。 HBase的核心概念包括: 1. 表:HBase中的表由行和列族组成,表名全局唯一。...

    使用kafka,spark,hbase开发日志分析系统

    本使用kafka,spark,hbase开发日志分析系统。 ![architecture](/docs/images/architecture.png "architecture") ### 软件模块 * Kafka:作为日志事件的消息系统,具有分布式,可分区,可冗余的消息服务功能。...

    hbase权威指南源码

    通过分析和实践《HBase权威指南》的源码,读者不仅可以深化理论知识,还能掌握实际操作技巧,为解决实际项目中的问题提供有力支持。对于想深入理解HBase工作原理和优化技巧的开发者来说,这份源码是一份宝贵的资源。

    HBase源码(hbase-2.4.9-src.tar.gz)

    HBase源码(hbase-2.4.9-src.tar.gz)是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File ...

    hbase源码分析

    ### HBase源码分析 #### 一、HBase性能测试要点与分析 ##### 1.1 测试环境 - **硬件配置**: - 客户端:1台 - RegionServer:5台 - Master:1台 - ZooKeeper:3台 - **软件配置**: - CPU:每台服务器配备8...

    hbase 源码包

    HBase 0.94.4的源码分析有助于我们深入了解其内部机制,从而更好地进行系统设计和优化。无论是对于开发者还是管理员,掌握HBase的核心原理都将极大地提升在大数据领域的实践能力。通过不断学习和实践,我们可以更好...

    HBase性能深度分析

    ### HBase性能深度分析 HBase,作为BigTable的一个开源实现,因其卓越的分布式数据库特性在大数据处理领域占据了重要地位。然而,随着HBase在各行业的广泛应用,用户对其性能表现的关注日益增强,尤其是实时数据...

    【No0057】HBase源码解析与开发实战.txt

    ### HBase源码解析与开发实战 #### 一、HBase简介 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了...

    Hbase1.3.1源码

    HBase是Apache软件基金会的一个开源项目,它是基于Google的Bigtable设计思想...通过分析HBase 1.3.1的源码,开发者可以深入理解HBase的工作原理,从而更好地优化应用、解决性能问题,甚至进行功能扩展和定制化开发。

    基于Java语言的HBase数据库设计源码分析

    该系统是一款基于Java语言的HBase数据库设计源码,共计26个文件,其中包含18个Java源文件、5个XML配置文件、2个Git忽略文件和1个属性文件。

    HBaseTest_hbase_源码

    通过分析"HBaseTest"源码,我们可以更直观地理解HBase的工作原理和使用方式,为实际项目中的数据存储和处理提供有力的支持。在实际应用中,还需要根据具体需求进行性能调优,以充分利用HBase的优势,解决大数据场景...

    hbase-code-analysis:nosql数据库hbase的源码分析

    本文将对HBase的源码进行深度分析,帮助读者理解其内部工作原理和设计思路。 首先,我们来看看HBase的核心架构。HBase基于Google的Bigtable模型,采用分布式存储方式,由RegionServer负责数据存储,Master服务器...

Global site tag (gtag.js) - Google Analytics