
HBase put source code analysis


 

This post walks through the HRegionServer source code for the put operation:

 

HRegionServer's multi method:

 

  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
  throws ServiceException {
  ....
  try {
        region = getRegion(regionAction.getRegion()); // look up the Region this action targets
      } catch (IOException e) {
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
        continue;  // For this region it's a failure.
      }
  ....
   if (regionAction.hasAtomic() && regionAction.getAtomic()) { // do these actions need to execute atomically?
        // How does this call happen?  It may need some work to play well w/ the surroundings.
        // Need to return an item per Action along w/ Action index.  TODO.
        try {
          mutateRows(region, regionAction.getActionList(), cellScanner); // covered in detail below
        } catch (IOException e) {
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else {
        // doNonAtomicRegionMutation manages the exception internally
        cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup);
      }
  ....
  }

 HRegionServer's mutateRows method:

 

 

  protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
      final CellScanner cellScanner)
  throws IOException {
    if (!region.getRegionInfo().isMetaTable()) {
      cacheFlusher.reclaimMemStoreMemory();
    }
    RowMutations rm = null;
    for (ClientProtos.Action action: actions) {
      if (action.hasGet()) {
        throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
          action.getGet());
      }
      MutationType type = action.getMutation().getMutateType();
      if (rm == null) {
        rm = new RowMutations(action.getMutation().getRow().toByteArray());
      }
      switch (type) {
      case PUT:
        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
        break;
      case DELETE:
        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
        break;
      default:
          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
      }
    }
    region.mutateRow(rm); // hand the whole batch to HRegion
  }
  

 This calls into HRegion:

 

 

  public void mutateRow(RowMutations rm) throws IOException {
    // Don't need nonces here - RowMutations only supports puts and deletes
    mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
  }

 The detailed flow of HRegion's mutateRow (a condensed skeleton of the driving method follows this list):

 

  1. Run the coprocessorHost pre-hooks before the mutation

   MultiRowMutationProcessor

  2. Acquire the row lock (per row key)

  3. Try to acquire the HRegion updatesLock read lock

  4. Apply the per-row processing (e.g. timestamp updates) and build the WALEdit object the WAL needs

   MultiRowMutationProcessor

  5. Create a writeEntry (carrying a new write number) and add it to the MultiVersionConsistencyControl write queue

  6. Add the KeyValues to the corresponding HStore's memstore (takes the store's read lock)

  7. Append to the WAL: build a log key (used to match WAL entries for deletion after a memstore flush; unrelated to MVCC) and put the log edit on a pending queue, written asynchronously

   FSHLog

  8. Release the HRegion updatesLock read lock

  9. Release the row lock (its CountDownLatch counts down)

  10. Sync this region server's HLog up to the current edit,

   using FSHLog's unflushedEntries, an AtomicLong incremented each time a log edit is enqueued, as the coordinate txid

   FSHLog

  11. In MultiVersionConsistencyControl, use the write queue to advance the read point to the largest write number whose predecessors have all completed, then wait until the read point is at least this entry's write point (the WAL runs asynchronously, so the read point may already be past our write point, or earlier write points may not have completed yet; we must wait until the read point is at least our write point)

  12. Run MultiRowMutationProcessor's postProcess

   (the coprocessor post-hooks)
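
Tying the twelve steps together, the skeleton below is a hand-condensed sketch of the driving method (processRowsWithLocks in the 0.96/0.98-era HRegion, which mutateRow reaches via mutateRowsWithLocks). Failure/rollback paths are dropped and some arguments are elided, so read it as a map of the steps rather than verbatim source:

  // Condensed sketch of HRegion.processRowsWithLocks (0.96/0.98-era, simplified)
  void processRowsWithLocksSketch(RowProcessor<?, ?> processor) throws IOException {
    WALEdit walEdit = new WALEdit();
    processor.preProcess(this, walEdit);                      // step 1: coprocessor pre-hooks

    List<RowLock> acquiredRowLocks = new ArrayList<RowLock>();
    for (byte[] row : processor.getRowsToLock()) {
      acquiredRowLocks.add(getRowLock(row, true));            // step 2: row locks
    }
    lock(this.updatesLock.readLock());                        // step 3: region updatesLock (read)

    long now = EnvironmentEdgeManager.currentTimeMillis();
    List<KeyValue> mutations = new ArrayList<KeyValue>();
    processor.process(now, this, mutations, walEdit);         // step 4: timestamps + WALEdit

    MultiVersionConsistencyControl.WriteEntry writeEntry =
        mvcc.beginMemstoreInsert();                           // step 5: MVCC write entry

    for (KeyValue kv : mutations) {
      kv.setMvccVersion(writeEntry.getWriteNumber());
      getStore(kv.getFamily()).add(kv);                       // step 6: memstore (store read lock)
    }

    long txid = 0;
    if (!walEdit.isEmpty()) {
      // step 7: async WAL append (remaining appendNoSync arguments elided)
      txid = this.log.appendNoSync(getRegionInfo(), /* ..., */ walEdit /* , ... */);
    }

    this.updatesLock.readLock().unlock();                     // step 8: release updatesLock
    releaseRowLocks(acquiredRowLocks);                        // step 9: release row locks
    if (txid != 0) this.log.sync(txid);                       // step 10: sync WAL up to txid
    mvcc.completeMemstoreInsert(writeEntry);                  // step 11: advance MVCC read point
    processor.postProcess(this, walEdit);                     // step 12: coprocessor post-hooks
  }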

 

Below is the same flow again, step by step, with the relevant source:

1. Run the coprocessorHost pre-hooks before the mutation

   MultiRowMutationProcessor (a minimal sketch of its preProcess is shown below)
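
The original post quotes no code for step 1, so here is a minimal sketch of MultiRowMutationProcessor.preProcess (0.96/0.98-era; the real method also honors the hooks' "bypass" result, omitted here). mutations is the processor's own field holding the batch:

  @Override
  public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    if (coprocessorHost != null) {
      for (Mutation m : mutations) { // replay each mutation through the pre-hooks
        if (m instanceof Put) {
          coprocessorHost.prePut((Put) m, walEdit, m.getDurability());
        } else if (m instanceof Delete) {
          coprocessorHost.preDelete((Delete) m, walEdit, m.getDurability());
        }
      }
    }
  }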

2. Acquire the row lock for the row key

 

  /**
   * Tries to acquire a lock on the given row.
   * @param waitForLock if true, will block until the lock is available.
   *        Otherwise, just tries to obtain the lock and returns
   *        false if unavailable.
   * @return the row lock if acquired,
   *   null if waitForLock was false and the lock was not acquired
   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
   */
  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
    checkRow(row, "row lock");
    startRegionOperation();
    try {
      HashedBytes rowKey = new HashedBytes(row);
      RowLockContext rowLockContext = new RowLockContext(rowKey);

      // loop until we acquire the row lock (unless !waitForLock)
      while (true) { // loop until we hold the lock context for this row
        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); // atomically try to register our context
        if (existingContext == null) { // no other thread holds this row
          // Row is not already locked by any thread, use newly created context.
          break;
        } else if (existingContext.ownedByCurrentThread()) { // the current thread already holds this row
          // Row is already locked by current thread, reuse existing context instead.
          rowLockContext = existingContext;
          break;
        } else { // another thread holds this row
          // Row is already locked by some other thread, give up or wait for it
          if (!waitForLock) {
            return null;
          }
          try {
            if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { // wait on the holder's CountDownLatch until the row is released
              throw new IOException("Timed out waiting for lock for row: " + rowKey);
            }
          } catch (InterruptedException ie) {
            LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
            InterruptedIOException iie = new InterruptedIOException();
            iie.initCause(ie);
            throw iie;
          }
        }
      }

      // allocate new lock for this thread
      return rowLockContext.newLock();
    } finally {
      closeRegionOperation();
    }
  }

 

3. Try to acquire the HRegion updatesLock read lock
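
The post shows no code for this step; in the 0.96/0.98 line it is roughly the helper below, invoked on this.updatesLock.readLock(). This is a simplified sketch: the real method takes a wait-duration multiplier and throws with more context.

  // Simplified sketch of HRegion's internal lock(...) helper: tryLock with a
  // timeout so a busy region fails fast instead of blocking the handler thread.
  private void lock(final Lock lock, final long waitMs) throws IOException {
    try {
      if (!lock.tryLock(waitMs, TimeUnit.MILLISECONDS)) {
        throw new RegionTooBusyException("failed to get a lock in " + waitMs + " ms");
      }
    } catch (InterruptedException ie) {
      InterruptedIOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    }
  }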

  

  

  4. Apply the per-row processing (e.g. timestamp updates) and build the WALEdit object the WAL needs

  MultiRowMutationProcessor:

 

   @Override
  public void process(long now,
                      HRegion region,
                      List<KeyValue> mutationKvs,
                      WALEdit walEdit) throws IOException {
    byte[] byteNow = Bytes.toBytes(now);
    // Check mutations and apply edits to a single WALEdit
    for (Mutation m : mutations) { // mutations is the processor's field; check each one and update its timestamps
      if (m instanceof Put) {
        Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
        region.checkFamilies(familyMap.keySet());
        region.checkTimestamps(familyMap, now);
        region.updateKVTimestamps(familyMap.values(), byteNow);
      } else if (m instanceof Delete) {
        Delete d = (Delete) m;
        region.prepareDelete(d);
        region.prepareDeleteTimestamps(d.getFamilyCellMap(), byteNow);
      } else {
        throw new DoNotRetryIOException(
            "Action must be Put or Delete. But was: "
            + m.getClass().getName());
      }
      for (List<Cell> cells: m.getFamilyCellMap().values()) {
        boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
        for (Cell cell : cells) { // write every cell of every column family into the WALEdit
          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
          mutationKvs.add(kv);
          if (writeToWAL) {
            walEdit.add(kv);
          }
        }
      }
    }
  }

 

 

5. Create a writeEntry (carrying a new write number) and add it to the MultiVersionConsistencyControl write queue

  writeEntry = mvcc.beginMemstoreInsert();

 

  /**
   * Generate and return a {@link WriteEntry} with a new write number.
   * To complete the WriteEntry and wait for it to be visible,
   * call {@link #completeMemstoreInsert(WriteEntry)}.
   */
  public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
      long nextWriteNumber = ++memstoreWrite; // bump this HRegion's MVCC write point and use it as the entry's write number
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
  }
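
For reference, WriteEntry itself is tiny: essentially a write number plus a completed flag (sketched from the 0.96-era inner class; close to, but not guaranteed to be, verbatim):

  public static class WriteEntry {
    private final long writeNumber;   // the MVCC write point assigned above
    private boolean completed = false;

    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
    void markCompleted()  { this.completed = true; }
    boolean isCompleted() { return this.completed; }
    long getWriteNumber() { return this.writeNumber; }
  }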

   6. Add the KeyValues to the corresponding HStore's memstore (takes the store's read lock); HStore.add:

 

   @Override
  public long add(final KeyValue kv) {
    lock.readLock().lock();
    try {
      return this.memstore.add(kv);
    } finally {
      lock.readLock().unlock();
    }
  }
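
Inside MemStore, add boils down to an insert into a concurrent skip-list set plus heap-size accounting. A rough sketch of the internal path (0.96-era, simplified; kvset is the backing KeyValueSkipListSet):

  // Rough sketch of MemStore's internalAdd (simplified):
  long internalAdd(final KeyValue toAdd) {
    boolean notPresent = this.kvset.add(toAdd);      // concurrent skip-list insert
    long sizeDelta = heapSizeChange(toAdd, notPresent);
    this.timeRangeTracker.includeTimestamp(toAdd);   // maintain min/max timestamps
    this.size.addAndGet(sizeDelta);                  // track memstore heap usage
    return sizeDelta;
  }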

   7. Append to the WAL: build a log key (used to match WAL entries for deletion after a memstore flush; unrelated to MVCC) and put the log edit on a pending queue, written asynchronously.

  FSHLog:

  /**
   * Append a set of edits to the log. Log edits are keyed by (encoded)
   * regionName, rowname, and log-sequence-id.
   *
   * Later, if we sort by these keys, we obtain all the relevant edits for a
   * given key-range of the HRegion (TODO). Any edits that do not have a
   * matching COMPLETE_CACHEFLUSH message can be discarded.
   *
   * <p>
   * Logs cannot be restarted once closed, or once the HLog process dies. Each
   * time the HLog starts, it must create a new log. This means that other
   * systems should process the log appropriately upon each startup (and prior
   * to initializing HLog).
   *
   * synchronized prevents appends during the completion of a cache flush or for
   * the duration of a log roll.
   *
   * @param info
   * @param tableName
   * @param edits
   * @param clusterIds that have consumed the change (for replication)
   * @param now
   * @param doSync shall we sync?
   * @param sequenceId of the region.
   * @return txid of this transaction
   * @throws IOException
   */
  @SuppressWarnings("deprecation")
  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, 
      AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
      if (edits.isEmpty()) return this.unflushedEntries.get();
      if (this.closed) {
        throw new IOException("Cannot append; log is closed");
      }
      TraceScope traceScope = Trace.startSpan("FSHlog.append");
      try {
        long txid = 0;
        synchronized (this.updateLock) { // serialize appends on the HLog's update lock
          // get the sequence number from the passed Long. In normal flow, it is coming from the
          // region.
          long seqNum = sequenceId.incrementAndGet();
          // generate the region's seqNum: once the memstore flushes past it,
          // WAL entries whose seqNum <= the flushed point can be deleted
          // The 'lastSeqWritten' map holds the sequence number of the oldest
          // write for each region (i.e. the first edit added to the particular
          // memstore). . When the cache is flushed, the entry for the
          // region being flushed is removed if the sequence number of the flush
          // is greater than or equal to the value in lastSeqWritten.
          // Use encoded name.  Its shorter, guaranteed unique and a subset of
          // actual  name.
          byte [] encodedRegionName = info.getEncodedNameAsBytes(); // encoded region name
          if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); // record (if absent) the oldest unflushed seqNum for this region
          HLogKey logKey = makeKey(
            encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);

          synchronized (pendingWritesLock) {
            doWrite(info, logKey, edits, htd); // enqueue the log edit into the pending-writes list
            txid = this.unflushedEntries.incrementAndGet();
          }
          this.numEntries.incrementAndGet();
          this.asyncWriter.setPendingTxid(txid);

          if (htd.isDeferredLogFlush()) {
            lastUnSyncedTxid = txid;
          }
          this.latestSequenceNums.put(encodedRegionName, seqNum);
        }
        // TODO: note that only tests currently call append w/sync.
        //       Therefore, this code here is not actually used by anything.
        // Sync if catalog region, and if not then check if that table supports
        // deferred log flushing
        if (doSync &&
            (info.isMetaRegion() ||
            !htd.isDeferredLogFlush())) {
          // sync txn to file system
          this.sync(txid);
        }
        return txid;
      } finally {
        traceScope.close();
      }
    }
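
Note that append never touches HDFS itself: doWrite only queues the entry, and setPendingTxid wakes a background thread. In this FSHLog (the multi-threaded model that asyncWriter and AsyncNotifier above belong to), the work is split across daemon threads, roughly as outlined below (paraphrased, not quoted source):

  // Conceptual outline of FSHLog's asynchronous pipeline:
  //   handler thread : doWrite(...) adds the entry to a pending list;
  //                    asyncWriter.setPendingTxid(txid) wakes the AsyncWriter
  //   AsyncWriter    : drains pending entries, writes them to the HDFS output
  //                    stream, and passes the txid on to AsyncSyncer
  //   AsyncSyncer    : flushes (hflush) the stream for a batch of txids
  //   AsyncNotifier  : advances syncedTillHere and notifyAll()s threads
  //                    blocked in syncer(txid), shown in step 10 below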

 8. Release the HRegion updatesLock read lock

  

  9. Release the row lock (its CountDownLatch counts down, waking any waiters)
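
No code is quoted for this step either; conceptually the release looks like the sketch below (names abbreviated from the 0.96-era RowLockContext; the real code also tracks reentrant lock counts before releasing):

  // Illustrative sketch: unregister this row's lock context and count down its
  // latch so threads parked in getRowLock (step 2 above) wake up.
  void releaseRowLock(RowLockContext context) {
    lockedRows.remove(context.rowKey);   // row is no longer locked
    context.latch.countDown();           // wakes existingContext.latch.await(...) waiters
  }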

  

  10. Sync this region server's HLog up to the current transaction,

  using FSHLog's unflushedEntries, an AtomicLong incremented each time a log edit is enqueued, as the coordinate txid.

  FSHLog:

 // sync all transactions up to the specified txid
  private void syncer(long txid) throws IOException {
    synchronized (this.syncedTillHere) {
      while (this.syncedTillHere.get() < txid) {
        try {
          this.syncedTillHere.wait();

          if (txid <= this.failedTxid.get()) {
            assert asyncIOE != null :
              "current txid is among(under) failed txids, but asyncIOE is null!";
            throw asyncIOE;
          }
        } catch (InterruptedException e) {
          LOG.debug("interrupted while waiting for notification from AsyncNotifier");
        }
      }
    }
  }

 

  11. MultiVersionConsistencyControl: using the entries in the write queue, advance the read point to the largest write number whose predecessors have all completed, then wait until the read point is at least this entry's write point. (Because the WAL append is asynchronous, the read point may already be past our write point, or earlier write points may not yet be completed, so we must wait until the read point is at least our write point.)

 /**
   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
   *
   * At the end of this call, the global read point is at least as large as the write point
   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
   */
  public void completeMemstoreInsert(WriteEntry e) {
    advanceMemstore(e);
    waitForRead(e);
  }
  
  /**
   * Mark the {@link WriteEntry} as complete and advance the read point as
   * much as possible.
   *
   * How much is the read point advanced?
   * Let S be the set of all write numbers that are completed and where all previous write numbers
   * are also completed.  Then, the read point is advanced to the supremum of S.
   *
   * @param e
   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
   */
  boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {
      e.markCompleted();

      long nextReadValue = -1;
      boolean ranOnce=false;
      while (!writeQueue.isEmpty()) { // advance the read point over the longest completed prefix of the queue
        ranOnce=true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("never was a first");
      }

      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          memstoreRead = nextReadValue;
          readWaiters.notifyAll();
        }
      }
      if (memstoreRead >= e.getWriteNumber()) {
        return true;
      }
      return false;
    }
  }
  ----------------------------
  /**
   * Wait for the global readPoint to advance upto
   * the specified transaction number.
   */
  public void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (memstoreRead < e.getWriteNumber()) { // wait until the read point reaches this entry's write point
        try {
          // the WAL append is asynchronous, so earlier write entries may not
          // have completed yet; block until the read point catches up to our
          // write number
          readWaiters.wait(0);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
  }

 

  12. Run MultiRowMutationProcessor's postProcess

  This invokes the coprocessor post-hooks (a minimal sketch below):
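
A minimal sketch of postProcess (0.96/0.98-era, simplified): it replays each mutation through the region's coprocessor post-hooks, mirroring preProcess from step 1.

  @Override
  public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    if (coprocessorHost != null) {
      for (Mutation m : mutations) {
        if (m instanceof Put) {
          coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
        } else if (m instanceof Delete) {
          coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
        }
      }
    }
  }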

 

 

This concludes the put write path.