再次吐槽公司的sb环境,不让上网不能插优盘,今天有事回家写一下笔记HBase region split
在管理集群时,最容易导致hbase节点发生故障的恐怕就是hbase region split和compact的了,日志有split时间太长;文件找不到;split的时候response too slow等等,所以先看看hbase region split源码,希望对以后能有帮助
HBase region split源码分析
一、流程概述
1.HBaseAdmin 发起 hbase split
2.HRegionServer 确定分割点 region split point
3.CompactSplitThread和SplitRequest 进行region分割
3.1SplitTransaction st.prepare()初始化两个子region
3.2splitTransaction execute执行分割
3.2.1两个子region DaughterOpener线程 start
3.2.2若region 需要compact,进行compact路程
3.2.3HRegionServer添加子region到meta表,加入到RegionServer里
3.3修改zk节点状态,等待split结束
二 、hbase region split UML图

三、详细分析
1.HBaseAdmin 发起 hbase split
public void split(final byte [] tableNameOrRegionName,
final byte [] splitPoint) throws IOException, InterruptedException {
CatalogTracker ct = getCatalogTracker();
try {
Pair<HRegionInfo, ServerName> regionServerPair
= getRegion(tableNameOrRegionName, ct);//获得HRI,若是但region
if (regionServerPair != null) {
if (regionServerPair.getSecond() == null) {
throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
} else {
//split region 重点分析方法
split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
}
} else {
//table split流程
final String tableName = tableNameString(tableNameOrRegionName, ct);
List<Pair<HRegionInfo, ServerName>> pairs =
MetaReader.getTableRegionsAndLocations(ct,
tableName);
for (Pair<HRegionInfo, ServerName> pair: pairs) {
// May not be a server for a particular row
if (pair.getSecond() == null) continue;
HRegionInfo r = pair.getFirst();
// check for parents
if (r.isSplitParent()) continue;
// if a split point given, only split that particular region
if (splitPoint != null && !r.containsRow(splitPoint)) continue;
// call out to region server to do split now
split(pair.getSecond(), pair.getFirst(), splitPoint);
}
}
} finally {
cleanupCatalogTracker(ct);
}
}
2.HRegionServer 确定分割点 region split point
@Override
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
throws NotServingRegionException, IOException {
checkOpen();//检查server和hdfs是否可用
HRegion region = getRegion(regionInfo.getRegionName());//根据HRI获取region
region.flushcache();//flush cache 有几种情况不进行flush
//the cache is empte | the region is closed.| a flush is already in progress | writes are disabled
region.forceSplit(splitPoint);//设置split point
compactSplitThread.requestSplit(region, region.checkSplit());//获取split point,进行split
}
获得split point详细过程,获取最适合的store-hbase现在就是取最大的,获取store的midkey作为splitpoint
3.CompactSplitThread和SplitRequest 进行region分割
这里是split中较为复杂的过程
public void run() {
if (this.server.isStopping() || this.server.isStopped()) {
LOG.debug("Skipping split because server is stopping=" +
this.server.isStopping() + " or stopped=" + this.server.isStopped());
return;
}
try {
final long startTime = System.currentTimeMillis();
SplitTransaction st = new SplitTransaction(parent, midKey);
// If prepare does not return true, for some reason -- logged inside in
// the prepare call -- we are not ready to split just now. Just return.
// 3.1SplitTransaction st.prepare()初始化两个子region
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);//3.2splitTransaction execute执行分割
this.server.getMetrics().incrementSplitSuccessCount();
} catch (Exception e) {
。。。。。。。。。。。。
3.2splitTransaction execute执行分割
public PairOfSameType<HRegion> execute(final Server server,
final RegionServerServices services)
throws IOException {
PairOfSameType<HRegion> regions = createDaughters(server, services);
//创建split临时目录,改变region zk状态,关闭region,停止所有store服务
//创建daughter目录,将region storefile放入目录中
//创建子region A、B,在zk上注册,并且设置原HRI下线
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
return regions;
}
加一个同样复杂的
3.2.0 createDaughters函数的操作
这里创建两个子Region,包括他们的regioninfo,并且将父region的hfile引用写入子Region中
生成两个子region的代码:.stepsBeforePONR
- public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
- final RegionServerServices services, boolean testing) throws IOException {
-
-
- if (server != null && server.getZooKeeper() != null) {
- try {
-
- createNodeSplitting(server.getZooKeeper(),
- parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
- } catch (KeeperException e) {
- throw new IOException("Failed creating PENDING_SPLIT znode on " +
- this.parent.getRegionNameAsString(), e);
- }
- }
- this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
- if (server != null && server.getZooKeeper() != null) {
-
-
-
-
- znodeVersion = getZKNode(server, services);
- }
-
-
- this.parent.getRegionFileSystem().createSplitsDir();
- this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
-
- Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
- Exception exceptionToThrow = null;
- try{
-
- hstoreFilesToSplit = this.parent.close(false);
- } catch (Exception e) {
- exceptionToThrow = e;
- }
- if (exceptionToThrow == null && hstoreFilesToSplit == null) {
-
-
-
-
-
- exceptionToThrow = closedByOtherException;
- }
- if (exceptionToThrow != closedByOtherException) {
- this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
- }
- if (exceptionToThrow != null) {
- if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
- throw new IOException(exceptionToThrow);
- }
- if (!testing) {
-
- services.removeFromOnlineRegions(this.parent, null);
- }
- this.journal.add(JournalEntry.OFFLINED_PARENT);
-
-
-
-
-
-
-
-
- splitStoreFiles(hstoreFilesToSplit);
-
-
-
-
-
-
- this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
- HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
-
-
- this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
- HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
- return new PairOfSameType<HRegion>(a, b);
- }
1.RegionSplitPolicy.getSplitPoint()获得region split的split point ,最大store的中间点midpoint最为split point
2.SplitRequest.run()
实例化SplitTransaction
st.prepare():split前准备:region是否关闭,所有hfile是否被引用
st.execute:执行split操作
1.createDaughters 创建两个region,获得parent region的写锁
1在zk上创建一个临时的node splitting point,
2等待master直到这个region转为splitting状态
3之后建立splitting的文件夹,
4等待region的flush和compact都完成后,关闭这个region
5从HRegionServer上移除,加入到下线region中
6进行regionsplit操作,创建线程池,用StoreFileSplitter类将region下的所有Hfile(StoreFile)进行split,
(split row在hfile中的不管,其他的都进行引用,把引用文件分别写到region下边)
7.生成左右两个子region,删除meta上parent,根据引用文件生成子region的regioninfo,写到hdfs上
2.stepsAfterPONR 调用DaughterOpener类run打开两个子region,调用initilize
a)向hdfs上写入.regionInfo文件以便meta挂掉以便恢复
b)初始化其下的HStore,主要是LoadStoreFiles函数:
对于该store函数会构造storefile对象,从hdfs上获取路径和文件,每个文件一个
storefile对象,对每个storefile对象会读取文件上的内容创建一个
HalfStoreFileReader读对象来操作该region的父region上的相应的文件,及该
region上目前存储的是引用文件,其指向的是其父region上的相应的文件,对该
region的所有读或写都将关联到父region上
将子Region添加到rs的online region列表上,并添加到meta表上
(0.98版本,包含以下3.2.1~3)
(0.94版本,两个方法之后给合在了stepsAfterPONR里边)
3.2.1两个子region DaughterOpener线程 start
final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
boolean stopped = server != null && server.isStopped();
boolean stopping = services != null && services.isStopping();
// TODO: Is this check needed here?
if (stopped || stopping) {
LOG.info("Not opening daughters " +
b.getRegionInfo().getRegionNameAsString() +
" and " +
a.getRegionInfo().getRegionNameAsString() +
" because stopping=" + stopping + ", stopped=" + stopped);
} else {
// Open daughters in parallel.创建两个字region打开操作类
DaughterOpener aOpener = new DaughterOpener(server, a);
DaughterOpener bOpener = new DaughterOpener(server, b);
aOpener.start();
bOpener.start();
try {
aOpener.join();
bOpener.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted " + e.getMessage());
}
if (aOpener.getException() != null) {
throw new IOException("Failed " +
aOpener.getName(), aOpener.getException());
}
if (bOpener.getException() != null) {
throw new IOException("Failed " +
bOpener.getName(), bOpener.getException());
}
if (services != null) {
try {
// add 2nd daughter first (see HBASE-4335)
services.postOpenDeployTasks(b, server.getCatalogTracker(), true);
// Should add it to OnlineRegions
services.addToOnlineRegions(b);
services.postOpenDeployTasks(a, server.getCatalogTracker(), true);
services.addToOnlineRegions(a);
} catch (KeeperException ke) {
throw new IOException(ke);
}
}
}
}
调用HRegion 打开方法openHRegion
protected HRegion openHRegion(final CancelableProgressable reporter)
throws IOException {
checkCompressionCodecs();
long seqid = initialize(reporter);
//初始化region,
//1.checkRegionInfoOnFilesystem将HRegionInfo写入文件
//2.cleanupTempDir 清空老region临时目录
//3.初始化HRegion store,加载hfile
//4.获得recover.edit文件,找到对应的store,将读取的keyvalue输出到store,恢复hregion
if (this.log != null) {
this.log.setSequenceNumber(seqid);
}
return this;
}
3.2.2若region 需要compact,进行compact过程
compact过程有点复杂,过程如下:
1.将所有storefile放入compact候选者
2.交给coprocessor做处理,选择compact storefile
3.若coprocessor没有做处理,则采用系统算法选择
3.1必须要进行compact的文件,文件大小大于compact最大值并且没有其他被引用
3.2必须要进行compact文件小于compact文件最小数
3.3 isMajorCompaction判断是否需要major compact
3.3.1当ttl大于storefile中最大文件compact time,则不需要
3.3.2 以上反之,需要
3.3.3 最后一次major compaction时间大于majorCompactionTime,需要
3.4 当compact文件大于compact文件最大数,且需要major compaction活强制major compaction,则进行major compaction
3.5或则进行minor compact,他两个的区别在于一个compact文件数是所有并且删除就tts和version的数据,一个compact文件数不大于maxcompactfile配置
public CompactionRequest requestCompaction(int priority) throws IOException {
// don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) {
return null;
}
CompactionRequest ret = null;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
// candidates = all storefiles not already in compaction queue
List<StoreFile> candidates = Lists.newArrayList(storefiles);
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidates.indexOf(last);
Preconditions.checkArgument(idx != -1);
candidates.subList(0, idx + 1).clear();
}
boolean override = false;
if (region.getCoprocessorHost() != null) {
override = region.getCoprocessorHost().preCompactSelection(
this, candidates);
}
CompactSelection filesToCompact;
if (override) {
// coprocessor is overriding normal file selection
filesToCompact = new CompactSelection(conf, candidates);
} else {
filesToCompact = compactSelection(candidates, priority);
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompactSelection(this,
ImmutableList.copyOf(filesToCompact.getFilesToCompact()));
}
// no files to compact
if (filesToCompact.getFilesToCompact().isEmpty()) {
return null;
}
// basic sanity check: do not try to compact the same StoreFile twice.
if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
// TODO: change this from an IAE to LOG.error after sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
filesToCompact, filesCompacting);
}
filesCompacting.addAll(filesToCompact.getFilesToCompact());
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
}
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
}
} finally {
this.lock.readLock().unlock();
}
if (ret != null) {
CompactionRequest.preRequest(ret);
}
return ret;
}
在贴一段选compact文件的
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
// able to compact more if stuck and compacting, because ratio policy excludes some
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
filesCompacting.size() + " compacting, " + candidateSelection.size() +
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return new CompactionRequest(expiredSelection);
}
}
candidateSelection = skipLargeFiles(candidateSelection);
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction.
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection)
);
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
CompactionRequest result = new CompactionRequest(candidateSelection);
result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
return result;
}
3.2.3HRegionServer添加子region到meta表,加入到RegionServer里
更新meta表
// If daughter of a split, update whole row, not just location.更新meta表 loaction和rowkey
MetaEditor.addDaughter(ct, r.getRegionInfo(),
this.serverNameFromMasterPOV);
加入regionserver
public void addToOnlineRegions(HRegion region) {
this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
}
3.3修改zk节点状态,等待split结束
/* package */void transitionZKNode(final Server server,
final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
// Tell master about split by updating zk. If we fail, abort.
if (server != null && server.getZooKeeper() != null) {
try {
this.znodeVersion = transitionNodeSplit(server.getZooKeeper(),
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
server.getServerName(), this.znodeVersion);
int spins = 0;
// Now wait for the master to process the split. We know it's done
// when the znode is deleted. The reason we keep tickling the znode is
// that it's possible for the master to miss an event.
do {
if (spins % 10 == 0) {
LOG.debug("Still waiting on the master to process the split for " +
this.parent.getRegionInfo().getEncodedName());
}
Thread.sleep(100);
// When this returns -1 it means the znode doesn't exist
this.znodeVersion = tickleNodeSplit(server.getZooKeeper(),
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
server.getServerName(), this.znodeVersion);
spins++;
} while (this.znodeVersion != -1 && !server.isStopped()
&& !services.isStopping());
结束了,有时间再看看compact过程,其实在split中已经包含compact的过程,不知道是不是所有的compact流程都一样
相关推荐
5. 并发控制:学习RegionSplitPolicy、RegionSplitter等类,理解HBase如何处理并发请求和Region分裂。 6. 客户端API:研究HBase客户端如何通过Table、Get、Put、Scan等对象进行数据操作。 通过阅读源码,开发者可以...
源码中,`org.apache.hadoop.hbase.regionserver.SplitTransaction`类处理Region的分裂操作。另外,`org.apache.hadoop.hbase.regionserver.RegionMergeTransaction`类处理Region的合并。 9. **Compaction机制**:...
### HBase源码分析 #### 一、HBase性能测试要点与分析 ##### 1.1 测试环境 - **硬件配置**: - 客户端:1台 - RegionServer:5台 - Master:1台 - ZooKeeper:3台 - **软件配置**: - CPU:每台服务器配备8...
当一个Region的大小达到预设阈值时,`org.apache.hadoop.hbase.regionserver.HRegion`会触发分裂过程,这涉及到`org.apache.hadoop.hbase.regionserver.SplitTransaction`类的使用,确保分裂过程中数据的一致性。...
### HBase源码分析——关键知识点详解 #### 一、HBase性能测试总结与环境配置 **测试环境:** - **硬件配置:** - 1台客户端机器 - 5台RegionServer服务器 - 1台Master服务器 - 3台Zookeeper服务器 - **软件...
HBase的数据模型在源码中主要体现在`org.apache.hadoop.hbase.regionserver`包下的`Region`类,它是实际存储数据的单元,包含对行、列的管理。 3.2 操作API 客户端与HBase交互的API在`org.apache.hadoop.hbase....
例如,可以使用 HBase 的 Coprocessors 或者 Hadoop 的 Combiner 来优化数据处理,使用 Checkpoint 和 RegionSplit 来控制数据导入的粒度,确保数据的一致性。 标签 "源码 工具" 提示我们可以深入研究相关工具的源...
同时,Hive和HBase的配置也需要根据实际需求调整,如Hive的metastore设置、HBase的region split策略等。 总的来说,Hadoop、Hive和HBase构成了大数据处理的一个强大组合。通过深入学习和实践,你可以掌握这个领域的...