`

hbase hlog源码

阅读更多

  HLog线程的启动入口:

  

  HRegionServer启动线程

    private void startServiceThreads() throws IOException {.....
    Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
        uncaughtExceptionHandler);//logRoller守护进程,每一个小时生成一个hlog
    this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
    splitLogWorker.start();//启动split work类
  .....}
  HRegionServer实例化FSHlog
    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
  throws IOException {.....
  this.hlog = setupWALAndReplication();//RS的hlog
  .....}

 

  FSHlog启动hlog的各个线程 

 asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
    asyncWriter.start();//从队列写logedit到stream的进程
   
    int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
    asyncSyncers = new AsyncSyncer[syncerNums];
    for (int i = 0; i < asyncSyncers.length; ++i) {
      asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
      asyncSyncers[i].start();//将stream里的logedit写到hdfs上,并且同步image
    }

    asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
    asyncNotifier.start();//专门的notify同步线程,(无需同步线程去notify其他的)??

    coprocessorHost = new WALCoprocessorHost(this, conf);

    this.metrics = new MetricsWAL();

 

 HLog的wal过程:

  

  HBase put源码分析中 http://blackproof.iteye.com/blog/2197710

  在第7步中,将hlog写入logedit队列中

  FSHLog的AsyncWriter线程将队列写入hdfs中

  1.等待新write进入队列

  2.等待队列同步,置换,清空

  3.logedit写入hdfs(outputstream中)

  4.更新最新的write num

  public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. wait until there is new writes in local buffer
          synchronized (this.writeLock) {//写同步
            while (this.pendingTxid <= this.lastWrittenTxid) {//等待新writenum,队列中的writenum大于当前writenum
              this.writeLock.wait();//否则等待新write edit
            }
          }

          // 2. get all buffered writes and update 'real' pendingTxid
          //    since maybe newer writes enter buffer as AsyncWriter wakes
          //    up and holds the lock
          // NOTE! can't hold 'updateLock' here since rollWriter will pend
          // on 'sync()' with 'updateLock', but 'sync()' will wait for
          // AsyncWriter/AsyncSyncer/AsyncNotifier series. without updateLock
          // can leads to pendWrites more than pendingTxid, but not problem
          List<Entry> pendWrites = null;
          synchronized (pendingWritesLock) {//等待队列同步,置换,清空
            this.txidToWrite = unflushedEntries.get();
            pendWrites = pendingWrites;
            pendingWrites = new LinkedList<Entry>();
          }

          // 3. write all buffered writes to HDFS(append, without sync)
          try {
            for (Entry e : pendWrites) {
              writer.append(e);//hlog edit写入hdfs中
            }
          } catch(IOException e) {
            LOG.error("Error while AsyncWriter write, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToWrite);
          }

          // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
          this.lastWrittenTxid = this.txidToWrite;//更新最新的write num
          boolean hasIdleSyncer = false;
          for (int i = 0; i < asyncSyncers.length; ++i) {
            if (!asyncSyncers[i].isSyncing()) {
              hasIdleSyncer = true;
              asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
              break;
            }
          }
          if (!hasIdleSyncer) {
            int idx = (int)this.lastWrittenTxid % asyncSyncers.length;
            asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "newer writes added to local buffer");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }
  

 

 

  FSHlog的AsyncSyncer线程,会将outputstream的数据flush到磁盘,并同步hdfs的image

   public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. wait until AsyncWriter has written data to HDFS and
          //    called setWrittenTxid to wake up us
          synchronized (this.syncLock) {
            while (this.writtenTxid <= this.lastSyncedTxid) {
              this.syncLock.wait();
            }
            this.txidToSync = this.writtenTxid;
          }

          // if this syncer's writes have been synced by other syncer:
          // 1. just set lastSyncedTxid
          // 2. don't do real sync, don't notify AsyncNotifier, don't logroll check
          // regardless of whether the writer is null or not
          if (this.txidToSync <= syncedTillHere.get()) {
            this.lastSyncedTxid = this.txidToSync;
            continue;
          }

          // 2. do 'sync' to HDFS to provide durability
          long now = EnvironmentEdgeManager.currentTimeMillis();
          try {
            if (writer == null) {
              // the only possible case where writer == null is as below:
              // 1. t1: AsyncWriter append writes to hdfs,
              //        envokes AsyncSyncer 1 with writtenTxid==100
              // 2. t2: AsyncWriter append writes to hdfs,
              //        envokes AsyncSyncer 2 with writtenTxid==200
              // 3. t3: rollWriter starts, it grabs the updateLock which
              //        prevents further writes entering pendingWrites and
              //        wait for all items(200) in pendingWrites to append/sync
              //        to hdfs
              // 4. t4: AsyncSyncer 2 finishes, now syncedTillHere==200
              // 5. t5: rollWriter close writer, set writer=null...
              // 6. t6: AsyncSyncer 1 starts to use writer to do sync... before
              //        rollWriter set writer to the newly created Writer
              //
              // Now writer == null and txidToSync > syncedTillHere here:
              // we need fail all the writes with txid <= txidToSync to avoid
              // 'data loss' where user get successful write response but can't
              // read the writes!
              LOG.fatal("should never happen: has unsynced writes but writer is null!");
              asyncIOE = new IOException("has unsynced writes but writer is null!");
              failedTxid.set(this.txidToSync);
            } else {
              this.isSyncing = true;            
              writer.sync();//同步方法//hlog的地方需要增加retry,延长文件租期过期时间,hlog是否重写
              this.isSyncing = false;
            }
            postSync();
          } catch (IOException e) {
            LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToSync);

            this.isSyncing = false;
          }
          metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);

          // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put'
          // handler threads on 'sync()'
          this.lastSyncedTxid = this.txidToSync;
          asyncNotifier.setFlushedTxid(this.lastSyncedTxid);

          // 4. check and do logRoll if needed
          boolean logRollNeeded = false;
          if (rollWriterLock.tryLock()) {
            try {
              logRollNeeded = checkLowReplication();
            } finally {
              rollWriterLock.unlock();
            }            
            try {
              if (logRollNeeded || writer != null && writer.getLength() > logrollsize) {
                requestLogRoll();
              }
            } catch (IOException e) {
              LOG.warn("writer.getLength() failed,this failure won't block here");
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "notification from AsyncWriter thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }

 

 

  LogRoller定时调用FSHlog的rollwrite方法,生成hlog,将stream流中的数据输出到hdfs上

   @Override
  public byte [][] rollWriter(boolean force)
      throws FailedLogCloseException, IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      byte [][] regionsToFlush = null;
      if (closed) {
        LOG.debug("HLog closed. Skipping rolling of writer");
        return null;
      }
      try {
        if (!closeBarrier.beginOp()) {
          LOG.debug("HLog closing. Skipping rolling of writer");
          return regionsToFlush;
        }
        // Do all the preparation outside of the updateLock to block
        // as less as possible the incoming writes
        long currentFilenum = this.filenum;
        Path oldPath = null;
        if (currentFilenum > 0) {
          //computeFilename  will take care of meta hlog filename
          oldPath = computeFilename(currentFilenum);
        }
        this.filenum = System.currentTimeMillis();
        Path newPath = computeFilename();
        while (fs.exists(newPath)) {
          this.filenum++;
          newPath = computeFilename();
        }

        // Tell our listeners that a new log is about to be created
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogRoll(oldPath, newPath);
          }
        }
        FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
        // Can we get at the dfsclient outputstream?
        FSDataOutputStream nextHdfsOut = null;
        if (nextWriter instanceof ProtobufLogWriter) {
          nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
          // perform the costly sync before we get the lock to roll writers.
          try {
            nextWriter.sync();//protobuf flush到hdfs上,并且seque write写hdfs的同步,让block和image同步
          } catch (IOException e) {
            // optimization failed, no need to abort here.
            LOG.warn("pre-sync failed", e);
          }
        }

        Path oldFile = null;
        int oldNumEntries = 0;
        synchronized (updateLock) {
          // Clean up current writer.
          oldNumEntries = this.numEntries.get();
          oldFile = cleanupCurrentWriter(currentFilenum);
          this.writer = nextWriter;
          this.hdfs_out = nextHdfsOut;
          this.numEntries.set(0);
          if (oldFile != null) {
            this.hlogSequenceNums.put(oldFile, this.latestSequenceNums);
            this.latestSequenceNums = new HashMap<byte[], Long>();
          }
        }
        if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
        else {
          long oldFileLen = this.fs.getFileStatus(oldFile).getLen();
          this.totalLogSize.addAndGet(oldFileLen);
          LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries="
              + oldNumEntries + ", filesize="
              + StringUtils.humanReadableInt(oldFileLen) + "; new WAL "
              + FSUtils.getPath(newPath));
        }

        // Tell our listeners that a new log was created
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogRoll(oldPath, newPath);
          }
        }

        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } finally {
        closeBarrier.endOp();
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

 

1
0
分享到:
评论

相关推荐

    HBase实战源码

    《HBase实战源码》是针对Apache HBase这一分布式、高性能、基于列族的NoSQL数据库的深度解析书籍。源码分析是理解HBase工作原理和技术细节的重要途径。HBase在大数据领域扮演着关键角色,它能够处理海量数据并提供...

    hbase-0.98.1源码包

    源码包“hbase-0.98.1-src.tar.gz”提供了HBase 0.98.1版本的完整源代码,对于理解其内部工作原理、进行二次开发或调试是非常有价值的。 HBase的核心概念包括: 1. 表:HBase中的表由行和列族组成,表名全局唯一。...

    Hbase权威指南 随书源代码 源码包 绝对完整版

    这个源码包名为"hbase-book-master",意味着它是该书的主代码仓库,包含完整的示例和教程,非常适合开发者深入研究。 HBase是一种构建在Hadoop文件系统(HDFS)之上的分布式、版本化、列族式存储系统,设计用于处理...

    hbase源码分析

    ### HBase源码分析 #### 一、HBase性能测试要点与分析 ##### 1.1 测试环境 - **硬件配置**: - 客户端:1台 - RegionServer:5台 - Master:1台 - ZooKeeper:3台 - **软件配置**: - CPU:每台服务器配备8...

    Hbase权威指南随书源代码源码包

    通过研究这些源码,读者可以了解到HBase的内部工作原理,如MemStore、HLog、BlockCache等关键组件的作用,以及它们如何协同工作以实现高效的数据存储和检索。此外,还能学习到如何根据业务需求来设计合理的表结构,...

    HBase源码分析

    ### HBase源码分析——关键知识点详解 #### 一、HBase性能测试总结与环境配置 **测试环境:** - **硬件配置:** - 1台客户端机器 - 5台RegionServer服务器 - 1台Master服务器 - 3台Zookeeper服务器 - **软件...

    HBase 应用平台 Replication 功能

    对于源码级别的理解,可以查看HBase的源代码,了解Replication的具体实现细节。同时,HBase提供了命令行工具以及Admin API来管理和监控复制状态。 总的来说,HBase的Replication功能是保证数据可靠性和一致性的...

    hbase-2.1.7-bin.tar.gz

    在这个“hbase-2.1.7-bin.tar.gz”压缩包中,包含的是HBase 2.1.7版本的源码及其相关文件,适用于大数据处理环境,经过验证,确保其可用性。本文将详细介绍HBase的基本概念、架构、主要特性以及如何在实际应用中部署...

    Hbase分布式数据库 v2.4.16.zip

    - **写操作**:数据写入时先写入内存,达到一定阈值后写入 HLog(持久化日志),最后批量写入 HDFS,保证数据一致性。 ### 5. 数据索引 HBase 的索引主要依赖于行键的排序特性,可以通过自定义的布隆过滤器或者二...

    smoketest-hbase:HBase 的一些冒烟测试

    在压缩包“smoketest-hbase-master”中,我们可以推测这是包含HBase冒烟测试源码的主分支。"master"通常指的是开发中的主要分支,意味着这些测试是最新的,反映了当前HBase的主要功能和行为。 HBase的冒烟测试可能...

Global site tag (gtag.js) - Google Analytics