HBASE 代码阅读笔记-1 - PUT-2-定位RS和R-1(0.96-HADOOP2)



private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
    if (row == null) throw new IllegalArgumentException("row cannot be null");
    HRegionLocation loc = null;
    IOException locationException = null;
    try {
      loc = hConnection.locateRegion(this.tableName, row.getRow());
      if (loc == null) {
        locationException = new IOException("No location found, aborting submit for" +
            " tableName=" + tableName +
            " rowkey=" + Arrays.toString(row.getRow()));
    } catch (IOException e) {
      locationException = e;
    if (locationException != null) {
      // There are multiple retries in locateRegion already. No need to add new.
      // We can't continue with this row, hence it's the last retry.
      manageError(numAttempt, posInList, row, false, locationException, null);
      return null;

    return loc;

private HRegionLocation locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry)
    throws IOException {
      if (this.closed) throw new IOException(toString() + " closed");
      if (tableName== null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");

      if (tableName.equals(TableName.META_TABLE_NAME)) {
        return this.registry.getMetaRegionLocation();
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
          useCache, userRegionLock, retry);

先看看变化的部分:这里调用了 Registry 的一个方法,具体由 ZooKeeperRegistry 实现。Registry 和 ZooKeeperRegistry 都是 0.96 新增的,0.94 版本中没有。

public HRegionLocation getMetaRegionLocation() throws IOException {
    ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();

    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
      ServerName servername = MetaRegionTracker.blockUntilAvailable(zkw, hci.rpcTimeout);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Looked up meta region location, connection=" + this +
          "; serverName=" + ((servername == null) ? "null" : servername));
      if (servername == null) return null;
      return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
    } catch (InterruptedException e) {
      return null;
    } finally {

  public static ServerName blockUntilAvailable(final ZooKeeperWatcher zkw,
      final long timeout)
  throws InterruptedException {
    byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.metaServerZNode, timeout);
    if (data == null) return null;
    try {
      return ServerName.parseFrom(data);
    } catch (DeserializationException e) {
      LOG.warn("Failed parse", e);
      return null;


private HRegionLocation locateRegionInMeta(final TableName parentTable,
      final TableName tableName, final byte [] row, boolean useCache,
      Object regionLockObject, boolean retry)
    throws IOException {
      HRegionLocation location;
      // If we are supposed to be using the cache, look in the cache to see if
      // we already have the region.
      // 上来还是首先从缓存里面读取,方法接口没有变。备注【1】
      if (useCache) {
        location = getCachedLocation(tableName, row);
        if (location != null) {
          return location;
      int localNumRetries = retry ? numTries : 1;
      // build the key of the meta region we should be looking for.
      // the extra 9's on the end are necessary to allow "exact" matches
      // without knowing the precise region names.
      // 还是定义region信息查询键
      byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
        HConstants.NINES, false);
      for (int tries = 0; true; tries++) {
        if (tries >= localNumRetries) {
          throw new NoServerForRegionException("Unable to find region for "
            + Bytes.toStringBinary(row) + " after " + numTries + " tries.");

        HRegionLocation metaLocation = null;
        try {
          // locate the meta region
          // 还是先查询父表。不过如上所示,父表查询接口有了很大的变化,已经没有root表的查询了,直接从ZK获取meta表信息
          metaLocation = locateRegion(parentTable, metaKey, true, false);
          // If null still, go around again.
          if (metaLocation == null) continue;
          // RPC的调用方有了些变化,原来是使用HRgionInterface实例进行查询,实际上由委托代理,调用HBaseclient方法实现。这里先记下来备注【2】
          ClientService.BlockingInterface service = getClient(metaLocation.getServerName());

          Result regionInfoRow;
          // This block guards against two threads trying to load the meta
          // region at the same time. The first will load the meta region and
          // the second will use the value that the first one found.
          synchronized (regionLockObject) {
            // Check the cache again for a hit in case some other thread made the
            // same query while we were waiting on the lock.
            // 和之前一样,类似于读写双锁检查,进入写模块之后,需要再次确认缓存,避免重复请求
            if (useCache) {
              location = getCachedLocation(tableName, row);
              if (location != null) {
                return location;
              // If the parent table is META, we may want to pre-fetch some
              // region info into the global region cache for this table.
              if (parentTable.equals(TableName.META_TABLE_NAME)
                  && (getRegionCachePrefetch(tableName))) {
                prefetchRegionCache(tableName, row);//和之前一样,父表为meta表则进行预抓取。这里新增了一个TableName的类型。直接使用该类型的比较方法,去掉了之前的直接byte比较。备注【3】
              location = getCachedLocation(tableName, row);//还是一样,第三次确认
              if (location != null) {
                return location;
            } else {
              // If we are not supposed to be using the cache, delete any existing cached location
              // so it won't interfere.
              forceDeleteCachedLocation(tableName, row);//这里只是方法名变了,不过内部实现,内部的内部实现跟之前完全是一模一样的。
            // Query the meta region for the location of the meta region
            // 查询Region meta,有变化。
            regionInfoRow = ProtobufUtil.getRowOrBefore(service,
              metaLocation.getRegionInfo().getRegionName(), metaKey,
          if (regionInfoRow == null) {
            throw new TableNotFoundException(tableName);

          // convert the row result into the HRegionLocation we need!
          // 将查询结果转化为一个HRegionInfo,这里的调用有些改变,之前是先获取value,然后委托Writables.getWritable(  
                    value, new HRegionInfo()),这里语义更明显。备注【5】,这里委托了protobuf,只能先放放了
          HRegionInfo regionInfo = MetaScanner.getHRegionInfo(regionInfoRow);
          // 跟之前的判断一样,检查regionInfo是否为空,是否属于当前表,是否正在split或者是否已经下线。多了一个down机检查。
          if (regionInfo == null) {
            throw new IOException("HRegionInfo was null or empty in " +
              parentTable + ", row=" + regionInfoRow);

          // possible we got a region of a different table...
          if (!regionInfo.getTable().equals(tableName)) {
            throw new TableNotFoundException(
                  "Table '" + tableName + "' was not found, got: " +
                  regionInfo.getTable() + ".");
          if (regionInfo.isSplit()) {
            throw new RegionOfflineException("the only available region for" +
              " the required row is a split parent," +
              " the daughters should be online soon: " +
          if (regionInfo.isOffline()) {
            throw new RegionOfflineException("the region is offline, could" +
              " be caused by a disable table call: " +
          ServerName serverName = HRegionInfo.getServerName(regionInfoRow);//这里将原有的byte[]信息转化为对象,代码可读性更好了,94中这里很多byte[]解析。
          if (serverName == null) {
            throw new NoServerForRegionException("No server address listed " +
              "in " + parentTable + " for region " +
              regionInfo.getRegionNameAsString() + " containing row " +

          if (isDeadServer(serverName)){
            throw new RegionServerStoppedException("hbase:meta says the region "+
                regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
                ", but it is dead.");

          // Instantiate the location
          // HRegionLocation的构造稍微变复杂了,其实也就是封装了下,唯一多出来的就是一个seqNum,不知道具体作用还  
          location = new HRegionLocation(regionInfo, serverName,
          cacheLocation(tableName, null, location);//缓存,备注【6】
          return location;
        } catch (TableNotFoundException e) {
          // if we got this error, probably means the table just plain doesn't
          // exist. rethrow the error immediately. this should always be coming
          // from the HTable constructor.
          throw e;
        } catch (IOException e) {
          if (e instanceof RemoteException) {
            e = ((RemoteException)e).unwrapRemoteException();
          if (tries < numTries - 1) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("locateRegionInMeta parentTable=" +
                parentTable + ", metaLocation=" +
                ((metaLocation == null)? "null": "{" + metaLocation + "}") +
                ", attempt=" + tries + " of " +
                this.numTries + " failed; retrying after sleep of " +
                ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
          } else {
            throw e;
          // Only relocate the parent region if necessary
          if(!(e instanceof RegionOfflineException ||
              e instanceof NoServerForRegionException)) {
            relocateRegion(parentTable, metaKey);
          Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
        } catch (InterruptedException e) {
          throw new IOException("Giving up trying to location region in " +
            "meta: thread is interrupted.");






private void cacheLocation(final TableName tableName, final HRegionLocation source,
        final HRegionLocation location) {
      boolean isFromMeta = (source == null);
      byte [] startKey = location.getRegionInfo().getStartKey();
      Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);//(1)
      boolean isNewCacheEntry = false;
      boolean isStaleUpdate = false;
      HRegionLocation oldLocation = null;
      synchronized (this.cachedRegionLocations) {
        oldLocation = tableLocations.get(startKey);// 还是用startKey做索引
        isNewCacheEntry = (oldLocation == null);//如果能查询出来,则表示更新
        // If the server in cache sends us a redirect, assume it's always valid.
        if (!isNewCacheEntry && !oldLocation.equals(source)) {
          long newLocationSeqNum = location.getSeqNum();

          // 这里解释了两个判断是否为陈旧信息的判断标准,有可能服务器自己关闭了老的region,当我们请求的时候,告诉我们一个新的region信息,类似于我们用一个url信息请求一个网页,但是收到一个302
          // 或者服务器关闭了一个region,但是用相同的序列号又打开了一个新的。之前HRegionlocation的构造函数多了一个seqNum在这里就用上了,看来96版的region信息管理跟之前的版本相比较有较大的变化 
          // Meta record is stale - some (probably the same) server has closed the region
          // with later seqNum and told us about the new location.
          boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
          // Same as above for redirect. However, in this case, if the number is equal to previous
          // record, the most common case is that first the region was closed with seqNum, and then
          // opened with the same seqNum; hence we will ignore the redirect.
          // There are so many corner cases with various combinations of opens and closes that
          // an additional counter on top of seqNum would be necessary to handle them all.
          boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
          isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
        if (!isStaleUpdate) {
          tableLocations.put(startKey, location);
      if (isNewCacheEntry) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Cached location for " +
            location.getRegionInfo().getRegionNameAsString() +
            " is " + location.getHostnamePort());
      } else if (isStaleUpdate && !location.equals(oldLocation)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Ignoring stale location update for "
            + location.getRegionInfo().getRegionNameAsString() + ": "
            + location.getHostnamePort() + " at " + location.getSeqNum() + "; local "
            + oldLocation.getHostnamePort() + " at " + oldLocation.getSeqNum());





public ClientService.BlockingInterface getClient(final ServerName sn)
    throws IOException {
      if (isDeadServer(sn)) {
        throw new RegionServerStoppedException(sn + " is dead.");
      String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort());
      // 这个key简单,这俩玩意中间加个@连起来
      this.connectionLock.putIfAbsent(key, key);
      ClientService.BlockingInterface stub = null;
      synchronized (this.connectionLock.get(key)) {
        stub = (ClientService.BlockingInterface)this.stubs.get(key);
        if (stub == null) {
          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
            user, this.rpcTimeout);
          stub = ClientService.newBlockingStub(channel);
          // In old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
          // Just fail on first actual call rather than in here on setup.
          this.stubs.put(key, stub);
      return stub;


private void prefetchRegionCache(final TableName tableName,
        final byte[] row) {
      // Implement a new visitor for MetaScanner, and use it to walk through
      // the hbase:meta
      // 还是用MetaScannerVisitorBase来遍历meta信息,跟之前一样,用MetaScanner来简化了之前基于Writable的HRegionInfo解析
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        public boolean processRow(Result result) throws IOException {
          try {
            HRegionInfo regionInfo = MetaScanner.getHRegionInfo(result);
            if (regionInfo == null) {
              return true;

            // possible we got a region of a different table...
            if (!regionInfo.getTable().equals(tableName)) {
              return false; // stop scanning
            if (regionInfo.isOffline()) {
              // don't cache offline regions
              return true;

            ServerName serverName = HRegionInfo.getServerName(result);
            if (serverName == null) {
              return true; // don't cache it
            // instantiate the location
            long seqNum = HRegionInfo.getSeqNumDuringOpen(result);//知道HRegionlocation的seqNum哪儿来的了。MS META表加料了哦。info:seqnumDuringOpen,没有则为-1
            HRegionLocation loc = new HRegionLocation(regionInfo, serverName, seqNum);
            // cache this meta entry
            cacheLocation(tableName, null, loc);
            return true;
          } catch (RuntimeException e) {
            throw new IOException(e);
      try {
        // pre-fetch certain number of regions info at region cache.
        MetaScanner.metaScan(conf, this, visitor, tableName, row,
            this.prefetchRegionLimit, TableName.META_TABLE_NAME);
      } catch (IOException e) {
        LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);

    public static ServerName getServerName(final Result r) {
    byte[] value = r.getValue(HConstants.CATALOG_FAMILY,
    if (value == null || value.length == 0) return null;
    String hostAndPort = Bytes.toString(value);
    value = r.getValue(HConstants.CATALOG_FAMILY,
    if (value == null || value.length == 0) return null;
    return new ServerName(hostAndPort, Bytes.toLong(value));





