`
uestzengting
  • 浏览: 96190 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hbase put过程源代码阅读笔记

阅读更多
客户端
1.HTable.put
    for (Put put : puts) {
      validatePut(put);//验证Put有效,主要是判断kv的长度
      writeBuffer.add(put);//写入缓存
      currentWriteBufferSize += put.heapSize();//计算缓存容量
    }
    if (autoFlush || currentWriteBufferSize > writeBufferSize) {
      flushCommits();//如果自动Flush或者缓存到达阀值,则执行flush
    }
2.HTable.flushCommits
    try {
      connection.processBatchOfPuts(writeBuffer, tableName, pool);//调用HConnection来提交Put,传入了一个线程池,看来是有异步调用的过程
    } finally {
      // the write buffer was adjusted by processBatchOfPuts
      currentWriteBufferSize = 0;
      for (Put aPut : writeBuffer) {
        currentWriteBufferSize += aPut.heapSize();//currentWriteBufferSize又重新计算了一遍,看来一批提交不一定会全部提交完
      }
    }
3.HConnectionManager.HConnectionImplementation.processBatch
第一步:按RegionServer把Put分类,以便于将一批Put按RegionServer批量提交
// step 1: break up into regionserver-sized chunks and build the data structs

        Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();//RegionServer和批量Put的映射关系
        for (int i = 0; i < workingList.size(); i++) {
          Row row = workingList.get(i);
          if (row != null) {
            HRegionLocation loc = locateRegion(tableName, row.getRow(), true);//定位Put在哪个Region上
            HServerAddress address = loc.getServerAddress();//定位Region在哪个RegionServer上
            byte[] regionName = loc.getRegionInfo().getRegionName();
            MultiAction actions = actionsByServer.get(address);//看该RegionServer上的批量对象创建没有,没有就创建一个
            if (actions == null) {
              actions = new MultiAction();
              actionsByServer.put(address, actions);
            }

            Action action = new Action(regionName, row, i);//根据Put创建一个响应对象,放到批量操作对象里,什么是响应对象呢,就是Put和返回结果的组合
            lastServers[i] = address;
            actions.add(regionName, action);
          }
        }

第二步:往RegionServer上提交请求,各个RegionServer是并发提交的
// step 2: make the requests

        Map<HServerAddress,Future<MultiResponse>> futures =
            new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());

        for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
        }

第三步,等待各RegionServer返回结果,并准备重试
// step 3: collect the failures and successes and prepare for retry

        for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) {
          HServerAddress address = responsePerServer.getKey();

          try {
            Future<MultiResponse> future = responsePerServer.getValue();
            MultiResponse resp = future.get();

            if (resp == null) {
              // Entire server failed
              LOG.debug("Failed all for server: " + address + ", removing from cache");
              continue;
            }

            for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
              byte[] regionName = e.getKey();
              List<Pair<Integer, Object>> regionResults = e.getValue();
              for (Pair<Integer, Object> regionResult : regionResults) {
                if (regionResult == null) {
                  // if the first/only record is 'null' the entire region failed.
                  LOG.debug("Failures for region: " +
                      Bytes.toStringBinary(regionName) +
                      ", removing from cache");
                } else {
                  // Result might be an Exception, including DNRIOE
                  results[regionResult.getFirst()] = regionResult.getSecond();
                }
              }
            }
          } catch (ExecutionException e) {
            LOG.debug("Failed all from " + address, e);
          }
        }

第四步,识别返回的错误,准备重试
// step 4: identify failures and prep for a retry (if applicable).

        // Find failures (i.e. null Result), and add them to the workingList (in
        // order), so they can be retried.
        retry = false;
        workingList.clear();
        for (int i = 0; i < results.length; i++) {
          // if null (fail) or instanceof Throwable && not instanceof DNRIOE
          // then retry that row. else dont.
          if (results[i] == null ||
              (results[i] instanceof Throwable &&
                  !(results[i] instanceof DoNotRetryIOException))) {

            retry = true;

            Row row = list.get(i);
            workingList.add(row);
            deleteCachedLocation(tableName, row.getRow());
          } else {
            // add null to workingList, so the order remains consistent with the original list argument.
            workingList.add(null);
          }
        }
由以上四步可以看出,重点在于第二步,继续跟进,看Callable是怎么样call的,有两步,一是创建到RegionServer的连接,二是调用RegionServer上的multi方法,显然这是远程调用的过程。
3.HConnectionManager.HConnectionImplementation.processBatch
      return new Callable<MultiResponse>() {
        public MultiResponse call() throws IOException {
          return getRegionServerWithoutRetries(
              new ServerCallable<MultiResponse>(connection, tableName, null) {
                public MultiResponse call() throws IOException {
                  return server.multi(multi);//第二步:远程调用服务端RegionServer的multi方法,返回结果
                }
                @Override
                public void instantiateServer(boolean reload) throws IOException {
                  server = connection.getHRegionConnection(address);//第一步:根据RegionServer的地址连上RegionServer
                }
              }
          );
        }
      };

RegionServer服务端
上面客户端调用过程分析完毕,继续跟RegionServer服务端的处理,入口方法就是HRegionServer.multi

1.HRegionServer.multi
这个方法里有些是关于重试、上锁、结果收集的代码,忽略掉,重要的是两步
第一步:根据RegionName取得对应的Region
          HRegion region = getRegion(regionName);
第二步:调用region的put方法实际put数据
          OperationStatusCode[] codes =
              region.put(putsWithLocks.toArray(new Pair[]{}));

2.HRegion.put
这个方法先检查、上锁、doMiniBatchPut、解锁、判断是否需要flush,重要的是doMiniBatchPut这个方法
long addedSize = doMiniBatchPut(batchOp);

3.HRegion.doMiniBatchPut
这个方法分为上锁、更新时间戳、写WAL、写memstore、解锁;重要的是写WAL和写memstore
      // ------------------------------------
      // STEP 3. Write to WAL
      // ----------------------------------
      WALEdit walEdit = new WALEdit();//第一步:创建WAL日志对象放内存里
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        // Skip puts that were determined to be invalid during preprocessing
        if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;

        Put p = batchOp.operations[i].getFirst();
        if (!p.getWriteToWAL()) continue;
        addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);//第二步:写put的内容到WAL日志对象里
      }

      // Append the edit to WAL
      this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
          walEdit, now);//第三步:写WAL日志对象到硬盘上

      // ------------------------------------
      // STEP 4. Write back to memstore
      // ----------------------------------
      long addedSize = 0;
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;

        Put p = batchOp.operations[i].getFirst();
        addedSize += applyFamilyMapToMemstore(p.getFamilyMap());//把put放到memstore里
        batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
      }
分享到:
评论

相关推荐

    hbase权威指南源代码下载

    在本文中,我们将围绕HBase的核心概念、架构以及如何通过源代码学习进行深入探讨。 HBase,作为Apache Hadoop生态系统的一部分,是一个基于列族的分布式数据库,特别适合处理海量结构化数据。它的设计目标是在廉价...

    hbase0.94java源代码

    这个源代码包是针对Java开发者的宝贵资源,可以帮助他们深入理解HBase的内部工作原理,优化自己的应用,以及对HBase进行定制化开发。 HBase的核心概念包括: 1. 表(Table):HBase的数据存储在表中,每个表由行键...

    VC代码 hbase1.0 (实用代码源).rar

    VC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 hbase1.0 (实用代码源).rarVC代码 ...

    hbase权威指南.源代码

    《HBase权威指南》是一本深入探讨分布式大数据存储系统HBase的专业书籍,其源代码的提供为读者提供了更直观的学习材料。HBase是基于Apache Hadoop的非关系型数据库(NoSQL),它在大规模数据存储方面表现卓越,尤其...

    hbase权威指南源代码

    通过阅读《HBase权威指南》并结合源代码,开发者能够更好地掌握HBase的核心概念和技术,从而在实际项目中有效地运用HBase解决大数据问题。这些源代码实例对于学习HBase的开发、调试和运维都具有很高的参考价值。

    Hbase权威指南 随书源代码 源码包 绝对完整版

    Hbase权威指南 随书源代码 源码包 绝对完整版 maven工程,带pom文件,可以直接作为一个完整工程导入eclipse等ide。

    hbase权威指南 源代码 英文

    通过阅读《HBase 权威指南》的源代码,开发者不仅可以理解HBase的设计理念,还能学习到如何利用HBase解决实际问题,提升大数据处理的能力。无论是初学者还是经验丰富的开发者,这本书都将是提升HBase技能的宝贵资源...

    HBase源代码 hbase-0.98.23

    《深入剖析HBase源代码:hbase-0.98.23》 HBase,作为Apache的一个开源项目,是构建在Hadoop之上的分布式、版本化、列族式的NoSQL数据库,它提供了高可靠性、高性能、可伸缩的数据存储解决方案。本文将基于hbase-...

    将hdfs上的文件导入hbase的源代码

    通过阅读和理解这段代码,可以深入学习如何在实际项目中将HDFS数据导入HBase,这对于大数据平台的开发和运维人员来说是非常有价值的实践。 总的来说,将HDFS上的文件导入HBase是一个涉及数据处理、数据库设计和编程...

    hbase操作必备客户端源代码

    hbase操作必备客户端源代码

    HBase基本操作 Java代码

    HBase基本操作 增删改查 java代码 要使用须导入对应的jar包

    hbase权威指南源码

    解压后的`hbase-book-master`包含项目的基本目录结构,如`src/main/java`用于存放Java源代码,`src/main/resources`存储资源配置文件,`pom.xml`是Maven项目对象模型,定义了项目的构建过程和依赖关系。 2. **...

    hbase权威指南 配套源码

    《HBase权威指南》是一本深入探讨分布式列式数据库HBase的专业书籍,其配套源码提供了书中所提及的示例代码和实践案例,方便读者更好地理解和应用HBase。以下将详细解析HBase的相关知识点。 HBase是建立在Apache ...

    java操作Hbase之Hbase专用过滤器PageFilter的使用源代码

    在Java中操作HBase数据库时,我们经常需要对大量数据进行高效的检索和处理。...提供的源代码应该包含如何创建、应用PageFilter以及处理扫描结果的完整示例,这对于学习和实践HBase的分页查询非常有帮助。

    使用Java API连接虚拟机HBase并进行数据库操作,Java源代码

    在Java代码中,首先需要导入必要的库,如`org.apache.hadoop.hbase.HBaseConfiguration`和`org.apache.hadoop.hbase.client.Connection`。然后,我们需要创建一个HBase的配置对象,设置Zookeeper的地址,这是HBase...

    读书笔记:hbase权威指南学习代码.zip

    读书笔记:hbase权威指南学习代码

    java代码将mysql表数据导入HBase表

    本文将详细介绍如何使用Java代码实现这一过程,包括样例MySQL表和数据,以及HBase表的创建。 首先,我们需要了解MySQL和HBase的基本概念。MySQL是一种关系型数据库管理系统,它基于ACID(原子性、一致性、隔离性和...

    WordCount,HBase MR样例代码

    标题中的“WordCount”是...在实际的学习过程中,理解并实践这些知识点对于掌握Hadoop和HBase的集成使用是非常有帮助的。通过阅读和研究相关的博客文章,可以加深对大数据处理和NoSQL数据库应用的理解,提升技能水平。

    hbase 的java代码 集合 hbase 0.96

    1. **连接HBase**:在Java代码中,我们使用`HBaseConfiguration.create()`来创建一个配置对象,然后可以设置各种配置参数,如Zookeeper的地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)...

    HbaseTemplate 操作hbase

    5. **其他操作**:除了上述方法,HbaseTemplate还提供了插入(put)、删除(delete)和更新(update)等操作。例如,`put`方法用于向表中写入新的数据,`delete`方法根据行键删除一行,`update`方法则可以更新已有...

Global site tag (gtag.js) - Google Analytics