
HBase source code analysis (1): how client data is written into HBase

 

 

Roughly, HBase inserts data as follows:

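  1. The client submits the request to a region server (with some client-side buffering along the way).
  2. The region server receives the request; if it is a put, the data is written into the memstore (the edit is also recorded in the write-ahead log, as the internalPut steps at the end show).
  3. After every memstore update, the server checks whether the memstore has exceeded a threshold; if it has, a flush is requested. The flush persists the in-memory KeyValue pairs to the HStore's files on disk (HFiles).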
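To make steps 2 and 3 concrete, here is a toy, single-threaded model. It is not HBase code: `ToyMemstore`, `flushThreshold`, and `flushedFiles` are hypothetical names, the size accounting is a rough character count, and the "flush" simply freezes the sorted in-memory map as a stand-in for an HFile. In a real region server the flush is handed to a background flusher rather than run inline.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model (not HBase code) of a memstore that is flushed past a size threshold.
class ToyMemstore {
    private final long flushThreshold;                                  // stand-in for the memstore flush size
    private NavigableMap<String, String> memstore = new TreeMap<>();    // kept sorted by row key
    private long currentSize = 0;                                       // rough size of buffered data
    final List<NavigableMap<String, String>> flushedFiles = new ArrayList<>(); // stand-ins for HFiles

    ToyMemstore(long flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    // Step 2: write into the in-memory sorted map. Step 3: check the threshold after every write.
    void put(String rowKey, String value) {
        memstore.put(rowKey, value);
        currentSize += rowKey.length() + value.length();
        if (currentSize > flushThreshold) {
            flush();
        }
    }

    // Freeze the sorted contents as an immutable "file" and start a new, empty memstore.
    private void flush() {
        flushedFiles.add(Collections.unmodifiableNavigableMap(memstore));
        memstore = new TreeMap<>();
        currentSize = 0;
    }

    int memstoreEntries() {
        return memstore.size();
    }
}
```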

Now let's follow a single piece of data on its way into HBase:

Client side:

 

  • HTable.java performs the put operation:
  public void put(final Put put) throws IOException {
    doPut(Arrays.asList(put));
  }
  • The put method delegates to doPut:

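doPut first validates each Put and checks that no KeyValue exceeds the maximum size, which is configured with hbase.client.keyvalue.maxsize (unlimited by default). It then calls writeBuffer.add(put) to store the data in the client-side write buffer. The buffered puts are flushed when the buffer grows beyond writeBufferSize (hbase.client.write.buffer, 2097152 bytes, i.e. 2 MB, by default), when autoFlush is enabled (the default is true), or when you call flushCommits() yourself.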

  private void doPut(final List<Put> puts) throws IOException {
    int n = 0;
    for (Put put : puts) {
      validatePut(put);
      writeBuffer.add(put); // write the data into the client-side buffer
      currentWriteBufferSize += put.heapSize();
     
      // we need to periodically see if the writebuffer is full instead of waiting until the end of the List
      n++;
      if (n % DOPUT_WB_CHECK == 0 && currentWriteBufferSize > writeBufferSize) {
        flushCommits();
      }
    }
    if (autoFlush || currentWriteBufferSize > writeBufferSize) {
      flushCommits();
    }
  }
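The buffering rule in doPut can be modelled in a few lines. The class below, `ClientWriteBuffer`, is a hypothetical simplification and not the HBase API: it adds each put's size to a running total and flushes when autoFlush is on or the total exceeds writeBufferSize. It drops the periodic DOPUT_WB_CHECK test and uses the byte length of the data as a crude stand-in for Put.heapSize().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of HTable's client-side write buffer (not the HBase API).
class ClientWriteBuffer {
    private final long writeBufferSize;   // plays the role of hbase.client.write.buffer
    private final boolean autoFlush;
    private final List<byte[]> writeBuffer = new ArrayList<>();
    private long currentWriteBufferSize = 0;
    int flushCount = 0;                   // number of times the buffered puts were "sent"

    ClientWriteBuffer(long writeBufferSize, boolean autoFlush) {
        this.writeBufferSize = writeBufferSize;
        this.autoFlush = autoFlush;
    }

    void put(byte[] data) {
        writeBuffer.add(data);
        currentWriteBufferSize += data.length; // crude stand-in for Put.heapSize()
        if (autoFlush || currentWriteBufferSize > writeBufferSize) {
            flushCommits();
        }
    }

    void flushCommits() {
        if (writeBuffer.isEmpty()) {
            return;
        }
        // A real client would send the buffered puts to the region servers here.
        flushCount++;
        writeBuffer.clear();
        currentWriteBufferSize = 0;
    }

    int buffered() {
        return writeBuffer.size();
    }
}
```

With autoFlush left at its default of true, every put is sent on its own; calling HTable.setAutoFlush(false) lets puts accumulate until the buffer fills or flushCommits() is called.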

 

  • The flushCommits() code:
  public void flushCommits() throws IOException {
    try {
      Object[] results = new Object[writeBuffer.size()];
      try {
        // connect to the remote region servers and submit the requests here
        this.connection.processBatch(writeBuffer, tableName, pool, results);
      } catch (InterruptedException e) {
        throw new IOException(e);
      } finally {
        // mutate list so that it is empty for complete success, or contains
        // only failed records results are returned in the same order as the
        // requests in list walk the list backwards, so we can remove from list
        // without impacting the indexes of earlier members
        for (int i = results.length - 1; i>=0; i--) {
          if (results[i] instanceof Result) {
            // successful Puts are removed from the list here.
            writeBuffer.remove(i);
          }
        }
      }
    } finally {
      if (clearBufferOnFail) {
        writeBuffer.clear();
        currentWriteBufferSize = 0;
      } else {
        // the write buffer was adjusted by processBatchOfPuts
        currentWriteBufferSize = 0;
        for (Put aPut : writeBuffer) {
          currentWriteBufferSize += aPut.heapSize();
        }
      }
    }
  }
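The finally block in flushCommits walks results backwards so that removing a successful entry never shifts the index of an entry it has not visited yet. A standalone sketch of that idiom follows; the `boolean[] succeeded` array is a hypothetical stand-in for the `results[i] instanceof Result` test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RemoveSucceeded {
    // Removes every element whose matching succeeded[i] is true, keeping the failures in order.
    static <T> void removeSucceeded(List<T> buffer, boolean[] succeeded) {
        // Iterate from the end: removing index i only shifts elements after i,
        // and those have already been visited.
        for (int i = succeeded.length - 1; i >= 0; i--) {
            if (succeeded[i]) {
                buffer.remove(i);
            }
        }
    }
}
```

Iterating forwards instead would skip the element that slides into position i after each removal.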

Now let's look at how HConnectionImplementation, the implementation class of HConnection.java, implements processBatch:

 

    public void processBatch(List<? extends Row> list,
        final byte[] tableName,
        ExecutorService pool,
        Object[] results) throws IOException, InterruptedException {

      // results must be the same size as list
      if (results.length != list.size()) {
        throw new IllegalArgumentException("argument results must be the same size as argument list");
      }

      processBatchCallback(list, tableName, pool, results, null);
    }
    public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException {

      // results must be the same size as list
      if (results.length != list.size()) {
        throw new IllegalArgumentException(
            "argument results must be the same size as argument list");
      }
      if (list.isEmpty()) {
        return;
      }

      // Keep track of the most recent servers for any given item for better
      // exceptional reporting.  We keep HRegionLocation to save on parsing.
      // Later below when we use lastServers, we'll pull what we need from
      // lastServers.
      HRegionLocation [] lastServers = new HRegionLocation[results.length];
      List<Row> workingList = new ArrayList<Row>(list);
      boolean retry = true;
      // count that helps presize actions array
      int actionCount = 0;
      Throwable singleRowCause = null;

      for (int tries = 0; tries < numRetries && retry; ++tries) {

        // sleep first, if this is a retry
        if (tries >= 1) {
          long sleepTime = getPauseTime(tries);
          LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
          Thread.sleep(sleepTime);
        }
        // step 1: break up into regionserver-sized chunks and build the data structs
        Map<HRegionLocation, MultiAction<R>> actionsByServer =
          new HashMap<HRegionLocation, MultiAction<R>>();
        for (int i = 0; i < workingList.size(); i++) {
          Row row = workingList.get(i);
          if (row != null) {
            HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
            byte[] regionName = loc.getRegionInfo().getRegionName();

            MultiAction<R> actions = actionsByServer.get(loc);
            if (actions == null) {
              actions = new MultiAction<R>();
              actionsByServer.put(loc, actions);
            }

            Action<R> action = new Action<R>(row, i);
            lastServers[i] = loc;
            actions.add(regionName, action);
          }
        }

        // step 2: make the requests

        Map<HRegionLocation, Future<MultiResponse>> futures =
            new HashMap<HRegionLocation, Future<MultiResponse>>(
                actionsByServer.size());

        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
          // submit each server's write requests asynchronously
          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
        }

        // step 3: collect the failures and successes and prepare for retry

        for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
             : futures.entrySet()) {
          HRegionLocation loc = responsePerServer.getKey();

          try {
            Future<MultiResponse> future = responsePerServer.getValue();
            MultiResponse resp = future.get();

            if (resp == null) {
              // Entire server failed
              LOG.debug("Failed all for server: " + loc.getHostnamePort() +
                ", removing from cache");
              continue;
            }

            for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
              byte[] regionName = e.getKey();
              List<Pair<Integer, Object>> regionResults = e.getValue();
              for (Pair<Integer, Object> regionResult : regionResults) {
                if (regionResult == null) {
                  // if the first/only record is 'null' the entire region failed.
                  LOG.debug("Failures for region: " +
                      Bytes.toStringBinary(regionName) +
                      ", removing from cache");
                } else {
                  // Result might be an Exception, including DNRIOE
                  results[regionResult.getFirst()] = regionResult.getSecond();
                  if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
                    callback.update(e.getKey(),
                        list.get(regionResult.getFirst()).getRow(),
                        (R)regionResult.getSecond());
                  }
                }
              }
            }
          } catch (ExecutionException e) {
            LOG.warn("Failed all from " + loc, e);
          }
        }

        // step 4: identify failures and prepare to retry them

        // Find failures (i.e. null Result), and add them to the workingList (in
        // order), so they can be retried.
        retry = false;
        workingList.clear();
        actionCount = 0;
        for (int i = 0; i < results.length; i++) {
          // if null (fail) or instanceof Throwable && not instanceof DNRIOE
          // then retry that row. else dont.
          if (results[i] == null ||
              (results[i] instanceof Throwable &&
                  !(results[i] instanceof DoNotRetryIOException))) {

            retry = true;
            actionCount++;
            Row row = list.get(i);
            workingList.add(row);
            deleteCachedLocation(tableName, row.getRow());
          } else {
            if (results[i] != null && results[i] instanceof Throwable) {
              actionCount++;
            }
            // add null to workingList, so the order remains consistent with the original list argument.
            workingList.add(null);
          }
        }
      }

      if (retry) {
        // Simple little check for 1 item failures.
        if (singleRowCause != null) {
          throw new IOException(singleRowCause);
        }
      }


      List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
      List<Row> actions = new ArrayList<Row>(actionCount);
      List<String> addresses = new ArrayList<String>(actionCount);

      for (int i = 0 ; i < results.length; i++) {
        if (results[i] == null || results[i] instanceof Throwable) {
          exceptions.add((Throwable)results[i]);
          actions.add(list.get(i));
          addresses.add(lastServers[i].getHostnamePort());
        }
      }

      if (!exceptions.isEmpty()) {
        throw new RetriesExhaustedWithDetailsException(exceptions,
            actions,
            addresses);
      }
    } 
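Stripped of RPC details, each pass of processBatchCallback is a group-by followed by a retry filter. The sketch below keeps only that skeleton: `locate` is a hypothetical function mapping a row key to a server name (standing in for locateRegion), and `NonRetryableException` stands in for DoNotRetryIOException. As in the original, a null result or a retryable exception marks a row for another attempt, while successes and non-retryable failures are left alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for DoNotRetryIOException.
class NonRetryableException extends RuntimeException {
    NonRetryableException(String msg) {
        super(msg);
    }
}

class BatchSkeleton {
    // Step 1: bucket row indexes by the server hosting each row (null rows are skipped, as in the original).
    static Map<String, List<Integer>> groupByServer(List<String> rows, Function<String, String> locate) {
        Map<String, List<Integer>> byServer = new LinkedHashMap<>();
        for (int i = 0; i < rows.size(); i++) {
            String row = rows.get(i);
            if (row != null) {
                byServer.computeIfAbsent(locate.apply(row), k -> new ArrayList<>()).add(i);
            }
        }
        return byServer;
    }

    // Step 4: indexes to retry: no result at all, or a Throwable that is not marked "do not retry".
    static List<Integer> toRetry(Object[] results) {
        List<Integer> retry = new ArrayList<>();
        for (int i = 0; i < results.length; i++) {
            Object r = results[i];
            if (r == null || (r instanceof Throwable && !(r instanceof NonRetryableException))) {
                retry.add(i);
            }
        }
        return retry;
    }
}
```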

 

The data is submitted to the region server via RPC:

 

    private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
        final MultiAction<R> multi, final byte [] tableName) {
      final HConnection connection = this;
      return new Callable<MultiResponse>() {
       public MultiResponse call() throws IOException {
         return getRegionServerWithoutRetries(
             new ServerCallable<MultiResponse>(connection, tableName, null) {
               public MultiResponse call() throws IOException {
                 return server.multi(multi);
               }
               @Override
               public void connect(boolean reload) throws IOException {
                 server =
                   connection.getHRegionConnection(loc.getHostname(), loc.getPort());
               }
             }
         );
       }
     };
   }
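createCallable wraps each server's MultiAction in a Callable so that processBatchCallback can submit one task per region server to the ExecutorService and then block on each Future in turn. The same fan-out/fan-in pattern in plain java.util.concurrent follows; `sendToServer` is a hypothetical stand-in for server.multi(multi), and the rows are plain strings.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FanOut {
    // Hypothetical stand-in for the RPC call server.multi(multi): "writes" the rows, returns a count.
    static Integer sendToServer(String server, List<String> rows) {
        return rows.size();
    }

    // Submit one task per server, then wait for each response; a failed server maps to null.
    static Map<String, Integer> submitAll(Map<String, List<String>> rowsByServer, ExecutorService pool) {
        Map<String, Future<Integer>> futures = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : rowsByServer.entrySet()) {
            Callable<Integer> call = () -> sendToServer(e.getKey(), e.getValue());
            futures.put(e.getKey(), pool.submit(call));
        }
        Map<String, Integer> responses = new LinkedHashMap<>();
        for (Map.Entry<String, Future<Integer>> f : futures.entrySet()) {
            try {
                responses.put(f.getKey(), f.getValue().get());
            } catch (ExecutionException ex) {
                // Like the "Failed all from ..." branch: the whole server's batch failed.
                responses.put(f.getKey(), null);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for responses", ex);
            }
        }
        return responses;
    }
}
```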

 

 

Obtaining the RPC proxy for a region server:

 

 HRegionInterface getHRegionConnection(final String hostname, final int port,
        final InetSocketAddress isa, final boolean master)
    throws IOException {
      if (master) getMaster();
      HRegionInterface server;
      String rsName = null;
      if (isa != null) {
        rsName = Addressing.createHostAndPortStr(isa.getHostName(),
            isa.getPort());
      } else {
        rsName = Addressing.createHostAndPortStr(hostname, port);
      }
      // See if we already have a connection (common case)
      server = this.servers.get(rsName);
      if (server == null) {
        // create a unique lock for this RS (if necessary)
        this.connectionLock.putIfAbsent(rsName, rsName);
        // get the RS lock
        synchronized (this.connectionLock.get(rsName)) {
          // do one more lookup in case we were stalled above
          server = this.servers.get(rsName);
          if (server == null) {
            try {
              if (clusterId.hasId()) {
                conf.set(HConstants.CLUSTER_ID, clusterId.getId());
              }
              // Only create isa when we need to.
              InetSocketAddress address = isa != null? isa:
                new InetSocketAddress(hostname, port);
              // definitely a cache miss. establish an RPC for this RS
              server = (HRegionInterface) HBaseRPC.waitForProxy(
                  serverInterfaceClass, HRegionInterface.VERSION,
                  address, this.conf,
                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
              this.servers.put(Addressing.createHostAndPortStr(
                  address.getHostName(), address.getPort()), server);
            } catch (RemoteException e) {
              LOG.warn("RemoteException connecting to RS", e);
              // Throw what the RemoteException was carrying.
              throw e.unwrapRemoteException();
            }
          }
        }
      }
      return server;
    }
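getHRegionConnection is a double-checked, per-server cache: a fast unsynchronized lookup, then `putIfAbsent` to create one lock object per server name, then a second lookup under that lock so that only one thread builds the RPC proxy for a given server. A generic version of the pattern follows; `ConnectionCache` and its `factory` function are hypothetical, and the cached value is a String instead of an HRegionInterface proxy.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical generic version of the per-server connection cache.
class ConnectionCache<V> {
    private final ConcurrentMap<String, V> servers = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, String> connectionLock = new ConcurrentHashMap<>();
    private final Function<String, V> factory;         // the expensive step, e.g. building an RPC proxy
    final AtomicInteger created = new AtomicInteger(); // counts factory invocations

    ConnectionCache(Function<String, V> factory) {
        this.factory = factory;
    }

    V get(String hostAndPort) {
        V server = servers.get(hostAndPort);           // common case: already connected
        if (server == null) {
            // putIfAbsent keeps the first String stored, so every thread locks the same object.
            connectionLock.putIfAbsent(hostAndPort, hostAndPort);
            synchronized (connectionLock.get(hostAndPort)) {
                server = servers.get(hostAndPort);     // re-check: another thread may have won the race
                if (server == null) {
                    server = factory.apply(hostAndPort);
                    created.incrementAndGet();
                    servers.put(hostAndPort, server);
                }
            }
        }
        return server;
    }
}
```

On Java 8 and later, ConcurrentHashMap.computeIfAbsent gives the same "create at most once per key" guarantee in a single call.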

 

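  • When the HBase client writes data, it caches the locations of recently used regions and their region servers. If the cache already holds the location for a row, the client uses it and connects to that region server directly; otherwise it locates the region through the catalog tables (-ROOT- and .META., with the -ROOT- location read from ZooKeeper). Every client operation (put, get, delete, and so on) is wrapped in an Action object, and the actions bound for the same region server are submitted together as one batch: that batch is the MultiAction.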
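The location cache works because regions are sorted, contiguous key ranges: the region holding a row is the one with the greatest start key that is less than or equal to the row, provided the row is also below that region's end key. A minimal sketch of that lookup, assuming String row keys and a hypothetical `RegionLocationCache` class (the real client keeps a sorted per-table map keyed by byte[] start keys):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a client-side region location cache.
class RegionLocationCache {
    // A cached region covering [startKey, endKey); an empty endKey means "to the end of the table".
    static final class Location {
        final String startKey, endKey, server;

        Location(String startKey, String endKey, String server) {
            this.startKey = startKey;
            this.endKey = endKey;
            this.server = server;
        }
    }

    private final TreeMap<String, Location> byStartKey = new TreeMap<>();

    void cache(Location loc) {
        byStartKey.put(loc.startKey, loc);
    }

    // Returns the cached location for row, or null on a miss (the caller must then consult the catalog).
    Location find(String row) {
        Map.Entry<String, Location> e = byStartKey.floorEntry(row);
        if (e == null) {
            return null;
        }
        Location loc = e.getValue();
        boolean inRange = loc.endKey.isEmpty() || row.compareTo(loc.endKey) < 0;
        return inRange ? loc : null;
    }

    // Like deleteCachedLocation: forget the entry so the next lookup goes back to the catalog.
    void evict(String row) {
        Location loc = find(row);
        if (loc != null) {
            byStartKey.remove(loc.startKey);
        }
    }
}
```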
Server side:
 
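Operations submitted by the client over RPC arrive at HRegionServer.multi(MultiAction<R> multi), which handles the insert requests:
  • Take each Action object, determine which kind of operation it is (Put/Get/Delete), and execute it accordingly; Puts are collected so they can be applied as one batch
  • Look up the row lock for each Put (the lock the client obtained earlier, if any)
  • Call HRegion.put to write the data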
HRegionServer.java
  @SuppressWarnings("unchecked")
  @Override
  public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {
    checkOpen();
    MultiResponse response = new MultiResponse();
    for (Map.Entry<byte[], List<Action<R>>> e : multi.actions.entrySet()) {
      byte[] regionName = e.getKey();
      List<Action<R>> actionsForRegion = e.getValue();
      // sort based on the row id - this helps in the case where we reach the
      // end of a region, so that we don't have to try the rest of the
      // actions in the list.
      Collections.sort(actionsForRegion);
      Row action;
      List<Action<R>> puts = new ArrayList<Action<R>>();
      for (Action<R> a : actionsForRegion) {
        action = a.getAction();
        int originalIndex = a.getOriginalIndex();

        try {
          // determine which kind of operation this action is
          if (action instanceof Delete) {
            delete(regionName, (Delete) action);
            response.add(regionName, originalIndex, new Result());
          } else if (action instanceof Get) {
            response.add(regionName, originalIndex, get(regionName, (Get) action));
          } else if (action instanceof Put) {
            puts.add(a);  // wont throw.
          } else if (action instanceof Exec) {
            ExecResult result = execCoprocessor(regionName, (Exec)action);
            response.add(regionName, new Pair<Integer, Object>(
                a.getOriginalIndex(), result.getValue()
            ));
          } else {
            LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
                "Put or Exec.");
            throw new DoNotRetryIOException("Invalid Action, row must be a " +
                "Get, Delete or Put.");
          }
        } catch (IOException ex) {
          response.add(regionName, originalIndex, ex);
        }
      }

      // We do the puts with result.put so we can get the batching efficiency
      // we so need. All this data munging doesn't seem great, but at least
      // we arent copying bytes or anything.
      if (!puts.isEmpty()) {
        try {
          HRegion region = getRegion(regionName);

          if (!region.getRegionInfo().isMetaTable()) {
            this.cacheFlusher.reclaimMemStoreMemory();
          }

          List<Pair<Put,Integer>> putsWithLocks =
              Lists.newArrayListWithCapacity(puts.size());
          for (Action<R> a : puts) {
            Put p = (Put) a.getAction();

            Integer lock;
            try {
              // look up the row lock
              lock = getLockFromId(p.getLockId());
            } catch (UnknownRowLockException ex) {
              response.add(regionName, a.getOriginalIndex(), ex);
              continue;
            }
            putsWithLocks.add(new Pair<Put, Integer>(p, lock));
          }

          this.requestCount.addAndGet(puts.size());
          // write the batch of puts into the region
          OperationStatus[] codes =
              region.put(putsWithLocks.toArray(new Pair[]{}));

          for( int i = 0 ; i < codes.length ; i++) {
            OperationStatus code = codes[i];

            Action<R> theAction = puts.get(i);
            Object result = null;

            if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
              result = new Result();
            } else if (code.getOperationStatusCode()
                == OperationStatusCode.BAD_FAMILY) {
              result = new NoSuchColumnFamilyException(code.getExceptionMsg());
            }
            // FAILURE && NOT_RUN becomes null, aka: need to run again.

            response.add(regionName, theAction.getOriginalIndex(), result);
          }
        } catch (IOException ioe) {
          // fail all the puts with the ioe in question.
          for (Action<R> a: puts) {
            response.add(regionName, a.getOriginalIndex(), ioe);
          }
        }
      }
    }
    return response;
  }
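For one region, multi() sorts the actions by row, executes Gets and Deletes immediately, and collects Puts so they can be applied as a single batch, recording every result under the action's original index. A simplified sketch of that control flow; `MultiSketch`, `Op`, `Kind`, and the string results are hypothetical stand-ins for Action, Row, and Result, and `batchLog` records each batched put call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MultiSketch {
    enum Kind { GET, PUT, DELETE }

    static final class Op {
        final Kind kind;
        final String row;
        final int originalIndex;   // position in the client's original list

        Op(Kind kind, String row, int originalIndex) {
            this.kind = kind;
            this.row = row;
            this.originalIndex = originalIndex;
        }
    }

    // Returns originalIndex -> result; Puts are deferred and applied together at the end.
    static Map<Integer, String> multi(List<Op> actionsForRegion, List<String> batchLog) {
        List<Op> sorted = new ArrayList<>(actionsForRegion);
        sorted.sort(Comparator.comparing((Op o) -> o.row)); // like Collections.sort(actionsForRegion)
        Map<Integer, String> response = new TreeMap<>();
        List<Op> puts = new ArrayList<>();
        for (Op a : sorted) {
            switch (a.kind) {
                case DELETE: response.put(a.originalIndex, "deleted " + a.row); break;
                case GET:    response.put(a.originalIndex, "got " + a.row); break;
                case PUT:    puts.add(a); break;  // deferred for batching
            }
        }
        if (!puts.isEmpty()) {
            // One batched call, standing in for region.put(putsWithLocks.toArray(...)).
            List<String> rows = new ArrayList<>();
            for (Op p : puts) {
                rows.add(p.row);
            }
            batchLog.add(String.join(",", rows));
            for (Op p : puts) {
                response.put(p.originalIndex, "put " + p.row);
            }
        }
        return response;
    }
}
```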
 
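The put operation in HRegion.java (this is the single-Put overload; multi() above calls the batched overload, region.put(Pair[])):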
  /**
   * @param put
   * @param lockid
   * @param writeToWAL
   * @throws IOException
   */
  public void put(Put put, Integer lockid, boolean writeToWAL)
  throws IOException {
    // Check whether the region is read-only; if it is, an exception is thrown
    checkReadOnly();

    // Do a rough check that we have resources to accept a write.  The check is
    // 'rough' in that between the resource check and the call to obtain a
    // read lock, resources may run out.  For now, the thought is that this
    // will be extremely rare; we'll deal with it when it happens.
    checkResources();
    // Take the region's read lock so the region cannot be closed during the operation
    startRegionOperation();
    this.writeRequestsCount.increment();
    try {
      // We obtain a per-row lock, so other clients will block while one client
      // performs an update. The read lock is released by the client calling
      // #commit or #abort or if the HRegionServer lease on the lock expires.
      // See HRegionServer#RegionListener for how the expire on HRegionServer
      // invokes a HRegion#abort.
      byte [] row = put.getRow();
      // If we did not pass an existing row lock, obtain a new one
      Integer lid = getLock(lockid, row, true);

      try {
        // All edits for the given row (across all column families) must happen atomically.
        internalPut(put, put.getClusterId(), writeToWAL);
      } finally {
        if(lockid == null) releaseRowLock(lid);
      }
    } finally {
      closeRegionOperation();
    }
  }
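HRegion.put takes a row lock only when the caller did not pass one in (`getLock(lockid, row, true)`), and its finally block releases the lock only in that case (`if(lockid == null) releaseRowLock(lid)`). A sketch of that "acquire if absent, release only what you acquired" rule, using a hypothetical `RowLocks` class built on ReentrantLock instead of HBase's own lock-ID bookkeeping:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical per-row lock table (not HBase's implementation).
class RowLocks {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    ReentrantLock lockFor(String row) {
        return locks.computeIfAbsent(row, r -> new ReentrantLock());
    }

    // Runs the edit under the row lock. If callerHoldsLock is true (like a non-null lockid),
    // the caller is responsible for the lock, so it is neither taken nor released here.
    void put(String row, boolean callerHoldsLock, Runnable edit) {
        ReentrantLock lock = lockFor(row);
        if (!callerHoldsLock) {
            lock.lock();
        }
        try {
            edit.run();  // all column-family edits for this row happen while the lock is held
        } finally {
            if (!callerHoldsLock) {
                lock.unlock();
            }
        }
    }
}
```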
 

The checkResources() method:

Before the put is actually executed, some necessary checks are performed. Let's look at the checkResources() method:

 

  private void checkResources() {

    // If catalog region, do not impose resource constraints or block updates.
    if (this.getRegionInfo().isMetaRegion()) return;

    boolean blocked = false;
    while (this.memstoreSize.get() > this.blockingMemStoreSize) {
      requestFlush();
      if (!blocked) {
        LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
          "' on region " + Bytes.toStringBinary(getRegionName()) +
          ": memstore size " +
          StringUtils.humanReadableInt(this.memstoreSize.get()) +
          " is >= than blocking " +
          StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
      }
      blocked = true;
      synchronized(this) {
        try {
          wait(threadWakeFrequency);
        } catch (InterruptedException e) {
          // continue;
        }
      }
    }
    if (blocked) {
      LOG.info("Unblocking updates for region " + this + " '"
          + Thread.currentThread().getName() + "'");
    }
  }

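 As you can see, when the total memstore size of the HRegion exceeds blockingMemStoreSize (the region's memstore flush size multiplied by hbase.hregion.memstore.block.multiplier), a flush is requested and the writing thread blocks, re-checking every threadWakeFrequency milliseconds, until the memstore size falls back to an acceptable level. Catalog (meta) regions are exempt from this check.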
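The blocking in checkResources is a guarded wait: while the memstore is too large, request a flush and wait on the monitor with a timeout, re-checking after every wake-up. A self-contained sketch of the same loop; `BlockingRegion`, `flushRequests`, and `finishFlush` are hypothetical names, and the flush is simulated by another thread calling finishFlush().

```java
// Hypothetical sketch (not HBase code) of the guarded wait in HRegion.checkResources().
class BlockingRegion {
    private long memstoreSize;
    private final long blockingMemStoreSize;
    private final long threadWakeFrequency;   // ms between re-checks
    int flushRequests = 0;                    // how often a flush was "requested"

    BlockingRegion(long memstoreSize, long blockingMemStoreSize, long threadWakeFrequency) {
        this.memstoreSize = memstoreSize;
        this.blockingMemStoreSize = blockingMemStoreSize;
        this.threadWakeFrequency = threadWakeFrequency;
    }

    // Blocks the calling writer until the memstore is no larger than the blocking size.
    synchronized void checkResources() {
        while (memstoreSize > blockingMemStoreSize) {
            flushRequests++;                  // stands in for requestFlush()
            try {
                wait(threadWakeFrequency);    // releases the monitor; wakes on notifyAll() or timeout
            } catch (InterruptedException e) {
                // Unlike the original, which ignores the interrupt, restore the flag and give up.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Called by the (simulated) flusher thread once the memstore has been written out.
    synchronized void finishFlush() {
        memstoreSize = 0;
        notifyAll();
    }

    synchronized long memstoreSize() {
        return memstoreSize;
    }
}
```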

 

 

internalPut consists of the following steps:

 

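  • checkFamilies: check that the column families exist
  • updateKVTimestamps: set the KeyValue timestamps (cells written with LATEST_TIMESTAMP get the current server time)
  • addFamilyMapToWALEdit: write-ahead logging (the edits are collected into a WALEdit and appended to the log)
  • applyFamilyMapToMemstore: write the data into the memstore
  • isFlushSize: decide whether the memstore should be flushed to an HFile
  • Release the lock
  • Flush the memstore data to an HFile (the flush is requested here and carried out asynchronously)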
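The order of these steps is what makes the write durable: the edit is appended to the write-ahead log before it is applied to the memstore, and the flush decision is made only after the memstore has been updated. A hypothetical sketch of that sequence follows; `ToyRegion`, the `wal` list, the string keys, and the size accounting are illustrative assumptions, row locking is left out, and the flush is only flagged, much as HBase hands it to a background flusher.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy region (not HBase code) that follows the internalPut step order.
class ToyRegion {
    static final long LATEST_TIMESTAMP = Long.MAX_VALUE;  // same marker value as HConstants.LATEST_TIMESTAMP

    static final class Cell {
        final String row, family, qualifier, value;
        long timestamp;

        Cell(String row, String family, String qualifier, String value, long timestamp) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Set<String> families;
    private final long flushSize;
    private long memstoreSize = 0;
    final List<String> wal = new ArrayList<>();                // append-only "log"
    final TreeMap<String, String> memstore = new TreeMap<>();  // sorted in-memory store
    boolean flushRequested = false;

    ToyRegion(Set<String> families, long flushSize) {
        this.families = families;
        this.flushSize = flushSize;
    }

    void internalPut(List<Cell> cells, long now) {
        // 1. checkFamilies: reject the whole put if any column family is unknown
        for (Cell c : cells) {
            if (!families.contains(c.family)) {
                throw new IllegalArgumentException("No such column family: " + c.family);
            }
        }
        // 2. updateKVTimestamps: cells without an explicit timestamp get the server time
        for (Cell c : cells) {
            if (c.timestamp == LATEST_TIMESTAMP) {
                c.timestamp = now;
            }
        }
        // 3. Write-ahead log first, so the edit can be replayed if the server dies before a flush
        for (Cell c : cells) {
            wal.add(key(c) + "=" + c.value);
        }
        // 4. applyFamilyMapToMemstore
        for (Cell c : cells) {
            memstore.put(key(c), c.value);
            memstoreSize += key(c).length() + c.value.length();
        }
        // 5. isFlushSize: only flag the flush; a background flusher would perform it
        if (memstoreSize > flushSize) {
            flushRequested = true;
        }
    }

    private static String key(Cell c) {
        return c.row + "/" + c.family + ":" + c.qualifier + "/" + c.timestamp;
    }
}
```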

This article reflects only my own understanding; corrections are welcome wherever it is wrong.

 
