`
半点玻璃心
  • 浏览: 27297 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HBASE 代码阅读笔记-1 - PUT操作客户端主流程(基于0.96.0-hadoop2)

阅读更多
又回来了,还是看put,不过版本号变了,希望看0.94的童靴移驾到http://dennis-lee-gammy.iteye.com/admin/blogs/1972269
put和doput方法变化不大,唯一就是原来的缓存队列名字里面加了一个async,然后类型由ArrayList变成了LinkedList。

flushCommit方法
public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    // We're looping, as if one region is overloaded we keep its operations in the buffer.
    // As we can have an operation in progress even if the buffer is empty, we call
    //  backgroundFlushCommits at least one time.
    do {
      backgroundFlushCommits(true);
    } while (!writeAsyncBuffer.isEmpty());
  }

private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    if (ap.hasError()){
      backgroundFlushCommits(true);
    }

    validatePut(put);

    currentWriteBufferSize += put.heapSize();
    writeAsyncBuffer.add(put);

    while (currentWriteBufferSize > writeBufferSize) {
      backgroundFlushCommits(false);
    }

变化真大啊,原来42行代码一下只有这么点了,以前核心功能由
connection.processBatch(writeBuffer, tableName, pool, results);
完成,这里变成了循环。以前还会检查并保存执行失败的操作返回到缓存列表中,这里第一眼是看不到这些了。看看backgroundFlushCommits 卖的是神马药。

private void backgroundFlushCommits(boolean synchronous) throws
      InterruptedIOException, RetriesExhaustedWithDetailsException {

    try {
      // If there is an error on the operations in progress, we don't add new operations.
      if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
         ap.submit(writeAsyncBuffer, true);//如果任务队列没有清空,并且异步执行器没有问题,则执行提交操作
      }

      if (synchronous || ap.hasError()) {
        if (ap.hasError() && LOG.isDebugEnabled()) {
          LOG.debug(tableName + ": One or more of the operations have failed -" +
              " waiting for all operation in progress to finish (successfully or not)");
        }
        ap.waitUntilDone();//如果是同步模式,或者出现了错误,则都变成同步模式,需要等待完成
      }

      if (ap.hasError()) {
        if (!clearBufferOnFail) {
          // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
          // write buffer. This is a questionable feature kept here for backward compatibility
          // 如果不是失败则清除模式,则保存失败的操作,功能与0.94版本是一致的,不过原来的版本在提交任务的时候
          // 会一并上传一个结果集合,顺序与任务提交的顺序一一对应。顺序取回结果查看是否成功,
          // 并将成功的操作从缓存队列中移除。
          // 而现在的代码,表面上看应该是在某个地方已经清空了,然后ap负责记录并返回失败的操作
          writeAsyncBuffer.addAll(ap.getFailedOperations());
        }
        // 目测ap已经完成了重试,并记录了应有的异常
        RetriesExhaustedWithDetailsException e = ap.getErrors();
        ap.clearErrors();
        throw e;
      }
    } finally {
      currentWriteBufferSize = 0;
      for (Row mut : writeAsyncBuffer) {
        if (mut instanceof Mutation) {
          currentWriteBufferSize += ((Mutation) mut).heapSize();//既然缓存队列之前已经被清除过,也就不用判断是否是失败清除模式了,简单的计算下缓存大小吧。
        }
      }
    }
  }


正常流程几乎完全找不到以前的影子!这里多出来一个处理类org.apache.hadoop.hbase.client.AsyncProcess,即ap成员。这个类是0.94版的代码里面完全没有的。难怪变化那么大。

首先这里有一个参数,指定为同步执行还是异步执行。从上面的doput方法和flushcommit方法可以看出,如果在doput的过程中,也就是调用htable.put(Put)的时候,如果缓存大小超过了客户端写缓存大小的限制,调用这个方法是异步的;而在flushcommit方法中,这个方法是同步的。这里也暴露出来一个与原有流程不同的地方,0.94中doput如果超过大小限制,是委托flushcommit方法提交的,而这里采用了一种更加柔和的方式。另外,那个htable的线程池成员在方法中也找不到它的影子了,以前可是带着到处跑的。


主流程差不多就完成了。重要的两个流程:请求和处理响应,应该是在
ap.submit(writeAsyncBuffer, true)
ap.waitUntilDone();
中实现。继续吧

public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
    if (rows.isEmpty()) {
      return;
    }

    // This looks like we are keying by region but HRegionLocation has a comparator that compares
    // on the server portion only (hostname + port) so this Map collects regions by server.
    // 熟悉的面孔,这不是94中HConnectionImplementation.processBatchCallback(list, tableName, pool, results, null)
    // step 1 第一行么,原来跑这里来了,HRegionLocation --> MultiAction<Row> 的字典结构。
    Map<HRegionLocation, MultiAction<Row>> actionsByServer =
      new HashMap<HRegionLocation, MultiAction<Row>>();
    List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());

    do {
      // Wait until there is at least one slot for a new task.
      // 等待空闲资源执行操作,maxTotalConcurrentTasks =hbase.client.max.total.tasks
      // 默认100,配置文件里面没有哦,亲 TODO【1】
      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);

      // Remember the previous decisions about regions or region servers we put in the
      //  final multi.
      Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
      Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();

      int posInList = -1;
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc = findDestLocation(r, 1, posInList);//定位region TODO【2】

        if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {//判断region TODO【3】
          // loc is null if there is an error such as meta not available.
          Action<Row> action = new Action<Row>(r, ++posInList);
          retainedActions.add(action);
          addAction(loc, action, actionsByServer);//添加操作 ,跟之前的step 1里面的步骤一致,multiAction按HRegionLocation聚类
          it.remove();//果然,缓存队列在这里会被逐步清空
        }
      }

    } while (retainedActions.isEmpty() && atLeastOne && !hasError());

    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
    //创建跟踪异常,如果需要创建(hbase.client.retries.by.server指定,配置文件没有,默认为true),则返回一个
    //HConnectionManager.ServerErrorTracker 
    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer);//发送请求 TODO【4】
  }


那什么情况下表示有空闲资源呢,看看【1】处的相关代码

   private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
    long lastLog = EnvironmentEdgeManager.currentTimeMillis();
    long currentTasksDone = this.tasksDone.get();

    while ((tasksSent.get() - currentTasksDone) > max) {//如果已发送的任务跟已经完成的任务数差值过大
      long now = EnvironmentEdgeManager.currentTimeMillis();
      if (now > lastLog + 10000) {
        lastLog = now;
        LOG.info(": Waiting for the global number of running tasks to be equals or less than "
            + max + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + tasksDone.get() +
            ", currentTasksDone=" + currentTasksDone + ", tableName=" + tableName);
      }
      waitForNextTaskDone(currentTasksDone);//等待下一个任务完成
      currentTasksDone = this.tasksDone.get();//看看完成了多少个
    }
  }
 //这个简单,如果已完成任务数没有变化就等100ms
  protected void waitForNextTaskDone(long currentNumberOfTask) throws InterruptedIOException {
    while (currentNumberOfTask == tasksDone.get()) {
      try {
        synchronized (this.tasksDone) {
          this.tasksDone.wait(100);
        }
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Interrupted." +
            " currentNumberOfTask=" + currentNumberOfTask +
            ",  tableName=" + tableName + ", tasksDone=" + tasksDone.get());
      }
    }
  }


protected boolean canTakeOperation(HRegionLocation loc,
                                     Map<String, Boolean> regionsIncluded,
                                     Map<ServerName, Boolean> serversIncluded) {
    String encodedRegionName = loc.getRegionInfo().getEncodedName();
    Boolean regionPrevious = regionsIncluded.get(encodedRegionName);
    //之前已经有这个region信息,则直接返回以保存的结果,这里有个问题,如果region信息有更新呢?估计在后面的代码里面。
    if (regionPrevious != null) {
      // We already know what to do with this region.
      return regionPrevious;
    }
    //没有的话看看RS的信息,如果RS已经挂了,那么他对应的所有region都挂,不用看了,记录一下告诉上层吧
    Boolean serverPrevious = serversIncluded.get(loc.getServerName());
    if (Boolean.FALSE.equals(serverPrevious)) {
      // It's a new region, on a region server that we have already excluded.
      regionsIncluded.put(encodedRegionName, Boolean.FALSE);
      return false;
    }

    AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName);
    if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
      // Too many tasks on this region already.hbase.client.max.perregion.tasks设置,默认为1哦,配置文件没有哦亲,每次只能运行一个任务?这个设置MS有点坑,后续看看改大了会不会有影响
      regionsIncluded.put(encodedRegionName, Boolean.FALSE);
      return false;
    }
    
    if (serverPrevious == null) {
      // The region is ok, but we need to decide for this region server.
      int newServers = 0; // number of servers we're going to contact so far
      for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
        if (kv.getValue()) {
          newServers++;
        }
      }

      // Do we have too many total tasks already? 如果server的数量与等待完成的任务之和小于最大任务数(之前说过,默认100)
      boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks;

      if (ok) {
        //在检查是否每个server能承受的最大任务数hbase.client.max.perserver.tasks=5,怎么都那么小呢,还不能在配置文件里面找到,坑死了啊
        // If the total is fine, is it ok for this individual server?
        AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
        ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
      }
      // 如果检查失败,RS和Region都设置为false
      if (!ok) {
        regionsIncluded.put(encodedRegionName, Boolean.FALSE);
        serversIncluded.put(loc.getServerName(), Boolean.FALSE);
        return false;
      }

      serversIncluded.put(loc.getServerName(), Boolean.TRUE);
    } else {
      assert serverPrevious.equals(Boolean.TRUE);
    }

    regionsIncluded.put(encodedRegionName, Boolean.TRUE);

    return true;
  }


备注【3】,定位Resion,大操作,详见http://dennis-lee-gammy.iteye.com/admin/blogs/1973255

备注【5】,发送请求,大头,这里先放放
分享到:
评论

相关推荐

    Hadoop与HBase自学笔记

    ### Hadoop与HBase自学笔记知识点总结 #### 一、Hadoop与HBase简介 - **Hadoop**:是一款能够对大量数据进行分布式处理的软件框架。它通过提供高可靠性和高扩展性的分布式计算能力,使得用户能够在廉价的硬件设备...

    hbase-1.2.6-bin+src.tar.rar

    客户端与HBase交互的API在`org.apache.hadoop.hbase.client`包中,如`Table`接口提供了增删查改操作。`Put`、`Get`、`Scan`、`Delete`类分别对应写入、读取、扫描和删除操作。 3.3 Region分裂 Region的分裂过程在`...

    03.hadoop上课笔记之java编程和hbase

    Hadoop 上课笔记之 Java 编程和 HBase Hadoop 是一个分布式计算框架,HBase 是基于 Hadoop 的一个分布式数据库系统。下面是 Java 编程和 HBase 相关的知识点: 一、HBase 主要类和接口 1. Admin 类:用于建立...

    hbase学习笔记

    HBase是一个基于谷歌Bigtable理念设计的开源分布式数据库,它构建在Hadoop的HDFS之上,并依赖Zookeeper进行协调服务。HBase的设计目标是为了处理大规模的数据存储和快速随机访问。 1. **HBase表结构**: HBase的表...

    Hbase课程资料笔记,介绍、原理、入门实操.zip

    HBase,全称为Hadoop Database,是一个基于Google Bigtable设计思想的开源分布式数据库,主要在Apache Hadoop生态系统中运行。HBase提供了一个高可靠性、高性能、可伸缩的列式存储系统,适合处理大规模数据。它支持...

    Sqoop数据采集工具简介、安装、使用学习笔记(配合Hive和Hbase)

    对于 Sqoop2,可以通过先将数据导入 HDFS,然后再使用 Load 或 Put 命令将数据加载到 Hive 或 HBase 中来解决。 - **Hive/HBase -&gt; RDBMS**:两者均不支持直接转换,但可以通过以下步骤实现:先将数据从 Hive 或 ...

    hbase学习笔记.doc

    HBase提供了多种操作命令,如`create`用于创建表,`put`用于插入数据,`scan`用于扫描表,`get`用于获取特定行的数据,`disable`和`drop`分别用于禁用和删除表,`list`用于列出所有表,`exit`用于退出shell。...

    源码笔记资料(1).zip

    结合提供的代码,可以实践HBase的API使用,例如创建表、插入数据、查询数据、删除数据等操作,进一步理解HBase的工作流程。同时,通过分析笔记中的案例,可以了解HBase在实际项目中的应用,如日志分析、用户行为追踪...

    HBaseClientDemo

    通过阅读“HBase java api学习笔记.docx”,你将进一步了解这些概念和API的使用细节,结合“HBaseClientDemo”进行实践,将有助于加深对HBase客户端操作的理解。在大数据处理和分析项目中,熟练掌握HBase Java API...

    大数据学习笔记

    大数据学习笔记 本资源摘要信息涵盖了大数据领域中的多个方面,包括Hadoop、HBase、Sqoop、Spark和Hive等...大数据学习笔记涵盖了Hadoop、HBase、Sqoop、Spark和Hive等技术栈,提供了对大数据领域的深入了解和掌握。

    hbase简介共8页.pdf.zip

    3. **读写流程**:HBase的Get和Put操作,以及HBase如何利用内存和磁盘进行高效的读写。 4. **数据分布和分区**:Region的概念,如何根据行键范围分配数据到不同的Region Server,以及Region的分裂和合并。 5. **...

Global site tag (gtag.js) - Google Analytics