`

HBase-压缩和分割原理

 
阅读更多

 

HRegionServer调用合并请求

主要逻辑如下:

//遍历每个Store然后计算需要合并的文件,生成
//CompactionRequest对象并提交到线程池中执行
//根据throttleCompaction()函数规则来判断是提交到
//largeCompactions线程池还是smallCompactions线程池
CompactSplitThread#requestCompaction() {
	for (Store s : r.getStores().values()) {
		CompactionRequest cr = Store.requestCompaction(priority, request);
		ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
          ? largeCompactions : smallCompactions;
      	pool.execute(cr);		
		ret.add(cr);
	}	
}

//如果CompactionRequest的总大小 >
//minFilesToCompact * 2 * memstoreFlushSize
//则这次任务为major合并,否则在为minor合并
Store#throttleCompaction() {
    long throttlePoint = conf.getLong(
        "hbase.regionserver.thread.compaction.throttle",
        2 * this.minFilesToCompact * this.region.memstoreFlushSize);
    return compactionSize > throttlePoint;		
}


Store#compactSelection() {
	//选择出已经过期的StoreFile
	if(storefile.maxTimeStamp + store.ttl < now_timestamp) {
		//返回已经过期的store file文件集合	
	}
	
	//从0开始遍历到最后,如果发现有文件 > maxCompactSize则pos++
	//然后过滤掉这些大于maxCompactSize的文件
	while (pos < compactSelection.getFilesToCompact().size() &&
             compactSelection.getFilesToCompact().get(pos).getReader().length()
               > while (pos < compactSelection.getFilesToCompact().size() &&
             compactSelection.getFilesToCompact().get(pos).getReader().length()
               > maxCompactSize &&
             !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
      if (pos != 0) compactSelection.clearSubList(0, pos); &&
             !compactSelection.getFilesToCompact().get(pos).isReference()) {
		++pos;
	}
	if (pos != 0) {      	
		compactSelection.clearSubList(0, pos);
	}      
	if (compactSelection.getFilesToCompact().size() < minFilesToCompact) {
		return;	
	}
	
	//计算出sumSize数组,数组大小就是Store中的文件数量
	//sumSize数组中每个元素的大小是根据StroeFile的大小再加上 sumSize[i+1](或者0)
	//然后减去fileSizes[tooFar](或者0)
	//sumSize的内容跟元素的fileSizes数组应该差别不大
	int countOfFiles = compactSelection.getFilesToCompact().size();
	long [] fileSizes = new long[countOfFiles];
	long [] sumSize = new long[countOfFiles];
	for (int i = countOfFiles-1; i >= 0; --i) {
		StoreFile file = compactSelection.getFilesToCompact().get(i);
        fileSizes[i] = file.getReader().length();
        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
        int tooFar = i + this.maxFilesToCompact - 1;
        sumSize[i] = fileSizes[i] + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
			- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
	}
	
	//如果fileSize[start] > Math.max(minCompactSize,sumSize[start+1] * r)
	//则下标++,这里的操作是过滤掉过大的文件,以免影响合并时间
	while(countOfFiles - start >= this.minFilesToCompact && fileSizes[start] >
		Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
        ++start;
	}
	int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
	long totalSize = fileSizes[start] + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
		compactSelection = compactSelection.getSubList(start, end);
		
	//如果是major compact,并且需要执行的文件数量过多,则去掉一些	
	if(majorcompaction && compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
		int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
        compactSelection.getFilesToCompact().subList(0, pastMax).clear();		
	}		
}

 

 

 

 

 

CompactionRequest线程(用于执行major和minor合并)

压缩相关的类图如下:



major和minor合并的差别其实很小,如果最后待合并的总大小 > 2*minFilesToCompact*memstoreFlushSize

则认为这次是一个major合并,方到major线程池中执行,否则认为是一次minor合并

另外在创建StoreScanner构造函数时,会根据ScanType来判断是major还是minor合并,之后在

ScanQueryMathcer中根据ScanType的不同(有用户类型,minor和major三种类型)来决定返回的不同值的

主要逻辑如下:

//在单独的线程中执行合并
CompactionRequest#run() {
	boolean completed = HRegion.compact(this);
	if (completed) {
		if (s.getCompactPriority() <= 0) {
			server.getCompactSplitThread().requestCompaction(r, s, "Recursive enqueue", null);
		} else {
			// see if the compaction has caused us to exceed max region size
			server.getCompactSplitThread().requestSplit(r);
		}		
	}
}

//这里会调用Store,来执行compact
HRegion#compact() {
	Preconditions.checkArgument(cr.getHRegion().equals(this));
	lock.readLock().lock();
	CompactionRequest.getStore().compact(cr);
	lock.readLock().unlock();		
}

//完成合并,调用Compactor#compact()完成最核心的compact逻辑
//将合并后的文件移动到最终目录下并删除掉旧的文件
Store#compact() {
	List<StoreFile> filesToCompact = request.getFiles();
	StoreFile.Writer writer = this.compactor.compact(cr, maxId);
	if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        sf = completeCompaction(filesToCompact, writer);	
	}else {
        // Create storefile around what we wrote with a reader on it.
		sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
          this.family.getBloomFilterType(), this.dataBlockEncoder);
        sf.createReader();
	}
}

//将 /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/.tmp/9c8614a6bd0d4833b419a13abfde5ac1
//移动到
// /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/value/9c8614a6bd0d4833b419a13abfde5ac1
//再对新的目标文件创建一个StroeFile对象包装
//将旧的文件(这些底层的HFile都已经合并成一个文件了)删除
//最后计算新的StoreFile文件大小等信息并返回
Store#completeCompaction() {
	Path origPath = compactedFile.getPath();
	Path destPath = new Path(homedir, origPath.getName());		
	HBaseFileSystem.renameDirForFileSystem(fs, origPath, destPath);
	StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
          this.family.getBloomFilterType(), this.dataBlockEncoder);
	passSchemaMetricsTo(result);
	result.createReader();	
}


//compact的最核心逻辑!!
//对多个StoreFile进行合并,这里使用到了StoreScanner
//迭代读取所有的StroeFile然后使用堆排序输出,并写入到
//StoreFile$Writer#append()中
Compactor#compact() {
    for (StoreFile file : filesToCompact) {
		StoreFile.Reader r = file.getReader();	
		long keyCount = (r.getBloomFilterType() == store.getFamily()
          .getBloomFilterType()) ?
          r.getFilterEntries() : r.getEntries();	
		maxKeyCount += keyCount;          	
	}
	
	int compactionKVMax = getConf().getInt("hbase.hstore.compaction.kv.max", 10);
	Compression.Algorithm compression = store.getFamily().getCompression();
	List<StoreFileScanner> scanners = StoreFileScanner
      .getScannersForStoreFiles(filesToCompact, false, false, true);
	Scan scan = new Scan();
	scan.setMaxVersions(store.getFamily().getMaxVersions());     
	
        //这里会根据当前合并的类型选择ScanType的类型,之后ScanQueryMatcher根据ScanType的
        //的类型返回不同的值
        InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), scan, scanne        rs,majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
		smallestReadPoint, earliestPutTs);      
		
	do {
		hasMore = scanner.next(kvs, compactionKVMax);
		if (writer == null && !kvs.isEmpty()) {
			//在tmp目录下创建一个临时文件,路径类似
			// /hbase/mytable/963cf86f3fd07c3d3161c1f4f15bef5a/.tmp/9c8614a6bd0d4833b419a13abfde5ac1
			writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
			  maxMVCCReadpoint >= smallestReadPoint);
		}
		for (KeyValue kv : kvs) {
			writer.append(kv);	
		}		
	}while(hasMore);
	
	scanner.close();
	StoreFile$Writer.appendMetadata(maxId, majorCompaction);
	StoreFile$Writer.close();	
}

 

压缩算法和的核心逻辑演示类图

根据由新到老排序文件,选择出合适的文件

这里的滑动窗口是从0下标开始过滤掉size过大的文件,这样可以提高合并效率


 

 

 

 

 

使用到的一些重要类

其中内部scan的时候使用到的相关类图如下


相关重要的类:

Hbase在实现该算法的过程中重要的是下面这五个类。 
1.org.apache.hadoop.hbase.regionserver.Store 
2.org.apache.hadoop.hbase.regionserver.StoreScanner 
3.org.apache.hadoop.hbase.regionserver.StoreFileScanner 
4.org.apache.hadoop.hbase.regionserver.KeyValueHeap 
5.org.apache.hadoop.hbase.regionserver.ScanQueryMatcher 

这五个类的关系是 
1.Store类调用StoreScanner的next方法,并循环输出kv到合并文件; 
2.StoreScanner的作用是负责创建并持有多个输入文件的StoreFileScanner,
	内部遍历这些StoreFileScanner并通过KeyValueHeap来排序这些输入文件的首条记录; 
3.StoreFileScanner的作用是遍历单个输入文件,管理并提供单个输入文件的首条记录; 
4.KeyValueHeap的作用就是通过堆来排序每个输入文件的首条记录。 
5.ScanQueryMatcher的作用是当输入文件的首条记录来的时候,根据一定的策略判断这条记录到底是该输出还是该跳过。 

 

StoreScanner及相关类的主要逻辑如下:

//内部应用StoreFileScanner列表,创建ScanQueryMatcher用来判断是过滤还是输出
//创建KeyValueHeap用于堆排序,根据堆的结构每次从堆顶拿出一个
//注意这个构造函数中有一个参数ScanType,是扫描的类型,包括MAJOR_COMPACT,MINOR_COMPACT,
//USER_COMPACT来返回不同的值,以达到major或minor的效果
StoreScanner#构造函数() {
	ScanQueryMatcher matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
        smallestReadPoint, earliestPutTs, oldestUnexpiredTS);	
	List<? extends KeyValueScanner> scanners = selectScannersFrom(scanners);
	for(KeyValueScanner scanner : scanners) {
		scanner.seek(matcher.getStartKey());
    }	
    KeyValueHeap heap = new KeyValueHeap(scanners, store.comparator);     
}

//选择性的创建布隆过滤器,调用HFileWriterv2的append()
//写入KeyValue信息
StoreFile$Writer#append() {
      appendGeneralBloomfilter(kv);
      appendDeleteFamilyBloomFilter(kv);
      HFileWriterV2.append(kv);
      trackTimestamps(kv);	
}

//这个方法封装了处理heap取出的记录值的逻辑,
//根据matcher对该值的判断来决定这个值是输出还是跳过
StoreSanner#next() {
 KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }	
	LOOP: 
	while((kv = this.heap.peek()) != null) {    
		ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
        	switch(qcode) {
          		case INCLUDE:
          		case INCLUDE_AND_SEEK_NEXT_ROW:
          		case INCLUDE_AND_SEEK_NEXT_COL:
            		Filter f = matcher.getFilter();
            		outResult.add(f == null ? kv : f.transform(kv));
            		count++;
            		if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
              			if (!matcher.moreRowsMayExistAfter(kv)) {
                			return false;
              			}
              			reseek(matcher.getKeyForNextRow(kv));
            		} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
              			reseek(matcher.getKeyForNextColumn(kv));
            		} else {
              			this.heap.next();
            		}          		        				
            		cumulativeMetric += kv.getLength();
            		if (limit > 0 && (count == limit)) {
              			break LOOP;
            		}
            		continue;
          		case DONE:
            		return true;
          		case DONE_SCAN:
            		close();
            		return false;		
          		case SEEK_NEXT_ROW:  
					if (!matcher.moreRowsMayExistAfter(kv)) {
              			return false;
            		}
            		reseek(matcher.getKeyForNextRow(kv));
            		break;  
          		case SEEK_NEXT_COL:
            		reseek(matcher.getKeyForNextColumn(kv));
            		break;
          		case SKIP:
            		this.heap.next();
            		break;
          		case SEEK_NEXT_USING_HINT:
            		KeyValue nextKV = matcher.getNextKeyHint(kv);
            		if (nextKV != null) {
              			reseek(nextKV);
            		} else {
              			heap.next();
            		}
            		break;
          		default:
            		throw new RuntimeException("UNEXPECTED");              	
    }//end while
}

//KeyValueHeap使用堆排序输出结果
//内部使用了优先队列,再用KVScannerComparator
//作为比较工具
KeyValueHeap#构造函数() {
	this.comparator = new KVScannerComparator(comparator);
	heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
          this.comparator);
	for (KeyValueScanner scanner : scanners) {
		if (scanner.peek() != null) {
			this.heap.add(scanner);
        } else {
			scanner.close();
        }
	}
	this.current = pollRealKV();          
}

//堆里面最重要的方法其实就是next,不过看这个方法的主要功能不是
//为了算出nextKeyValue,而主要是为了算出nextScanner,然后需在外部
//再次调用peek方法来取得nextKeyValue,不是很简练。
KeyValueHeap#next() {
	InternalScanner currentAsInternal = (InternalScanner)this.current;
    boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
    KeyValue pee = this.current.peek();
	if (pee == null || !mayContainMoreRows) {
		this.current.close();
    } else {
		this.heap.add(this.current);
    }
    this.current = pollRealKV();
    return (this.current != null);    
}

//这里省略了其他部分,注意这里有两个赋值
//对于compact来说如果是minor类型的则不会删除掉DELETE类型的KeyValue
//而major类型在最终输出的时候会删除掉DELETE类型的KeyValue标记
ScanQueryMatcher#构造函数() {
	//.....
    /* how to deal with deletes */
    this.isUserScan = scanType == ScanType.USER_SCAN;
    this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();	
	//..
}

 

 

 

 

 

HRegionServer调用split请求


执行逻辑如下:

//切分region
HRegionServer#splitRegion() {
	HRegion region = getRegion(regionInfo.getRegionName());
    region.flushcache();
    region.forceSplit(splitPoint);
    compactSplitThread.requestSplit(region, region.checkSplit());		
}

//创建SplitRequest对象,放到线程池中执行
CompactSplitThread#requestSplit() {
	ThreadPoolExecutor#execute(new SplitRequest(r, midKey, HRegionServer.this));	
}

  

 

 

 

 

split线程执行过程


 

META表更新的瞬间

主要逻辑如下:

//在单线中执行
SplitRequest#run() {
	SplitTransaction st = new SplitTransaction(parent, midKey);
	if (!st.prepare()) {
		return;	
	}
	st.execute(this.server, this.server);
}

//核心逻辑,先创建两个子region,再创建临时的ZK节点
//将父region切分,创建临时目录,将region关闭
//开始切分,将storefile放到目录中
//创建子regionA和B,同时open这两个region,更新META信息
//更新ZK信息,将原region下线
SplitTransaction#execute() {
    PairOfSameType<HRegion> regions = createDaughters(server, services);
    openDaughters(server, services, regions.getFirst(), regions.getSecond());
    transitionZKNode(server, services, regions.getFirst(), regions.getSecond());		
}


//预先创建两个子region
SplitTransaction#prepare() {
	HRegionInfo hri = parent.getRegionInfo();
	hri_a = new HRegionInfo(hri.getTableName(), startKey, splitrow, false, rid);
	hri_b = new HRegionInfo(hri.getTableName(), splitrow, endKey, false, rid);
}

SplitTransaction#createDaughters() {
	//创建类似 /hbase/unassigned/fad11edf1e6e0a842b7fd3ad87f25053
	//这样的节点,其中的编码后的region就是待split的region
	createNodeSplitting();
	//用于记录事务的处理进展
	this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
	
	//将这个节点作为事务节点,待任务处理完后会删除这个节点
	transitionNodeSplitting();
	
	//创建类似 /hbase/kvdb/fad11edf1e6e0a842b7fd3ad87f25053/.splits
	//的HDFS节点,用于临时处理split文件
	createSplitDir();	
	
	//关闭待处理的region
	List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
	HRegionServer.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
	splitStoreFiles(this.splitdir, hstoreFilesToSplit);
	this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
    HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
    HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
    
    //更新META表信息
	MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
        this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
            
    //返回两个子region A和B
    return new PairOfSameType<HRegion>(a, b);
}

SplitTransaction#splitStoreFiles() {
	for (StoreFile sf: hstoreFilesToSplit) {
		//splitStoreFile(sf, splitdir);
      	StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
      	futures.add(threadPool.submit(sfs));
    }	
    //等待线程池中的任务执行完后返回
}

//开始分割文件
SplitTransaction$StoreFileSplitter#call() {
	splitStoreFile(sf, splitdir);
}

SplitTransaction#splitStoreFile() {
    FileSystem fs = this.parent.getFilesystem();
    byte [] family = sf.getFamily();
    String encoded = this.hri_a.getEncodedName();    
    //地址类似
    // /hbase/kvdb/fad11edf1e6e0a842b7fd3ad87f25053/.splits/1977310abc183fac9aba3dc626b01a2d
    //    /value/92e897822d804d3bb4805548e9a80bd2.fad11edf1e6e0a842b7fd3ad87f25053
    Path storedir = Store.getStoreHomedir(splitdir, encoded, family);    
    //这里会根据splitRow分别创建两个文件,一个是从最开始到splitRow
    //还有一个是从splitRow到文件最后
    //这里是直接调用HDFS的API写入到底层文件系统中的
    StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
    encoded = this.hri_b.getEncodedName();
    storedir = Store.getStoreHomedir(splitdir, encoded, family);
    StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);		
}

//这里会根据传入的参数,是从开始到splitRow
//还是从splitRow到文件结束
//如果是从开始到splitRow,那么判断第一个key如果splitRow大则这个
//文件就不需要分割了,直接返回即可
StoreFile#split() {
	if (range == Reference.Range.bottom) {
		KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
      	byte[] firstKey = f.createReader().getFirstKey();
      	if (f.getReader().getComparator().compare(splitKey.getBuffer(), 
          	splitKey.getKeyOffset(), splitKey.getKeyLength(), 
          	firstKey, 0, firstKey.length) < 0) {
        	return null;
      	}   		
	} else {
		KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
      	byte[] lastKey = f.createReader().getLastKey();      
      	if (f.getReader().getComparator().compare(splitKey.getBuffer(), 
          	splitKey.getKeyOffset(), splitKey.getKeyLength(), 
          	lastKey, 0, lastKey.length) > 0) {
        	return null;
      	}		
	}	
	Reference r = new Reference(splitRow, range);
	String parentRegionName = f.getPath().getParent().getParent().getName();
	Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
    return r.write(fs, p);
}

//创建一个HRegion
SplitTransaction#createDaughterRegion() {
    FileSystem fs = this.parent.getFilesystem();
    Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
      this.splitdir, hri);
    HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
      this.parent.getLog(), fs, this.parent.getBaseConf(),
      hri, this.parent.getTableDesc(), rsServices);
    long halfParentReadRequestCount = this.parent.getReadRequestsCount() / 2;
    r.readRequestsCount.set(halfParentReadRequestCount);
    r.setOpMetricsReadRequestCount(halfParentReadRequestCount);
    long halfParentWriteRequest = this.parent.getWriteRequestsCount() / 2;
    r.writeRequestsCount.set(halfParentWriteRequest);
    r.setOpMetricsWriteRequestCount(halfParentWriteRequest);    
    HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
    return r;	
}

//设置region的info:regioninfo列为下线状态
//再增加两个列info:splitA和info:splitB
MetaEditor#offlineParentInMeta() {
    HRegionInfo copyOfParent = new HRegionInfo(parent);
    copyOfParent.setOffline(true);
    copyOfParent.setSplit(true);
    Put put = new Put(copyOfParent.getRegionName());
    addRegionInfo(put, copyOfParent);
    put.add("info", "splitA",Writables.getBytes(a));
    put.add("info", "splitB",Writables.getBytes(b));
    putToMetaTable(catalogTracker, put);	
}



//这里的DaughterOpener是对HRegion的封装
//会在新线程中启动HRegion#open()
//之后会更新META表信息,之后META表在很短的时间内
//会同时存在父region信息(已下线)和两个子region信息
SplitTransaction#openDaughters() {
	DaughterOpener aOpener = new DaughterOpener(server, a);
    DaughterOpener bOpener = new DaughterOpener(server, b);
    aOpener.start();
    bOpener.start();
	aOpener.join();
	bOpener.join();    
	
	HRegionServer.postOpenDeployTasks(b, server.getCatalogTracker(), true);
	// Should add it to OnlineRegions
	HRegionServer.addToOnlineRegions(b);
	HRegionServer.postOpenDeployTasks(a, server.getCatalogTracker(), true);
	HRegionServer.addToOnlineRegions(a);	  	
}

//如果StoreFile超过一定数量了会执行compact
//然后更新ZK或者ROOT和META表
HRegionServer#postOpenDeployTasks() {
	for (Store s : r.getStores().values()) {
		if (s.hasReferences() || s.needsCompaction()) {
        	getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
      	}
    }
    //更新ZK或者ROOT和META表
	if (r.getRegionInfo().isRootRegion()) {
      	RootLocationEditor.setRootLocation(getZooKeeper(),
       	this.serverNameFromMasterPOV);
    } else if (r.getRegionInfo().isMetaRegion()) {
      	MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),
        this.serverNameFromMasterPOV);
    } else {
      	if (daughter) {
        	// If daughter of a split, update whole row, not just location.
        	MetaEditor.addDaughter(ct, r.getRegionInfo(),
          	this.serverNameFromMasterPOV);
      	} else {
        	MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
          	this.serverNameFromMasterPOV);
      	}
    }    	
}

//将ZK中 /hbase/unassigned 节点下的
//fad11edf1e6e0a842b7fd3ad87f25053(待处理的region)
//删除
SplitTransaction#transitionZKNode() {
	transitionNodeSplit();
	tickleNodeSplit();	
}

 

 

 

 

 

一些辅助逻辑:

//等待压缩完成,然后刷新数据
//最后再线程池中关闭所有的Store
HRegion#close() {
	waitForFlushesAndCompactions();
	internalFlushcache();
	ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-"
            + this.regionInfo.getRegionNameAsString());
	CompletionService<ImmutableList<StoreFile>> completionService =
          new ExecutorCompletionService<ImmutableList<StoreFile>>(
            storeCloserThreadPool);	
            
	for (final Store store : stores.values()) {
		completionService.submit(new Callable<ImmutableList<StoreFile>>() {
			public ImmutableList<StoreFile> call() throws IOException {
				return store.close();
			}
		});
	}            
}

//提交到线程池中关闭所有打开的StoreFile
Store#close() {
	for (final StoreFile f : result) {
		completionService.submit(new Callable<Void>() {
            public Void call() throws IOException {
              f.closeReader(true);
              return null;
		}
	}
}

 

 

 

 

 

compactionChecker线程

这个类是用于定期检查region server下的region是否需要做compact

主要逻辑如下:

//不停的遍历当前RegionServer下的所有Region
//然后检查是否需要做compact
CompactionChecker#chore() {
	for (HRegion r : this.instance.onlineRegions.values()) {
		for (Store s : r.getStores().values()) {
			if (s.needsCompaction()) {
				// Queue a compaction. Will recognize if major is needed.
              	this.instance.compactSplitThread.requestCompaction(r, s, getName());
            } else if (s.isMajorCompaction()) {
				if (majorCompactPriority == DEFAULT_PRIORITY
                || majorCompactPriority > r.getCompactPriority()) {
                	this.instance.compactSplitThread.requestCompaction(r, s, getName());
                } else {
                	this.instance.compactSplitThread.requestCompaction(r, s, getName());	
                }
            }
		}	
	}	
}

 

 

 

 

 

参考

深入分析HBase Compaction机制

Hbase的Region Compact算法实现分析

深入分析HBase RPC(Protobuf)实现机制

HBase region split源码分析

 

 

  • 大小: 37.4 KB
  • 大小: 43.4 KB
  • 大小: 84.3 KB
  • 大小: 33.2 KB
  • 大小: 101.9 KB
  • 大小: 41.7 KB
  • 大小: 46.1 KB
  • 大小: 82.7 KB
分享到:
评论

相关推荐

    hbase-0.94.1.tar.gz

    《深入理解HBase:以hbase-0.94.1.tar.gz为例》 HBase,全称为Hadoop Database,是Google ...通过理解并实践这个版本,开发者可以更好地理解和掌握HBase的基本原理和应用技巧,为后续的HBase项目开发打下坚实基础。

    Hbase设置Snappy压缩测试

    HBase的源码分析可以帮助我们深入理解其内部工作原理,而使用HBase相关的工具(如HBase Shell、HBaseAdmin等)则可以方便地进行集群管理和操作。 例如,在`HbaseCommons.java`这个文件中,可能包含了HBase操作的...

    hbase原理和设计

    ### HBase原理与设计 #### 一、HBase概述 HBase是一个开源的、高性能的分布式存储系统,基于Hadoop之上构建。它提供了一个高度可靠、面向列的存储方案,适用于处理大规模的数据集。HBase的设计特点包括: 1. **高...

    HBase Introduction

    除了上述内容外,HBase还涉及许多其他高级主题,如压缩算法的选择、缓存策略、故障恢复机制等,这些都需要开发者根据具体的业务需求进行细致的考虑。 ### 结论 通过深入理解HBase的逻辑视图和物理视图,以及掌握...

    HBase学习利器:HBase实战

    《HBase in Action》是一本由Nick Dimiduk和Amandeep Khurana撰写的关于HBase实践的书籍,该书旨在帮助读者深入理解HBase的工作原理及其实战应用。 **章节概述**: - **第一部分:HBase基础** - **第1章:介绍...

    hbase 表设计

    HBase架构与传统的关系型数据库(如MySQL、PostgreSQL、Oracle等)有着显著的区别,其设计在某些方面为了扩展性和灵活的模式(Schema)而牺牲了一些传统特性。HBase的数据模型是稀疏的、分布式的、持久化的多维排序...

    7-分布式数据库HBase.ppt

    - **列式存储**:HBase以列族(Column Family)组织数据,每个列族包含多个列,这种设计有利于数据压缩和按需读取,减少了I/O开销。 - **分布式架构**:HBase通过Region Server将数据分布在多台服务器上,实现水平...

    大数据云计算技术系列 Hadoop之Hbase从入门到精通(共243页).pdf

    《大数据云计算技术系列:Hadoop之Hbase从入门到精通》 HBase,全称Hadoop Database...通过深入理解其技术原理和使用方法,开发者和数据工程师能够构建出高效、可扩展的数据处理系统,满足现代数据密集型应用的需求。

    hbase 教程 简单易懂 初学者必备

    通过"hbasedemo"这个示例项目,初学者可以动手实践,从创建表到数据操作,加深理解HBase的工作原理和实际应用。 总的来说,学习HBase不仅要掌握理论知识,更需要通过实践来巩固和提升。"hbase教程"旨在帮助初学者...

    hbase权威指南源代码

    源代码是书中理论知识的实践体现,对于理解HBase的工作原理、实现机制以及如何使用HBase进行数据存储和处理非常有帮助。以下是根据标题和描述提取的相关知识点: 1. **HBase概述**:HBase是构建在Hadoop文件系统...

    HBase schema 设计(英文)

    总之,HBase的Schema设计与关系型数据库有较大差异,需要从其独特的数据模型和存储原理出发,通过合理规划行键、列族、压缩策略和版本控制等,来设计出能够高效支持大规模数据随机访问需求的表结构。在实践过程中,...

    Hbase权威指南(中文版)快捷PDF版

    通过阅读《HBase权威指南》中文版,无论是初学者还是有经验的开发者,都能深入理解HBase的工作原理,并掌握其配置、管理和调优技巧,从而在大数据领域游刃有余。这本书的价值在于其详尽的内容和实践导向,是学习和...

    hbase 权威指南

    《HBase 权威指南》是一本深入解析Apache ...通过阅读《HBase 权威指南》,读者不仅可以掌握HBase的基本概念和技术原理,还能了解到实际项目中如何设计和优化HBase系统,从而在大数据领域实现高效的数据存储和处理。

    hadoop相关技术原理

    - 高性能:由于其列式存储和数据压缩,Hbase 在读写速度上有显著优势。 - 动态列:列族可以动态添加,节省存储空间。 - 数据自动分割:数据分布在多个节点上,实现水平扩展。 - 并发支持:支持高并发读写操作。 ...

    大数据技术原理及应用.pdf

    Avro是一个用于数据序列化的系统,提供了丰富的数据结构类型、快速可压缩的二进制数据格式、存储持久性数据的文件集、远程调用RPC的功能和简单的动态语言集成功能。Avro系统依赖于模式(Schema),Avro数据的读和写...

    hadoop大数据技术原理与应用ppt

    【Hadoop大数据技术原理与应用】是现代大数据处理的核心框架之一,它由Apache软件基金会开发,主要用于处理和存储海量数据。Hadoop的出现解决了传统单机系统无法应对的大量非结构化和半结构化数据的问题,它以分布式...

    23-Sqoop数据导入导出1

    本篇将详细介绍Sqoop的导入和导出过程,以及在MySQL、HDFS、Hive和HBase之间的具体操作。 ### Sqoop导入原理 1. **JDBC检查**:在开始导入前,Sqoop通过JDBC连接数据库,获取表结构和列信息,如数据类型,将其映射...

    Hadoop实战中文版

    - 面向列族:数据按列族存储,便于压缩和索引。 - 高性能:支持海量数据的快速读写操作。 - 可扩展性:支持水平扩展,随着数据量增加而扩展硬件资源。 #### 六、Mahout - **概念介绍**:Mahout是一个可伸缩的...

Global site tag (gtag.js) - Google Analytics