Here is the source-code path HRegionServer follows for a put operation:
HRegionServer
public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
  ....
  try {
    region = getRegion(regionAction.getRegion()); // get the Region this action targets
  } catch (IOException e) {
    regionActionResultBuilder.setException(ResponseConverter.buildException(e));
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
    continue;  // For this region it's a failure.
  }
  ....
  if (regionAction.hasAtomic() && regionAction.getAtomic()) { // does this action require atomicity?
    // How does this call happen?  It may need some work to play well w/ the surroundings.
    // Need to return an item per Action along w/ Action index.  TODO.
    try {
      mutateRows(region, regionAction.getActionList(), cellScanner); // detailed below
    } catch (IOException e) {
      // As it's atomic, we may expect it's a global failure.
      regionActionResultBuilder.setException(ResponseConverter.buildException(e));
    }
  } else {
    // doNonAtomicRegionMutation manages the exception internally
    cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
      regionActionResultBuilder, cellsToReturn, nonceGroup);
  }
  ....
}
HRegionServer's mutateRows method:
protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
    final CellScanner cellScanner) throws IOException {
  if (!region.getRegionInfo().isMetaTable()) {
    cacheFlusher.reclaimMemStoreMemory();
  }
  RowMutations rm = null;
  for (ClientProtos.Action action: actions) {
    if (action.hasGet()) {
      throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
        action.getGet());
    }
    MutationType type = action.getMutation().getMutateType();
    if (rm == null) {
      rm = new RowMutations(action.getMutation().getRow().toByteArray());
    }
    switch (type) {
    case PUT:
      rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
      break;
    case DELETE:
      rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
      break;
    default:
      throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
    }
  }
  region.mutateRow(rm); // HRegion performs the actual mutation
}
which calls into HRegion:
public void mutateRow(RowMutations rm) throws IOException {
  // Don't need nonces here - RowMutations only supports puts and deletes
  mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
}
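For orientation, this atomic server-side path is what a client triggers through HTable's mutateRow. A minimal client-side sketch (table, row, family, and qualifier names are made up for illustration; htable is an already-opened HTable):

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.util.Bytes;

byte[] row = Bytes.toBytes("row1");
RowMutations rm = new RowMutations(row);

Put put = new Put(row);
put.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
rm.add(put);

Delete del = new Delete(row);
del.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("q2"));
rm.add(del);

// The put and the delete are applied atomically to the same row,
// following the server-side path described below.
htable.mutateRow(rm);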
The detailed steps of HRegion's mutateRow (a condensed code sketch follows the list):
1. Run the coprocessorHost hooks before the operation
(MultiRowMutationProcessor class)
2. Acquire the row lock on the row key (rowid)
3. Acquire the HRegion updatesLock read lock
4. Apply the per-row processing (e.g. timestamps) and build the walEdit object the WAL needs
(MultiRowMutationProcessor class)
5. Wrap a writeEntry (carrying a new write number) and add it to the writeQueue of MultiVersionConsistencyControl
6. Add the edits to the memstore of the matching HStore (requires the store's read lock)
7. Write the WAL: build a logkey (used to decide which WAL files can be deleted after a memstore flush; unrelated to MVCC) and put the logedit on the pending queue (written asynchronously)
(FSHLog)
8. Release the HRegion updatesLock read lock
9. Release the row lock (its CountDownLatch counts down)
10. Sync this RegionServer's HLog up to the current edit,
using FSHLog's unflushedEntries (an AtomicLong incremented each time a logedit is queued) as the coordinate value txid
(FSHLog class)
11. In MultiVersionConsistencyControl, use the entries on the writeQueue to advance the readpoint to the largest fully-completed writepoint, then wait until the readpoint reaches this entry's writepoint (since the WAL write is asynchronous, the readpoint may already be past the current writepoint, or earlier writepoints may not yet be completed, so the thread waits until readpoint >= the current writepoint)
12. MultiRowMutationProcessor runs postProcess
(the coprocessor hooks)
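Condensed into code form, the twelve steps map onto HRegion roughly like this. This is a hand-written paraphrase of HRegion's mutateRowsWithLocks/processRowsWithLocks path, not the literal source: the names follow the real methods, but the structure is simplified:

processor.preProcess(this, walEdit);                    // 1. coprocessor pre-hook
List<RowLock> rowLocks = acquireRowLocks(rowsToLock);   // 2. per-row locks
lock(this.updatesLock.readLock());                      // 3. region updates read lock
processor.process(now, this, mutationKvs, walEdit);     // 4. stamp timestamps, fill the WALEdit
WriteEntry writeEntry = mvcc.beginMemstoreInsert();     // 5. new write number on the writeQueue
applyToMemstore(mutationKvs);                           // 6. store.add(kv), under the store read lock
long txid = this.log.appendNoSync(..., walEdit, ...);   // 7. queue the logedit (written asynchronously)
this.updatesLock.readLock().unlock();                   // 8. release the updates lock
releaseRowLocks(rowLocks);                              // 9. countDown() the row-lock latches
this.log.sync(txid);                                    // 10. wait until the WAL reached HDFS
mvcc.completeMemstoreInsert(writeEntry);                // 11. advance the readpoint, wait for visibility
processor.postProcess(this, walEdit);                   // 12. coprocessor post-hook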
Below is the same walkthrough with the actual source:
1. Run the coprocessorHost hooks before the operation
(MultiRowMutationProcessor class)
2. Acquire the row lock on the row key (rowid):
/**
 * Tries to acquire a lock on the given row.
 * @param waitForLock if true, will block until the lock is available.
 *        Otherwise, just tries to obtain the lock and returns
 *        false if unavailable.
 * @return the row lock if acquired,
 *   null if waitForLock was false and the lock was not acquired
 * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
 */
public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
  checkRow(row, "row lock");
  startRegionOperation();
  try {
    HashedBytes rowKey = new HashedBytes(row);
    RowLockContext rowLockContext = new RowLockContext(rowKey);

    // loop until we acquire the row lock (unless !waitForLock)
    while (true) { // keep looping until this row key is acquired
      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); // try to claim the row
      if (existingContext == null) { // no other thread holds this row
        // Row is not already locked by any thread, use newly created context.
        break;
      } else if (existingContext.ownedByCurrentThread()) { // the current thread already holds this row
        // Row is already locked by current thread, reuse existing context instead.
        rowLockContext = existingContext;
        break;
      } else { // another thread holds this row
        // Row is already locked by some other thread, give up or wait for it
        if (!waitForLock) {
          return null;
        }
        try {
          // wait on the holder's CountDownLatch for the other thread to release this row
          if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
            throw new IOException("Timed out waiting for lock for row: " + rowKey);
          }
        } catch (InterruptedException ie) {
          LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        }
      }
    }

    // allocate new lock for this thread
    return rowLockContext.newLock();
  } finally {
    closeRegionOperation();
  }
}
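The RowLockContext that the loop above parks on is essentially an owner thread plus a CountDownLatch. A simplified sketch of the idea (the real inner class in HRegion also tracks a lock count; lockedRows is the enclosing HRegion's ConcurrentHashMap<HashedBytes, RowLockContext>):

// Simplified sketch of the latch-based row lock context, not the full inner class.
class RowLockContext {
  final HashedBytes rowKey;
  final CountDownLatch latch = new CountDownLatch(1); // waiters in getRowLock block on this
  final Thread ownerThread = Thread.currentThread();

  RowLockContext(HashedBytes rowKey) {
    this.rowKey = rowKey;
  }

  boolean ownedByCurrentThread() {
    return ownerThread == Thread.currentThread();
  }

  void release() {
    lockedRows.remove(rowKey);  // step 9: make the row claimable again
    latch.countDown();          // wake every thread parked in latch.await()
  }
}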
3. Acquire the HRegion updatesLock read lock
4. Apply the per-row processing (e.g. timestamps) and build the walEdit object the WAL needs
(MultiRowMutationProcessor class):
@Override
public void process(long now, HRegion region, List<KeyValue> mutationKvs, WALEdit walEdit)
    throws IOException {
  byte[] byteNow = Bytes.toBytes(now);
  // Check mutations and apply edits to a single WALEdit
  for (Mutation m : mutations) { // check every mutation and update its timestamps
    if (m instanceof Put) {
      Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
      region.checkFamilies(familyMap.keySet());
      region.checkTimestamps(familyMap, now);
      region.updateKVTimestamps(familyMap.values(), byteNow);
    } else if (m instanceof Delete) {
      Delete d = (Delete) m;
      region.prepareDelete(d);
      region.prepareDeleteTimestamps(d.getFamilyCellMap(), byteNow);
    } else {
      throw new DoNotRetryIOException("Action must be Put or Delete. But was: " +
        m.getClass().getName());
    }
    for (List<Cell> cells: m.getFamilyCellMap().values()) {
      boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
      for (Cell cell : cells) { // for each cell of each family, add to the walEdit object
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        mutationKvs.add(kv);
        if (writeToWAL) {
          walEdit.add(kv);
        }
      }
    }
  }
}
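One detail worth calling out: updateKVTimestamps only rewrites cells whose timestamp is still HConstants.LATEST_TIMESTAMP, i.e. cells the client created without an explicit timestamp. A hypothetical illustration (row/family/qualifier names are made up):

Put p = new Put(row);
p.add(family, qualifier, value);          // no explicit ts: cell carries HConstants.LATEST_TIMESTAMP
p.add(family, qualifier2, 1234L, value2); // explicit ts: left untouched
// On the server, region.updateKVTimestamps(familyMap.values(), byteNow)
// replaces LATEST_TIMESTAMP with the server-side 'now' for the first cell only.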
5. Wrap a writeEntry (carrying a new write number) and add it to the writeQueue of MultiVersionConsistencyControl:
writeEntry = mvcc.beginMemstoreInsert();
/**
 * Generate and return a {@link WriteEntry} with a new write number.
 * To complete the WriteEntry and wait for it to be visible,
 * call {@link #completeMemstoreInsert(WriteEntry)}.
 */
public WriteEntry beginMemstoreInsert() {
  synchronized (writeQueue) {
    // increment this HRegion's MVCC writepoint and use it as this entry's write number
    long nextWriteNumber = ++memstoreWrite;
    WriteEntry e = new WriteEntry(nextWriteNumber);
    writeQueue.add(e);
    return e;
  }
}
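beginMemstoreInsert is one half of a pair; its counterpart completeMemstoreInsert runs in step 11. The usage contract, as a sketch following the pattern the javadoc above describes:

WriteEntry w = mvcc.beginMemstoreInsert(); // reserve the next write number
try {
  // ... add the KeyValues to the memstore, append the WALEdit, sync the WAL ...
} finally {
  // Mark w completed, advance the readpoint as far as possible, and
  // block until readpoint >= w's write number -- see step 11 below.
  mvcc.completeMemstoreInsert(w);
}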
6. Add the KeyValues to the memstore of the matching HStore (requires the store's read lock):
@Override
public long add(final KeyValue kv) {
  lock.readLock().lock();
  try {
    return this.memstore.add(kv);
  } finally {
    lock.readLock().unlock();
  }
}
7. Write the WAL: build a logkey (used to decide which WAL files can be deleted after a memstore flush; unrelated to MVCC) and put the logedit on the pending queue (written asynchronously)
(FSHLog):
/**
 * Append a set of edits to the log. Log edits are keyed by (encoded)
 * regionName, rowname, and log-sequence-id.
 *
 * Later, if we sort by these keys, we obtain all the relevant edits for a
 * given key-range of the HRegion (TODO). Any edits that do not have a
 * matching COMPLETE_CACHEFLUSH message can be discarded.
 *
 * <p>
 * Logs cannot be restarted once closed, or once the HLog process dies. Each
 * time the HLog starts, it must create a new log. This means that other
 * systems should process the log appropriately upon each startup (and prior
 * to initializing HLog).
 *
 * synchronized prevents appends during the completion of a cache flush or for
 * the duration of a log roll.
 *
 * @param info
 * @param tableName
 * @param edits
 * @param clusterIds that have consumed the change (for replication)
 * @param now
 * @param doSync shall we sync?
 * @param sequenceId of the region.
 * @return txid of this transaction
 * @throws IOException
 */
@SuppressWarnings("deprecation")
private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
    final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
    AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
  if (edits.isEmpty()) return this.unflushedEntries.get();
  if (this.closed) {
    throw new IOException("Cannot append; log is closed");
  }
  TraceScope traceScope = Trace.startSpan("FSHlog.append");
  try {
    long txid = 0;
    synchronized (this.updateLock) { // take the HLog's update lock
      // get the sequence number from the passed Long. In normal flow, it is coming from the
      // region.
      // Generate the region's seqnum: once a memstore flush's seqnum >= an HLog file's
      // seqnum, that HLog file can be deleted.
      long seqNum = sequenceId.incrementAndGet();
      // The 'lastSeqWritten' map holds the sequence number of the oldest
      // write for each region (i.e. the first edit added to the particular
      // memstore). When the cache is flushed, the entry for the
      // region being flushed is removed if the sequence number of the flush
      // is greater than or equal to the value in lastSeqWritten.
      // Use encoded name. Its shorter, guaranteed unique and a subset of
      // actual name.
      byte [] encodedRegionName = info.getEncodedNameAsBytes(); // the region's encoded name
      // if this region has no entry yet, record its earliest unflushed seqnum
      if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
      HLogKey logKey = makeKey(
        encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);

      synchronized (pendingWritesLock) {
        doWrite(info, logKey, edits, htd); // put the logedit on the pending-writes queue
        txid = this.unflushedEntries.incrementAndGet();
      }
      this.numEntries.incrementAndGet();
      this.asyncWriter.setPendingTxid(txid);

      if (htd.isDeferredLogFlush()) {
        lastUnSyncedTxid = txid;
      }
      this.latestSequenceNums.put(encodedRegionName, seqNum);
    }
    // TODO: note that only tests currently call append w/sync.
    //       Therefore, this code here is not actually used by anything.
    // Sync if catalog region, and if not then check if that table supports
    // deferred log flushing
    if (doSync && (info.isMetaRegion() || !htd.isDeferredLogFlush())) {
      // sync txn to file system
      this.sync(txid);
    }
    return txid;
  } finally {
    traceScope.close();
  }
}
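Two counters are easy to confuse here: seqNum comes from the region's own sequenceId and orders edits per region (a memstore flush compares against it to decide which WAL files are obsolete), while txid (unflushedEntries) counts entries appended to this RegionServer's shared log and is the coordinate that the sync in step 10 waits on.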
8. Release the HRegion updatesLock read lock
9. Release the row lock (its CountDownLatch counts down)
10. Sync this RegionServer's HLog up to the current edit, using FSHLog's unflushedEntries (an AtomicLong incremented each time a logedit is queued) as the coordinate value txid
(FSHLog class):
// sync all transactions upto the specified txid
private void syncer(long txid) throws IOException {
  synchronized (this.syncedTillHere) {
    while (this.syncedTillHere.get() < txid) {
      try {
        this.syncedTillHere.wait();

        if (txid <= this.failedTxid.get()) {
          assert asyncIOE != null :
            "current txid is among(under) failed txids, but asyncIOE is null!";
          throw asyncIOE;
        }
      } catch (InterruptedException e) {
        LOG.debug("interrupted while waiting for notification from AsyncNotifier");
      }
    }
  }
}
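A note on who wakes this loop up: in this build of FSHLog (the code references AsyncWriter/AsyncNotifier threads), append only queues the edit and bumps unflushedEntries; separate asynchronous threads push the pending writes to HDFS and sync them, after which the notifier thread advances syncedTillHere and calls notifyAll() on it. syncer therefore just blocks on that watermark, rethrowing the asyncIOE recorded against failedTxid if the sync for its txid failed.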
11. MultiVersionConsistencyControl uses the entries on the writeQueue to advance the readpoint to the largest fully-completed writepoint, then waits until the readpoint reaches this entry's writepoint (since the WAL write is asynchronous, the readpoint may already be past the current writepoint, or earlier writepoints may not yet be completed, so the thread waits until readpoint >= the current writepoint):
/**
 * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
 *
 * At the end of this call, the global read point is at least as large as the write point
 * of the passed in WriteEntry. Thus, the write is visible to MVCC readers.
 */
public void completeMemstoreInsert(WriteEntry e) {
  advanceMemstore(e);
  waitForRead(e);
}

/**
 * Mark the {@link WriteEntry} as complete and advance the read point as
 * much as possible.
 *
 * How much is the read point advanced?
 * Let S be the set of all write numbers that are completed and where all previous write numbers
 * are also completed. Then, the read point is advanced to the supremum of S.
 *
 * @param e
 * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
 */
boolean advanceMemstore(WriteEntry e) {
  synchronized (writeQueue) {
    e.markCompleted();

    long nextReadValue = -1;
    boolean ranOnce = false;
    // advance the readpoint to the largest fully-completed writepoint
    while (!writeQueue.isEmpty()) {
      ranOnce = true;
      WriteEntry queueFirst = writeQueue.getFirst();

      if (nextReadValue > 0) {
        if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
          throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " +
            nextReadValue + " next: " + queueFirst.getWriteNumber());
        }
      }

      if (queueFirst.isCompleted()) {
        nextReadValue = queueFirst.getWriteNumber();
        writeQueue.removeFirst();
      } else {
        break;
      }
    }

    if (!ranOnce) {
      throw new RuntimeException("never was a first");
    }

    if (nextReadValue > 0) {
      synchronized (readWaiters) {
        memstoreRead = nextReadValue;
        readWaiters.notifyAll();
      }
    }
    if (memstoreRead >= e.getWriteNumber()) {
      return true;
    }
    return false;
  }
}

/**
 * Wait for the global readPoint to advance upto
 * the specified transaction number.
 */
public void waitForRead(WriteEntry e) {
  boolean interrupted = false;
  synchronized (readWaiters) {
    // Wait until the readpoint reaches this entry's writepoint: the WAL write is
    // asynchronous, so the readpoint may already be past this writepoint, or earlier
    // writepoints may not yet be completed; block until readpoint >= this writepoint.
    while (memstoreRead < e.getWriteNumber()) {
      try {
        readWaiters.wait(0);
      } catch (InterruptedException ie) {
        // We were interrupted... finish the loop -- i.e. cleanup --and then
        // on our way out, reset the interrupt flag.
        interrupted = true;
      }
    }
  }
  if (interrupted) Thread.currentThread().interrupt();
}
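A concrete example of advanceMemstore: suppose the writeQueue holds entries 5, 6, 7 and memstoreRead is 4. If entry 6 completes first, nothing moves, because 5 at the head of the queue is still pending, so the thread that wrote 6 blocks in waitForRead. When 5 completes, both 5 and 6 are popped, memstoreRead jumps to 6, the thread for 6 wakes up, and the thread for 7 keeps waiting.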
12. MultiRowMutationProcessor runs postProcess
(the coprocessor hooks run here)
End of the put-path walkthrough.