再次吐槽公司的sb环境,不让上网不能插优盘,今天有事回家写一下笔记HBase region split
在管理集群时,最容易导致hbase节点发生故障的恐怕就是hbase region split和compact的了,日志有split时间太长;文件找不到;split的时候response too slow等等,所以先看看hbase region split源码,希望对以后能有帮助
HBase region split源码分析
一、流程概述
1.HBaseAdmin 发起 hbase split
2.HRegionServer 确定分割点 region split point
3.CompactSplitThread和SplitRequest 进行region分割
3.1SplitTransaction st.prepare()初始化两个子region
3.2splitTransaction execute执行分割
3.2.1两个子region DaughterOpener线程 start
3.2.2若region 需要compact,进行compact路程
3.2.3HRegionServer添加子region到meta表,加入到RegionServer里
3.3修改zk节点状态,等待split结束
二 、hbase region split UML图
三、详细分析
1.HBaseAdmin 发起 hbase split
public void split(final byte [] tableNameOrRegionName,
final byte [] splitPoint) throws IOException, InterruptedException {
CatalogTracker ct = getCatalogTracker();
try {
Pair<HRegionInfo, ServerName> regionServerPair
= getRegion(tableNameOrRegionName, ct);//获得HRI,若是但region
if (regionServerPair != null) {
if (regionServerPair.getSecond() == null) {
throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
} else {
//split region 重点分析方法
split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
}
} else {
//table split流程
final String tableName = tableNameString(tableNameOrRegionName, ct);
List<Pair<HRegionInfo, ServerName>> pairs =
MetaReader.getTableRegionsAndLocations(ct,
tableName);
for (Pair<HRegionInfo, ServerName> pair: pairs) {
// May not be a server for a particular row
if (pair.getSecond() == null) continue;
HRegionInfo r = pair.getFirst();
// check for parents
if (r.isSplitParent()) continue;
// if a split point given, only split that particular region
if (splitPoint != null && !r.containsRow(splitPoint)) continue;
// call out to region server to do split now
split(pair.getSecond(), pair.getFirst(), splitPoint);
}
}
} finally {
cleanupCatalogTracker(ct);
}
}
2.HRegionServer 确定分割点 region split point
@Override
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
throws NotServingRegionException, IOException {
checkOpen();//检查server和hdfs是否可用
HRegion region = getRegion(regionInfo.getRegionName());//根据HRI获取region
region.flushcache();//flush cache 有几种情况不进行flush
//the cache is empte | the region is closed.| a flush is already in progress | writes are disabled
region.forceSplit(splitPoint);//设置split point
compactSplitThread.requestSplit(region, region.checkSplit());//获取split point,进行split
}
获得split point详细过程,获取最适合的store-hbase现在就是取最大的,获取store的midkey作为splitpoint
3.CompactSplitThread和SplitRequest 进行region分割
这里是split中较为复杂的过程
public void run() {
if (this.server.isStopping() || this.server.isStopped()) {
LOG.debug("Skipping split because server is stopping=" +
this.server.isStopping() + " or stopped=" + this.server.isStopped());
return;
}
try {
final long startTime = System.currentTimeMillis();
SplitTransaction st = new SplitTransaction(parent, midKey);
// If prepare does not return true, for some reason -- logged inside in
// the prepare call -- we are not ready to split just now. Just return.
// 3.1SplitTransaction st.prepare()初始化两个子region
if (!st.prepare()) return;
try {
st.execute(this.server, this.server);//3.2splitTransaction execute执行分割
this.server.getMetrics().incrementSplitSuccessCount();
} catch (Exception e) {
。。。。。。。。。。。。
3.2splitTransaction execute执行分割
public PairOfSameType<HRegion> execute(final Server server,
final RegionServerServices services)
throws IOException {
PairOfSameType<HRegion> regions = createDaughters(server, services);
//创建split临时目录,改变region zk状态,关闭region,停止所有store服务
//创建daughter目录,将region storefile放入目录中
//创建子region A、B,在zk上注册,并且设置原HRI下线
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
return regions;
}
加一个同样复杂的
3.2.0 createDaughters函数的操作
这里创建两个子Region,包括他们的regioninfo,并且将父region的hfile引用写入子Region中
生成两个子region的代码:.stepsBeforePONR
- public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
- final RegionServerServices services, boolean testing) throws IOException {
-
-
- if (server != null && server.getZooKeeper() != null) {
- try {
-
- createNodeSplitting(server.getZooKeeper(),
- parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
- } catch (KeeperException e) {
- throw new IOException("Failed creating PENDING_SPLIT znode on " +
- this.parent.getRegionNameAsString(), e);
- }
- }
- this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
- if (server != null && server.getZooKeeper() != null) {
-
-
-
-
- znodeVersion = getZKNode(server, services);
- }
-
-
- this.parent.getRegionFileSystem().createSplitsDir();
- this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
-
- Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
- Exception exceptionToThrow = null;
- try{
-
- hstoreFilesToSplit = this.parent.close(false);
- } catch (Exception e) {
- exceptionToThrow = e;
- }
- if (exceptionToThrow == null && hstoreFilesToSplit == null) {
-
-
-
-
-
- exceptionToThrow = closedByOtherException;
- }
- if (exceptionToThrow != closedByOtherException) {
- this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
- }
- if (exceptionToThrow != null) {
- if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
- throw new IOException(exceptionToThrow);
- }
- if (!testing) {
-
- services.removeFromOnlineRegions(this.parent, null);
- }
- this.journal.add(JournalEntry.OFFLINED_PARENT);
-
-
-
-
-
-
-
-
- splitStoreFiles(hstoreFilesToSplit);
-
-
-
-
-
-
- this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
- HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
-
-
- this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
- HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
- return new PairOfSameType<HRegion>(a, b);
- }
1.RegionSplitPolicy.getSplitPoint()获得region split的split point ,最大store的中间点midpoint最为split point
2.SplitRequest.run()
实例化SplitTransaction
st.prepare():split前准备:region是否关闭,所有hfile是否被引用
st.execute:执行split操作
1.createDaughters 创建两个region,获得parent region的写锁
1在zk上创建一个临时的node splitting point,
2等待master直到这个region转为splitting状态
3之后建立splitting的文件夹,
4等待region的flush和compact都完成后,关闭这个region
5从HRegionServer上移除,加入到下线region中
6进行regionsplit操作,创建线程池,用StoreFileSplitter类将region下的所有Hfile(StoreFile)进行split,
(split row在hfile中的不管,其他的都进行引用,把引用文件分别写到region下边)
7.生成左右两个子region,删除meta上parent,根据引用文件生成子region的regioninfo,写到hdfs上
2.stepsAfterPONR 调用DaughterOpener类run打开两个子region,调用initilize
a)向hdfs上写入.regionInfo文件以便meta挂掉以便恢复
b)初始化其下的HStore,主要是LoadStoreFiles函数:
对于该store函数会构造storefile对象,从hdfs上获取路径和文件,每个文件一个
storefile对象,对每个storefile对象会读取文件上的内容创建一个
HalfStoreFileReader读对象来操作该region的父region上的相应的文件,及该
region上目前存储的是引用文件,其指向的是其父region上的相应的文件,对该
region的所有读或写都将关联到父region上
将子Region添加到rs的online region列表上,并添加到meta表上
(0.98版本,包含以下3.2.1~3)
(0.94版本,两个方法之后给合在了stepsAfterPONR里边)
3.2.1两个子region DaughterOpener线程 start
final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
boolean stopped = server != null && server.isStopped();
boolean stopping = services != null && services.isStopping();
// TODO: Is this check needed here?
if (stopped || stopping) {
LOG.info("Not opening daughters " +
b.getRegionInfo().getRegionNameAsString() +
" and " +
a.getRegionInfo().getRegionNameAsString() +
" because stopping=" + stopping + ", stopped=" + stopped);
} else {
// Open daughters in parallel.创建两个字region打开操作类
DaughterOpener aOpener = new DaughterOpener(server, a);
DaughterOpener bOpener = new DaughterOpener(server, b);
aOpener.start();
bOpener.start();
try {
aOpener.join();
bOpener.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted " + e.getMessage());
}
if (aOpener.getException() != null) {
throw new IOException("Failed " +
aOpener.getName(), aOpener.getException());
}
if (bOpener.getException() != null) {
throw new IOException("Failed " +
bOpener.getName(), bOpener.getException());
}
if (services != null) {
try {
// add 2nd daughter first (see HBASE-4335)
services.postOpenDeployTasks(b, server.getCatalogTracker(), true);
// Should add it to OnlineRegions
services.addToOnlineRegions(b);
services.postOpenDeployTasks(a, server.getCatalogTracker(), true);
services.addToOnlineRegions(a);
} catch (KeeperException ke) {
throw new IOException(ke);
}
}
}
}
调用HRegion 打开方法openHRegion
protected HRegion openHRegion(final CancelableProgressable reporter)
throws IOException {
checkCompressionCodecs();
long seqid = initialize(reporter);
//初始化region,
//1.checkRegionInfoOnFilesystem将HRegionInfo写入文件
//2.cleanupTempDir 清空老region临时目录
//3.初始化HRegion store,加载hfile
//4.获得recover.edit文件,找到对应的store,将读取的keyvalue输出到store,恢复hregion
if (this.log != null) {
this.log.setSequenceNumber(seqid);
}
return this;
}
3.2.2若region 需要compact,进行compact过程
compact过程有点复杂,过程如下:
1.将所有storefile放入compact候选者
2.交给coprocessor做处理,选择compact storefile
3.若coprocessor没有做处理,则采用系统算法选择
3.1必须要进行compact的文件,文件大小大于compact最大值并且没有其他被引用
3.2必须要进行compact文件小于compact文件最小数
3.3 isMajorCompaction判断是否需要major compact
3.3.1当ttl大于storefile中最大文件compact time,则不需要
3.3.2 以上反之,需要
3.3.3 最后一次major compaction时间大于majorCompactionTime,需要
3.4 当compact文件大于compact文件最大数,且需要major compaction活强制major compaction,则进行major compaction
3.5或则进行minor compact,他两个的区别在于一个compact文件数是所有并且删除就tts和version的数据,一个compact文件数不大于maxcompactfile配置
public CompactionRequest requestCompaction(int priority) throws IOException {
// don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) {
return null;
}
CompactionRequest ret = null;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
// candidates = all storefiles not already in compaction queue
List<StoreFile> candidates = Lists.newArrayList(storefiles);
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidates.indexOf(last);
Preconditions.checkArgument(idx != -1);
candidates.subList(0, idx + 1).clear();
}
boolean override = false;
if (region.getCoprocessorHost() != null) {
override = region.getCoprocessorHost().preCompactSelection(
this, candidates);
}
CompactSelection filesToCompact;
if (override) {
// coprocessor is overriding normal file selection
filesToCompact = new CompactSelection(conf, candidates);
} else {
filesToCompact = compactSelection(candidates, priority);
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompactSelection(this,
ImmutableList.copyOf(filesToCompact.getFilesToCompact()));
}
// no files to compact
if (filesToCompact.getFilesToCompact().isEmpty()) {
return null;
}
// basic sanity check: do not try to compact the same StoreFile twice.
if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
// TODO: change this from an IAE to LOG.error after sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
filesToCompact, filesCompacting);
}
filesCompacting.addAll(filesToCompact.getFilesToCompact());
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
}
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
}
} finally {
this.lock.readLock().unlock();
}
if (ret != null) {
CompactionRequest.preRequest(ret);
}
return ret;
}
在贴一段选compact文件的
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
// able to compact more if stuck and compacting, because ratio policy excludes some
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
filesCompacting.size() + " compacting, " + candidateSelection.size() +
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
return new CompactionRequest(expiredSelection);
}
}
candidateSelection = skipLargeFiles(candidateSelection);
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction.
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection)
);
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
CompactionRequest result = new CompactionRequest(candidateSelection);
result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
return result;
}
3.2.3HRegionServer添加子region到meta表,加入到RegionServer里
更新meta表
// If daughter of a split, update whole row, not just location.更新meta表 loaction和rowkey
MetaEditor.addDaughter(ct, r.getRegionInfo(),
this.serverNameFromMasterPOV);
加入regionserver
public void addToOnlineRegions(HRegion region) {
this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
}
3.3修改zk节点状态,等待split结束
/* package */void transitionZKNode(final Server server,
final RegionServerServices services, HRegion a, HRegion b)
throws IOException {
// Tell master about split by updating zk. If we fail, abort.
if (server != null && server.getZooKeeper() != null) {
try {
this.znodeVersion = transitionNodeSplit(server.getZooKeeper(),
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
server.getServerName(), this.znodeVersion);
int spins = 0;
// Now wait for the master to process the split. We know it's done
// when the znode is deleted. The reason we keep tickling the znode is
// that it's possible for the master to miss an event.
do {
if (spins % 10 == 0) {
LOG.debug("Still waiting on the master to process the split for " +
this.parent.getRegionInfo().getEncodedName());
}
Thread.sleep(100);
// When this returns -1 it means the znode doesn't exist
this.znodeVersion = tickleNodeSplit(server.getZooKeeper(),
parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
server.getServerName(), this.znodeVersion);
spins++;
} while (this.znodeVersion != -1 && !server.isStopped()
&& !services.isStopping());
结束了,有时间再看看compact过程,其实在split中已经包含compact的过程,不知道是不是所有的compact流程都一样
相关推荐
【HBASERegion数量增多问题描述及解决方案】 在HBase分布式数据库中,Region是表数据的基本存储单元,它将表的数据按照ROWKEY的范围进行分割。随着数据的增长,一个Region会分裂成两个,以此来确保数据的均衡分布。...
7. ** Region分裂**:源码会展示HBase如何自动进行Region分裂,以保持性能和负载均衡。 8. **MapReduce与HBase**:书中可能包括使用MapReduce与HBase进行批量数据处理的示例。 9. **HBase与Hadoop集成**:源码会...
该系统是一款基于Java语言的HBase数据库设计源码,共计26个文件,其中包含18个Java源文件、5个XML配置文件、2个Git忽略文件和1个属性文件。
5. 并发控制:学习RegionSplitPolicy、RegionSplitter等类,理解HBase如何处理并发请求和Region分裂。 6. 客户端API:研究HBase客户端如何通过Table、Get、Put、Scan等对象进行数据操作。 通过阅读源码,开发者可以...
该项目为Apache HBase数据库设计源码,采用Java语言编写,包含5428个文件,其中4589个为Java源文件,222个为Ruby脚本,118个为XML配置文件,其余文件包括Shell脚本、Python脚本、HTML、CSS、JavaScript、C++、PHP、C...
总的来说,HBase的源码分析涉及到客户端与服务器的交互、RPC通信机制、数据存储流程以及系统架构等多个层面。理解这些核心机制对于优化HBase性能、排查问题以及进行二次开发都至关重要。通过对HBase源码的深入学习,...
源码分析是理解HBase工作原理和技术细节的重要途径。HBase在大数据领域扮演着关键角色,它能够处理海量数据并提供实时访问。下面,我们将深入探讨HBase的核心概念和源码中的关键组件。 1. **HBase架构**:HBase基于...
HBase源码分析与开发实战视频技术讲解高阶视频教程以及课件,内部讲解资料 内容非常详细 值得想要提高薪水的人去学习了解
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip
HBase 1.2.0是该数据库的一个稳定版本,包含了众多优化和改进,对于想要深入理解HBase工作原理或者进行大数据分析的学习者来说,研究其源码是非常有价值的。 一、HBase架构与核心概念 1. 表与Region:HBase中的...
HBase的region split策略一共有以下几种: 1、ConstantSizeRegionSplitPolicy 0.94版本前默认切分策略 当region大小大于某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。 ...
HBase 0.94.4的源码分析有助于我们深入了解其内部机制,从而更好地进行系统设计和优化。无论是对于开发者还是管理员,掌握HBase的核心原理都将极大地提升在大数据领域的实践能力。通过不断学习和实践,我们可以更好...
本项目为基于Java语言的HBase分布式数据库设计源码,包含5415个文件,涵盖了4575个Java源文件、223个Ruby脚本、118个XML...该源码是GitHub上HBase项目的核心代码,适用于需要深入理解和分析HBase架构与实现细节的场景。
HBase的源码分析有助于理解其内部工作原理。例如,`HRegionServer`是数据服务的主要组件,负责Region的管理和数据操作;`HMaster`负责Region的分配和负载均衡;`HStore`管理Column Family,包含一系列的`HStoreFile...
源码中,`org.apache.hadoop.hbase.regionserver.SplitTransaction`类处理Region的分裂操作。另外,`org.apache.hadoop.hbase.regionserver.RegionMergeTransaction`类处理Region的合并。 9. **Compaction机制**:...
《HBase权威指南》是一本深入探讨分布式列式数据库HBase的专业书籍,其源码提供了对HBase工作原理和实现细节的深入理解。HBase,作为Apache Hadoop生态系统中的一个关键组件,是为大规模数据存储设计的高性能、...
当region文件大小达到由“hbase.hregion.max.filesize”参数决定的上限(默认256MB),触发region split操作,原region一分为二,以提高数据读写效率与分布均衡性。 在此过程中,“.splits”目录的引入,有效避免了...
基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的...