`
半点玻璃心
  • 浏览: 27315 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HBASE 代码阅读笔记-1 - PUT-3-提交任务1 (0.96-HADOOP2)

阅读更多
看看提交任务的代码吧。对应http://dennis-lee-gammy.iteye.com/admin/blogs/1973249 TODO 【4】

public void sendMultiAction(final List<Action<Row>> initialActions,
                              Map<HRegionLocation, MultiAction<Row>> actionsByServer,
                              final int numAttempt,
                              final HConnectionManager.ServerErrorTracker errorsByServer) {
    // Send the queries and add them to the inProgress list
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
      final HRegionLocation loc = e.getKey();
      final MultiAction<Row> multiAction = e.getValue();
      incTaskCounters(multiAction.getRegions(), loc.getServerName());
      //这里简单,就是将taskSent,taskPerRegion(默认最大值1),taskPerServer(默认最大值5)都进行了+1操作,以便之前的判断
      Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
        @Override
        public void run() {
          MultiResponse res;
          try {
            MultiServerCallable<Row> callable = createCallable(loc, multiAction);
            //原先的Callable没有了哦亲。MultiServerCallable,全新的Api哦亲
            try {
              res = createCaller(callable).callWithoutRetries(callable);
              //这里是创建了一个RpcRetryingCaller,然后再调用callable,这里很奇怪,在构造的时候传入了callable,但是却没有使用,RpcRetryingCaller有一个估计MultiServerCallable的成员变量,后来因为某些原因干掉了。createCall方法很简单,主要是callWithoutRetries。备注【1】
            } catch (IOException e) {
              LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +
                ", resubmitting all since not sure where we are at", e);
              resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);//失败重试,备注【2】
              return;
            }

            receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);//提交任务,备注【3】
          } finally {
            decTaskCounters(multiAction.getRegions(), loc.getServerName());
            // taskDone+1,taskPerRegion,taskPerServer-1
          }
        }
      });

      try {
        this.pool.submit(runnable);//HTable的pool跑到这里来了
      } catch (RejectedExecutionException ree) {
        // This should never happen. But as the pool is provided by the end user, let's secure
        //  this a little.
        decTaskCounters(multiAction.getRegions(), loc.getServerName());
        LOG.warn("The task was rejected by the pool. This is unexpected." +
            " Server is " + loc.getServerName(), ree);
        // We're likely to fail again, but this will increment the attempt counter, so it will
        //  finish.
        resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);//同上,失败重试
      }
    }
  }


先看看失败重试吧,对应备注【2】

private void resubmitAll(List<Action<Row>> initialActions, MultiAction<Row> rsActions,
                           HRegionLocation location, int numAttempt, Throwable t,
                           HConnectionManager.ServerErrorTracker errorsByServer) {
    // Do not use the exception for updating cache because it might be coming from
    // any of the regions in the MultiAction.
    // 先更新cache的region信息
    hConnection.updateCachedLocations(tableName,
      rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
    errorsByServer.reportServerError(location);//保存异常
    List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
    for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
      for (Action<Row> action : e.getValue()) {
        if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),
            true, t, location)) {//检查是否可以重试。这里备注下【4】
          toReplay.add(action);
        }
      }
    }

    if (toReplay.isEmpty()) {//没有可重试的就88
      LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
        initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
    } else {
      submit(initialActions, toReplay, numAttempt, errorsByServer);//重试
    }
  }


public void updateCachedLocations(final TableName tableName, byte[] rowkey,
      final Object exception, final HRegionLocation source) {
      if (rowkey == null || tableName == null) {
        LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
            ", tableName=" + (tableName == null ? "null" : tableName));
        return;
      }

      // Is it something we have already updated?
      final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey);
      if (oldLocation == null) {
        // There is no such location in the cache => it's been removed already => nothing to do
        return;
      }

      HRegionInfo regionInfo = oldLocation.getRegionInfo();
      final RegionMovedException rme = RegionMovedException.find(exception);
      if (rme != null) {
        if (LOG.isTraceEnabled()){
          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
            rme.getHostname() + ":" + rme.getPort() + " according to " + source.getHostnamePort());
        }
        //这里如果是region已经移动的信息,则异常会将新的region地址返回,然后更新缓存
        updateCachedLocation(
            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
      } else if (RegionOpeningException.find(exception) != null) {
        //意思大概是region如果已经由别的RS打开,则client可以直接请求新的region,所以这里不用做处理
        //不是很明白,求高手解答
        if (LOG.isTraceEnabled()) {
          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " is being opened on "
              + source.getHostnamePort() + "; not deleting the cache entry");
        }
      } else {
        deleteCachedLocation(regionInfo, source);//真是不可恢复的错误就把缓存删了吧
      }
    }


  void reportServerError(HRegionLocation server) {
      ServerErrors errors = errorsByServer.get(server);
      if (errors != null) {
        errors.addError();
      } else {
        errorsByServer.put(server, new ServerErrors());
      }
    }


//老三样,按region归并action,然后提交。少了很多判断
private void submit(List<Action<Row>> initialActions,
                      List<Action<Row>> currentActions, int numAttempt,
                      final HConnectionManager.ServerErrorTracker errorsByServer) {
    // group per location => regions server
    final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
        new HashMap<HRegionLocation, MultiAction<Row>>();

    for (Action<Row> action : currentActions) {
      HRegionLocation loc = findDestLocation(action.getAction(), 1, action.getOriginalIndex());
      if (loc != null) {
        addAction(loc, action, actionsByServer);
      }
    }

    if (!actionsByServer.isEmpty()) {
      sendMultiAction(initialActions, actionsByServer, numAttempt, errorsByServer);
    }
  }



这样重试的代码就理清楚了
看看RpcRetryingCaller.callWithoutRetries方法吧,主要是调用了prepare和call方法,
public T callWithoutRetries(RetryingCallable<T> callable)
  throws IOException, RuntimeException {
    // The code of this method should be shared with withRetries.
    this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
    try {
      beforeCall();
      callable.prepare(false);//这里是个坑哦亲,不管传入什么都一样哦,没有的参数啊亲,主要是返回一个stub,已经在之前的region定位篇中说过,这依赖protobuf库。
      return callable.call();
    } catch (Throwable t) {
      Throwable t2 = translateException(t);
      // It would be nice to clear the location cache here.
      if (t2 instanceof IOException) {
        throw (IOException)t2;
      } else {
        throw new RuntimeException(t2);
      }
    } finally {
      afterCall();
    }
  }


  public void prepare(boolean reload) throws IOException {
    // Use the location we were given in the constructor rather than go look it up.
    setStub(getConnection().getClient(getLocation().getServerName()));
  }


call方法是核心,明天再看吧
分享到:
评论

相关推荐

    hbase-hadoop2-compat-1.2.12-API文档-中文版.zip

    赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...

    hbase-hadoop2-compat-1.1.3-API文档-中文版.zip

    赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...

    hbase-hadoop-compat-1.1.3-API文档-中文版.zip

    赠送jar包:hbase-hadoop-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.1.3....

    hbase-hadoop-compat-1.1.3-API文档-中英对照版.zip

    赠送jar包:hbase-hadoop-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.1.3....

    hbase-hadoop2-compat-1.1.3-API文档-中英对照版.zip

    赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...

    hbase-hadoop2-compat-1.4.3-API文档-中文版.zip

    赠送jar包:hbase-hadoop2-compat-1.4.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.4.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...

    hbase-hadoop2-compat-1.2.12-API文档-中英对照版.zip

    赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...

    hbase-hadoop-compat-1.2.12-API文档-中文版.zip

    赠送jar包:hbase-hadoop-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-...

    hbase-hadoop1-compat-0.98.3-hadoop1.zip

    【标题】"hbase-hadoop1-compat-0.98.3-hadoop1.zip" 指的是HBase的一个特定版本,它包含了与Hadoop 1.x版本兼容的组件。HBase是Apache软件基金会的一个开源分布式数据库,设计用于处理大规模数据集。这个版本...

    hbase-hadoop-compat-1.4.3-API文档-中文版.zip

    赠送jar包:hbase-hadoop-compat-1.4.3.jar; 赠送原API文档:hbase-hadoop-compat-1.4.3-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.4.3....

    hbase-hadoop-compat-1.2.12-API文档-中英对照版.zip

    赠送jar包:hbase-hadoop-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-...

    hbase-0.98.7-hadoop2-bin.tar

    《深入理解HBase:以hbase-0.98.7-hadoop2-bin.tar为例》 HBase,作为Apache软件基金会的重要项目之一,是构建在Hadoop生态系统之上的一款分布式、高性能、列式存储的NoSQL数据库。它为大规模数据集提供了实时读写...

    hbase-hbck2-1.1.0-SNAPSHOT.jar

    hbase-hbck2-1.1.0-SNAPSHOT.jar

    hbase2.x-hbck2 jar包及测试命令

    HBCK2 jar包是这个工具的可执行文件,通常在HBase的lib目录下可以找到,名为`hbase-hbck2-x.x.x.jar`,其中`x.x.x`表示具体的HBase版本号。这个jar包包含了所有执行HBCK2命令所需的功能和类。你可以通过Hadoop的`...

    hbase-hbck2-1.2.0-SNAPSHOT.jar

    HBCK是HBase1.x中的命令,到了HBase2.x中,HBCK命令不适用,且它的写功能(-fix)已删除; HBCK2已经被剥离出HBase成为了一个单独的项目,如果你想要使用这个工具,需要根据自己HBase的版本,编译源码。其GitHub地址...

    hbase_0.98.13-hadoop2-bin.tar.gz

    1. 下载与解压:首先,你需要下载HBase的0.98.13-hadoop2版本,这个版本已经包含在名为“hbase-0.98.13-hadoop2”的压缩包文件中。解压到指定目录,例如 `/usr/local/hbase`。 2. 配置环境变量:编辑 `~/.bashrc` 或...

    hbase-hadoop1-compat-0.98.7-hadoop1.zip

    tapestry-security.zip,基于shiro security的tapestry 5的tynamo安全包tapestry security是基于apache shiro的apache tapestry 5的安全模块

    HBase(hbase-2.4.9-bin.tar.gz)

    HBase(hbase-2.4.9-bin.tar.gz)是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System...

    hbase-0.98.12.1-hadoop1-bin.tar.gz

    本文将围绕"Hbase-0.98.12.1-hadoop1-bin.tar.gz"这一特定版本的HBase进行详细介绍,包括其特性、安装与配置、以及与Hadoop1的集成。 1. **HBase 0.98.12.1概述** HBase 0.98.12.1是HBase的早期稳定版本,提供了...

    CentOS-6.4 64位系统下hadoop-2.2.0+hbase-0.96+zookeeper-3.4.5 分布式安装配置

    在HBase的安装中,同样需要解压缩安装包,然后根据Hadoop的配置来调整HBase的配置文件(如`hbase-site.xml`)。在`hbase-site.xml`中,应指定Zookeeper集群的位置: ```xml &lt;name&gt;hbase.rootdir &lt;value&gt;hdfs:/...

Global site tag (gtag.js) - Google Analytics