compact处理流程分析
compact的处理与split相同,由client端与flush时检查发起。
针对compact还有一个在rs生成时生成的CompactionChecker线程定期去检查是否需要做compact操作
线程执行的间隔时间通过hbase.server.thread.wakefrequency配置,默认为10*1000ms
CompactionChecker线程主要作用:
生成通过hbase.server.thread.wakefrequency(10*1000ms)配置的定期检查region是否需要compact的检查线程,
如果需要进行compact,会在此处通过compact的线程触发compcat的请求
此实例中通过hbase.server.thread.wakefrequency(10*1000ms)配置major compact的优先级,
如果major compact的优先级大过此值,把compact的优先级设置为此值.
Store中通过hbase.server.compactchecker.interval.multiplier配置多少时间需要进行compact检查的间隔
默认为1000ms,
compactionChecker的检查周期为wakefrequency*multiplier ms,
也就是默认情况下线程调用1000次执行一次compact检查
a.compaction检查时发起compact的条件是
如果一个store中所有的file个数减去在做(或发起compact请求)的个数,大于或等于
hbase.hstore.compaction.min配置的值,
老版本使用hbase.hstore.compactionThreshold进行配置,默认值为3
b.major compact的条件检查
通过hbase.hregion.majorcompaction配置major的检查周期,default=1000*60*60*24
通过hbase.hregion.majorcompaction.jitter配置major的浮动时间,默认为0.2,
也就是major的时间上下浮动4.8小时
b2.检查(当前时间-major配置时间>store最小的文件生成时间)表示需要major,
b2.1>store下是否只有一个文件,同时这个文件已经到了major的时间,
b2.1>检查ttl时间是否达到(intager.max表示没配置),达到ttl时间需要major,否则不做
b2.2>文件个数大于1,到达major的时间,需要major
Client端发起compactRegion的request
Client通过HBaseAdmin.compact发起regionserver的rpc连接,调用regionserver.compactRegion
如果传入的是tablename而不是regionname,会迭代出此table的所有region调用HRegionServer.compactRegion
由client发起,调用HRegionServer.compactRegion
public CompactRegionResponse compactRegion(final RpcController controller,
final CompactRegionRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
从onlineRegions中得到request的Hregion实例
HRegion region = getRegion(request.getRegion());
region.startRegionOperation(Operation.COMPACT_REGION);
LOG.info("Compacting " + region.getRegionNameAsString());
booleanmajor = false;
byte [] family = null;
Storestore = null;
如果client发起的request中传入有columnfamily的值,得到此cf的HStore
if (request.hasFamily()) {
family = request.getFamily().toByteArray();
store = region.getStore(family);
if (store == null) {
thrownewServiceException(newIOException("column family " + Bytes.toString(family) +
" does not exist in region " + region.getRegionNameAsString()));
}
}
检查是否是major的compact请求
if (request.hasMajor()) {
major = request.getMajor();
}
如果是发起majorcompaction的操作,
if (major) {
if (family != null) {
store.triggerMajorCompaction();
} else {
region.triggerMajorCompaction();
}
}
String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
LOG.trace("User-triggered compaction requested for region " +
region.getRegionNameAsString() + familyLogMsg);
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
否则是一般compation的请求,通过compactsplitThread.requestCompaction发起compact request
if(family != null) {
compactSplitThread.requestCompaction(region, store, log,
Store.PRIORITY_USER, null);
} else {
compactSplitThread.requestCompaction(region, log,
Store.PRIORITY_USER, null);
}
return CompactRegionResponse.newBuilder().build();
} catch (IOException ie) {
thrownewServiceException(ie);
}
}
非major的compact处理流程
requestCompaction不管是直接传入sotre或者是region的传入,
如果传入的是region,那么会拿到region下的所有store,迭代调用每一个store的compaction request操作。
所有的非major compaction request最终会通过如下方法发起compaction request
private synchronized CompactionRequest requestCompactionInternal(final HRegion r,
final Store s,
final String why, intpriority, CompactionRequest request, booleanselectNow)
针对store的compaction request处理流程
如果要对一个HBASE的表禁用掉compaction操作,可以通过create table时配置COMPACTION_ENABLED属性
private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
final String why, intpriority, CompactionRequest request, booleanselectNow)
throws IOException {
if (this.server.isStopped()
|| (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
returnnull;
}
CompactionContextcompaction = null;
此时的调用selectNow为true,(如果是系统调用,此时的selectNow为false,)
也就是在发起request到CompactSplitThread.CompactionRunner线程执行时,
如果是系统调用,传入的CompactionContext的实例为null,否则是用户发起的调用在这个地方得到compaction实例
if (selectNow) {
通过HStore.requestCompaction得到一个compactionContext,计算要进行compact的storefile
并设置其request.priority为Store.PRIORITY_USER表示用户发起的request
如果是flush时发起的compact,
并设置其request.priority为hbase.hstore.blockingStoreFiles配置的值减去storefile的个数,
表示系统发起的request,
如果hbase.hstore.blockingStoreFiles配置的值减去storefile的个数==PRIORITY_USER
那么priority的值为PRIORITY_USER+1
compaction = selectCompaction(r, s, priority, request);
if (compaction == null) returnnull; // message logged inside
}
// We assume that most compactions are small. So, put system compactions into small
// pool; we will do selection there, and move to large pool if necessary.
longsize = selectNow ? compaction.getRequest().getSize() : 0;
此时好像一直就得不到largeCompactions的实例(在system时通过CompactionRunner线程检查),
因为selectNow==false时,size的大小为0
不可能大于hbase.regionserver.thread.compaction.throttle配置的值
此配置的默认值是hbase.hstore.compaction.max*2*memstoresize
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
? largeCompactions : smallCompactions;
通过smallCompactions的线程池生成CompactionRunner线程并执行,见执行Compaction的处理线程
pool.execute(newCompactionRunner(s, r, compaction, pool));
if (LOG.isDebugEnabled()) {
String type = (pool == smallCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
}
returnselectNow ? compaction.getRequest() : null;
}
生成CompactionRequest实例
Hstore.requestcompaction得到要进行compact的storefile,并生成一个CompactionContext
public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
throws IOException {
// don't even select for compaction if writes are disabled
if (!this.areWritesEnabled()) {
returnnull;
}
生成一个DefaultStoreEngine.DefaultCompactionContext实例(如果storeEngine是默认的配置)
CompactionContextcompaction = storeEngine.createCompaction();
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
// First, see if coprocessor would want to override selection.
if (this.getCoprocessorHost() != null) {
List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
booleanoverride = this.getCoprocessorHost().preCompactSelection(
this, candidatesForCoproc, baseRequest);
if (override) {
// Coprocessor is overriding normal file selection.
compaction.forceSelect(newCompactionRequest(candidatesForCoproc));
}
}
// Normal case - coprocessor is not overriding file selection.
if (!compaction.hasSelection()) {
如果是client端发起的compact,此时的值为true,如果是flush时发起的compact,此时的值为false
booleanisUserCompaction = priority == Store.PRIORITY_USER;
offPeakHours的值说明:
1.通过hbase.offpeak.start.hour配置major的启动开始小时,如配置为1
2.通过hbase.offpeak.end.hour配置major的启动结束小时,如配置为2
如果启动时间是1与2配置的小时时间内,那么配置有这两个值后,
主要用来检查compact的文件的大小是否超过hbase.hstore.compaction.max配置的值,默认为10,
减去1个文件的总和的多少倍,
如:有10个待做compact的文件,第一个文件(i=0)的size是=i+max(10)-1=9,
以上表示第一个文件的size超过了后面9个文件总size的大小的多少倍,如果超过了倍数,不做compact
如果1与2配置为不等于-1,同时start小于end,当前做compact的时间刚好在此时间内,
多少倍这个值通过hbase.hstore.compaction.ratio.offpeak配置得到,默认为5.0f
否则通过hbase.hstore.compaction.ratio配置得到,默认为1.2f
booleanmayUseOffPeak = offPeakHours.isOffPeakHour() &&
offPeakCompactionTracker.compareAndSet(false, true);
try {
调用DefaultStoreEngine.DefaultCompactionContext实例的select方法,返回true/false,
对compaction.select的具体分析说明可参见major的compact处理流程
true表示有compactrequest,否则表示没有compactrequest
此方法最终调用RatioBasedCompactionPolicy.selectCompaction方法,
生成CompactRequest并放入到DefaultStoreEngine.DefaultCompactionContext的request属性中
得到要compact的storefile列表,放入到HStore.filesCompacting列表中
方法传入的forceMajor实例只有在发起major compact时同时fileCompacting列表中没有值时,此值为true,
其它情况值都为false.就是最后一个参数的值为false
a.在compaction.select方法中得到此store中所有的storefile列表,
传入到compactionPolicy.selectCompaction方法中。
RatioBasedCompactionPolicy.selectCompaction方法处理流程:
1.检查所有的storefile的个数减去正在做compact的storefile文件个数
是否大于hbase.hstore.blockingStoreFiles配置的值,默认为7,
比对方法:
a.如果filesCompacting(正在做compact的storefile列表)不为空
那么storefiles的个数减去正在做compact的storefile文件个数加1是否大于blockingStoreFiles配置的值
b.如果filesCompacting(正在做compact的storefile列表)为空
那么storefiles的个数减去正在做compact的storefile文件个数是否大于blockingStoreFiles配置的值
2.从所有的storefile列表中移出正在做compcat的storefile列表(fileCompacting列表中的数据)
得到还没做(可选的)compact的storefiles列表
3.如果columnfamily配置中的MIN_VERSIONS的值没有配置(=0),
得到TTL配置的值(不配置=Integer.MAX_VALUE=-1)配置的值为秒为单位,否则得到Long.MAX_VALUE
4.检查如果hbase.store.delete.expired.storefile配置的值为true(default=true),同时ttl非默认值
从2中得到的storefile列表中得到ttl超时的所有storefile列表。
4.1如果有ttl过期的storefile,生成这些storefile的CompactionRequest请求并返回
4.2如果没有ttl过期的storefile,(控制大文件先不做小的compact)
把storefile列表中size超过hbase.hstore.compaction.max.size配置的storefile移出,默认为Long.MAX_VALUE
5.检查storefile是否需要做major compact操作,
5.1得到通过hbase.hregion.majorcompaction配置的值默认为1000*60*60*24*7
5.2得到通过hbase.hregion.majorcompaction.jitter配置的值,默认为0.5f
5.3检查storefile中最先更新的storefile的更新时间是否在5.1与5.2配置的时间内(默认是3.5天到7天之间)
如果配置为24小时,那么执行时间的加减为4.8个小时
5.4如果还没有超过配置的时间,表示不需要发做major compact,
5.5如果在时间范围内或超过此配置的时间,表示需要做major compact,
a.同时如果只有一个storefile此storefile的最小更新时间已经超过了ttl的配置时间,需要做major compact
b.如果有多个storefile文件,表示需要做major compat.
6.检查是否需要做compact还有一个条件,在5成立的条件下,
如果当前要做compact的storefile的个数小于hbase.hstore.compaction.max配置的值,默认10,
-
5与6的检查条件都成立,或者此region (有个split操作,有References文件),,表示升级为major的compact
-
如果没有升级成major的compact,把storefile列表中的bluk load的file移出
-
计算出最大的几个storefile,也就是file size的值是后面几个文件的size的多少倍,
把超过倍数的storefile移出,不做compact
可以看上面对offPeakHours的值说明:
10. 如果现在还有需要做compcat的storefile列表,检查文件个数是否达到最小compact的配置的值,
通过hbase.hstore.compaction.min配置,默认为3,老版本通过hbase.hstore.compactionThreshold配置
如果没有达到最小的配置值,不做compact
11.如果没有升级到major,把超过hbase.hstore.compaction.max配置的storefile移出列表。默认配置为10
12.生成并返回一个CompactionRequest的实例。如果非major,同时在offPeakHours的值说明的时间内,
把CompactionRequest的isOffPeak设置为true,否则设置为false(major)
compaction.select(this.filesCompacting, isUserCompaction,
mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
} catch (IOException e) {
if (mayUseOffPeak) {
offPeakCompactionTracker.set(false);
}
throwe;
}
assertcompaction.hasSelection();
if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
// Compaction policy doesn't want to take advantage of off-peak.
offPeakCompactionTracker.set(false);
}
}
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompactSelection(
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
}
// Selected files; see if we have a compaction with some custom base request.
if (baseRequest != null) {
// Update the request with what the system thinks the request should be;
// its up to the request if it wants to listen.
compaction.forceSelect(
baseRequest.combineWith(compaction.getRequest()));
}
// Finally, we have the resulting files list. Check if we have any files at all.
finalCollection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
if (selectedFiles.isEmpty()) {
returnnull;
}
// Update filesCompacting (check that we do not try to compact the same StoreFile twice).
if (!Collections.disjoint(filesCompacting, selectedFiles)) {
Preconditions.checkArgument(false, "%s overlaps with %s",
selectedFiles, filesCompacting);
}
把当前要执行compact的storefile列表添加到HStore.filesCompacting中。
filesCompacting.addAll(selectedFiles);
通过storefile的seqid按从小到大排序
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
// If we're enqueuing a major, clear the force flag.
如果当前要做compact的文件个数等待当前sotre中所有的storefile个数,把当前的compact提升为major
booleanisMajor = selectedFiles.size() == this.getStorefilesCount();
this.forceMajor = this.forceMajor && !isMajor;
// Set common request properties.
// Set priority, either override value supplied by caller or from store.
compaction.getRequest().setPriority(
(priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
compaction.getRequest().setIsMajor(isMajor);
compaction.getRequest().setDescription(
getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
}
} finally {
this.lock.readLock().unlock();
}
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
+ (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
returncompaction;
}
执行Compaction的处理流程
在compact执行时是通过指定的线程池生成并执行CompactSplitThread.CompactionRunner线程
以下是线程执行的具体说明:
public void run() {
Preconditions.checkNotNull(server);
if (server.isStopped()
|| (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
return;
}
// Common case - system compaction without a file selection. Select now.
如果compaction==null表示是systemcompact非用户发起的compaction得到一个compactionContext
if (this.compaction == null) {
queuedPriority的值在此线程实例生成时默认是hbase.hstore.blockingStoreFiles配置的值减去storefile的个数
如果相减的值是1时返回2,否则返回相减的值
int oldPriority = this.queuedPriority;
重新拿到hbase.hstore.blockingStoreFiles配置的值减去storefile的个数的值,
this.queuedPriority = this.store.getCompactPriority();
如果这次拿到的值比上次的值要大,表示有storefile被删除(基本上是有compact完成)
if (this.queuedPriority > oldPriority) {
// Store priority decreased while we were in queue (due to some other compaction?),
// requeue with new priority to avoid blocking potential higher priorities.
结束本次线程调用,发起一个新的线程调用,用最新的priority
this.parent.execute(this);
return;
}
try {
通过HStore.requestCompaction得到一个compactionContext,计算要进行compact的storefile
并设置其request.priority为hbase.hstore.blockingStoreFiles配置的值减去storefile的个数,
表示系统发起的request,
如果hbase.hstore.blockingStoreFiles配置的值减去storefile的个数==PRIORITY_USER
那么priority的值为PRIORITY_USER+1
如果是client时发起的compact,此处会设置其request.priority为Store.PRIORITY_USER表示是用户发起的request
this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
} catch (IOException ex) {
LOG.error("Compaction selection failed " + this, ex);
server.checkFileSystem();
return;
}
if (this.compaction == null) return; // nothing to do
// Now see if we are in correct pool for the size; if not, go to the correct one.
// We might end up waiting for a while, so cancel the selection.
assertthis.compaction.hasSelection();
此处检查上面提到没用的地方:
compaction.getRequest().getSize()的大小为所有当此要做compact的storefile的总大小
检查是否大于hbase.regionserver.thread.compaction.throttle配置的值
此配置的默认值是hbase.hstore.compaction.max*2*memstoresize
如果大于指定的值,使用 largeCompactions,否则使用 smallCompactions
ThreadPoolExecutor pool = store.throttleCompaction(
compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
如果发现当前重新生成的执行线程池不是上次选择的线程池,结束compaction操作,
并重新通过新的线程池执行当前线程,结束当前线程的调用执行
if (this.parent != pool) {
this.store.cancelRequestedCompaction(this.compaction);
this.compaction = null;
this.parent = pool;
this.parent.execute(this);
return;
}
}
// Finally we can compact something.
assertthis.compaction != null;
this.compaction.getRequest().beforeExecute();
try {
// Note: please don't put single-compaction logic here;
// put it into region/store/etc. This is CST logic.
longstart = EnvironmentEdgeManager.currentTimeMillis();
调用HRegion.compact方法,此方法调用HStore.compact方法,把CompactionContext传入
此方法调用返回compact是否成功,如果成功返回true,否则返回false
booleancompleted = region.compact(compaction, store);
longnow = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start));
if (completed) {
检查此时的storefile个数是否还大于hbase.hstore.blockingStoreFiles配置的值,默认为7,
如要大于或等于此时返回的值为小于或等于0的值,表示还需要进行compact操作,重新再发起一次compact的request
// degenerate case: blocked regions require recursive enqueues
if (store.getCompactPriority() <= 0) {
requestSystemCompaction(region, store, "Recursive enqueue");
} else {
此时表示compact操作完成后,storefile的个数在配置的范围内,不需要在做compact,
检查是否需要split,如果需要发起split操作。
Split的发起条件:
a.splitlimit,hbase.regionserver.regionSplitLimit配置的值大于当前rs中的all onlineregions
默认为integer.maxvalue
b.a检查通过的同时hbase.hstore.blockingStoreFiles配置的值减去storefile的个数
大于等于Store.PRIORITY_USER(1)
c.非meta与namespace表,同时其它条件见split的分析部分
// see if the compaction has caused us to exceed max region size
requestSplit(region);
}
}
} catch (IOException ex) {
IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
LOG.error("Compaction failed " + this, remoteEx);
if (remoteEx != ex) {
LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
}
server.checkFileSystem();
} catch (Exception ex) {
LOG.error("Compaction failed " + this, ex);
server.checkFileSystem();
} finally {
LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
}
this.compaction.getRequest().afterExecute();
}
Hstore.compact方法流程:
public List<StoreFile> compact(CompactionContext compaction) throws IOException {
assertcompaction != null && compaction.hasSelection();
CompactionRequest cr = compaction.getRequest();
得到要做compact的storefile列表
Collection<StoreFile> filesToCompact = cr.getFiles();
assert !filesToCompact.isEmpty();
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging
Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
}
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this + " of " + this.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + fs.getTempDir() + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
longcompactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
List<StoreFile> sfs = null;
try {
执行compact操作,把所有的storefile全并成一个storefile,放入到store/.tmp目录下
通过DefaultCompactor.compact操作,把原有的所有storefile生成一个StoreFileScanner列表,
并生成一个StoreScanner把StoreFileScanner列表加入,
如果compact提升成了major,ScanType=COMPACT_DROP_DELETES,否则等于COMPACT_RETAIN_DELETES
针对compact的数据scan可参见后期分析的scan流程
// Commence the compaction.
List<Path> newFiles = compaction.compact();
如果hbase.hstore.compaction.complete 设置为false,检查storefile生成是否可用
// TODO: get rid of this!
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
LOG.warn("hbase.hstore.compaction.complete is set to false");
sfs = newArrayList<StoreFile>();
for (Path newFile : newFiles) {
// Create storefile around what we wrote with a reader on it.
StoreFile sf = createStoreFileAndReader(newFile);
sf.closeReader(true);
sfs.add(sf);
}
returnsfs;
}
把生成的新的storefile添加到cf的目录下。并返回生成后的storefile,此storefile已经生成好reader
// Do the steps necessary to complete the compaction.
sfs = moveCompatedFilesIntoPlace(cr, newFiles);
生成一个compaction的说明信息,写入到wal日志中
writeCompactionWalRecord(filesToCompact, sfs);
把原有的storefile列表中store中的storefiles列表中移出,
并把新的storefile添加到storefiles列表中,对storefiles列表重新排序,通过storefile.seqid
storefiles列表是scan操作时对store中的查询用的storefile与reader
从HStore.filesCompacting列表中移出完成compact的storefiles列表
replaceStoreFiles(filesToCompact, sfs);
从hdfs中此store下移出compact完成的storefile文件列表。
// At this point the store will use new files for all new scanners.
completeCompaction(filesToCompact); // Archive old files & update store size.
} finally {
从HStore.filesCompacting列表中移出完成compact的storefiles列表,如果compact完成此时没有要移出的文件
如果compact失败,此时把没有compact的文件移出
finishCompactionRequest(cr);
}
logCompactionEndMessage(cr, sfs, compactionStartTime);
returnsfs;
}
major的compact处理流程
majorCompaction不管是直接传入sotre或者是region的传入,
如果传入的是region,那么会拿到region下的所有store,迭代调用每一个store的triggerMajorCompaction操作。
Hstore.triggerMajorCompaction操作流程:设置store中的forcemajor的值为true
public void triggerMajorCompaction() {
this.forceMajor = true;
}
设置完成forceMajor的值后,最终还是直接触发requestCompaction方法
if(family != null) {
compactSplitThread.requestCompaction(region, store, log,
Store.PRIORITY_USER, null);
} else {
compactSplitThread.requestCompaction(region, log,
Store.PRIORITY_USER, null);
}
requestCompaction的处理流程大至与非major的coompact处理流程无区别:
CompactSplitThread.requestCompaction-->requestCompactionInternal-->selectCompaction
-->Hstore.requestCompaction(priority, request)得到compactionContext
代码细节如下所示:
是否是用户发起的compaction操作
booleanisUserCompaction = priority == Store.PRIORITY_USER;
以下代码返回为true的条件:
a.hbase.offpeak.start.hour的值不等于-1(0-23之间的值)
b.hbase.offpeak.end.hour的值不等-1(0-23之间的值),同时此值大于a配置的值
c.当前时间的小时部分在a与b配置的时间之间
否则返回的值为false
booleanmayUseOffPeak = offPeakHours.isOffPeakHour() &&
offPeakCompactionTracker.compareAndSet(false, true);
try {
此时最后一个参数为true(在没有其它的compact操作的情况下,同时指定的compact模式为major),
compaction.select(this.filesCompacting, isUserCompaction,
mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
} catch (IOException e) {
if (mayUseOffPeak) {
offPeakCompactionTracker.set(false);
}
throwe;
}
以上代码的中的compaction.select默认调用为DefaultStoreEngine.DefaultCompactionContext.select方法
publicbooleanselect(List<StoreFile> filesCompacting, booleanisUserCompaction,
booleanmayUseOffPeak, booleanforceMajor) throws IOException {
调用RatioBasedCompactionPolicy.selectCompaction得到一个CompactionRequest,
并把此request设置到当前compaction实例的request属性中
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
returnrequest != null;
}
RatioBasedCompactionPolicy.selectCompaction处理流程说明:
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
finalList<StoreFile> filesCompacting, finalbooleanisUserCompaction,
finalbooleanmayUseOffPeak, finalbooleanforceMajor) throws IOException {
// Preliminary compaction subject to filters
ArrayList<StoreFile> candidateSelection = newArrayList<StoreFile>(candidateFiles);
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
// able to compact more if stuck and compacting, because ratio policy excludes some
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
intfutureFiles = filesCompacting.isEmpty() ? 0 : 1;
此store下所有的storefile的个数减去当前已经在做compact的个数是否大于blockingfile的配置个数
blockingfile通过hbase.hstore.blockingStoreFiles配置,默认为7
booleanmayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
得到可选择的storefile,也就是得到所有的storefile中不包含正在做compact的sotrefile的列表
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
filesCompacting.size() + " compacting, " + candidateSelection.size() +
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
得到配置的ttl过期时间,通过在cf的表属性中配置TTL属性,
如果配置值为Integer.MAX_VALUE或者-1或者不配置,表示不控制ttl,
TTL属性生效的前提是MIN_VERSIONS属性不配置,TTL属性配置单位为秒
如果以上条件检查通过表示有配置ttl,返回ttl的配置时间,否则返回Long.maxvalue
longcfTtl = this.storeConfigInfo.getStoreFileTtl();
如果不是发起的major操作,
同时配置有ttl过期时间,同时hbase.store.delete.expired.storefile配置的值为true,默认为true,
同时ttl属性有配置,
得到当前未做compact操作的所有sotrefile中ttl过期的storefile,
如果有ttl过期的storefile文件,生成CompactionRequest实例,并结束此流程处理
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
returnnewCompactionRequest(expiredSelection);
}
}
如果非major把storefile中非reference(split后的文件为reference文件)的storefile文件,
同时storefile的大小超过了hbase.hstore.compaction.max.size配置的最大storefile文件大小限制
移出这些文件
candidateSelection = skipLargeFiles(candidateSelection);
}
// Force a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested
// as a major compaction.
// Or, if there are any references among the candidates.
此处检查major的条件包含以下几个:
(forceMajor && isUserCompaction)
a.如果是用户发起的compaction,同时用户发起的compaction是major的compact,
同时store中没有其它正在做compact的storefile,此值为true
((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
b.检查上面看到代码的3个条件,第一个(b1)与第二个(b2)为一个通过就行,第三个(b3)必须通过
forceMajor
b1.如果是发起的compaction,同时store中没有其它正在做compact的storefile
isMajorCompaction(candidateSelection)
b2.或者以下几个条件检查通过:
b2.1.可选的storefile列表中修改时间最老的一个storefile的时间达到了间隔的major compact时间
b2.2.如果可选的storefile列表中只有一个storefile,同时此storefile的最老的一条数据的时间已经达到ttl时间
同时此storefile的时间达到了间隔的major时间间隔
b2.3.如果可选的storefile列表中有多少storefile,同时更新时间最老的一个storefile达到了major的时间间隔
b2.4.也就是storefile列表中最老的更新时间的一个storefile的时间达到了间隔的major时间,
但是可选的storefile个数只有一个,同时此storefile已经做过major(StoreFile.majorCompaction==true)
同时ttl时间没有配置或者ttl还没有过期那么此时这个storefile是不做major compact
通过hbase.hregion.majorcompaction配置major的间隔时间,
通过hbase.hregion.majorcompaction.jitter配置major的间隔的左右差
如:major的配置时间为24小时,同时间隔的左右差是0.2f,那么default = 20% = +/- 4.8 hrs
(candidateSelection.size() < comConf.getMaxFilesToCompact())
b3.可选的storefile列表的个数小于compactmaxfiles的配置个数,
通过hbase.hstore.compaction.max配置,默认值为10
StoreUtils.hasReferences(candidateSelection)
c.如果storefile列表中包含有reference(split后的文件为reference文件)的storefile
booleanmajorCompaction = (
(forceMajor && isUserCompaction)
|| ((forceMajor || isMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()))
|| StoreUtils.hasReferences(candidateSelection)
);
如果是非major的compact
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
从可选的storefile列表中移出是bulk load的storefile
candidateSelection = filterBulk(candidateSelection);
如果可选的storefile列表中的个数大于或等于hbase.hstore.compaction.max配置的值,
移出可选的storefile列表中最大的几个storefile,
通过如下说明来计算什么文件算是较大的storefile:
a.storefile的文件大小是后面几个文件的总和的多少倍数,倍数的说明在如下几行中进行了说明,
1.通过hbase.offpeak.start.hour配置major的启动开始小时,如配置为1
2.通过hbase.offpeak.end.hour配置major的启动结束小时,如配置为2
如果启动时间是1与2配置的小时时间内,那么配置有这两个值后,
主要用来检查compact的文件的大小是否超过hbase.hstore.compaction.max配置的值,默认为10,
减去1个文件的总和的多少倍,
如:有10个待做compact的文件,第一个文件(i=0)的size是=i+max(10)-1=9,
以上表示第一个文件的size超过了后面9个文件总size的大小的多少倍,如果超过了倍数,不做compact
如果1与2配置为不等于-1,同时start小于end,当前做compact的时间刚好在此时间内,
多少倍这个值通过hbase.hstore.compaction.ratio.offpeak配置得到,默认为5.0f
否则通过hbase.hstore.compaction.ratio配置得到,默认为1.2f
b.storefile的大小必须是大于hbase.hstore.compaction.min.size配置的值,默认是memstore的大小
c.如果现在所有的storefile的个数减去正在做compact的storefile个数大于
通过hbase.hstore.blockingStoreFiles配置的值,默认为7,移出最大的几个storefile,
只保留通过hbase.hstore.compaction.min配置的个数,默认为3(配置不能小于2)
老版本通过hbase.hstore.compactionThreshold配置
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
检查可选的能做compact的文件个数是否达到最少文件要求,如果没有达到,清空所有可选的storefile列表值
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
如果不是用户发起的major的compact,移出可选的storefile列表中超过hbase.hstore.compaction.max配置的个数
candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
生成CompactionRequest实例
CompactionRequest result = newCompactionRequest(candidateSelection);
如果非major同时offpeak有配置,同时当前时间在配置的时间范围内,设置CompactionRequest的offpeak为true
表示当前时间是非高峰时间内
result.setOffPeak(!candidateSelection.isEmpty() && !majorCompaction && mayUseOffPeak);
returnresult;
}
执行compaction的具体处理,见非major的compaction处理流程中的执行compaction处理流程
flush时的compaction
flush时的compaction通过MemStoreFlusher.FlusherHander.run执行
当flushRegion完成后,会触发compact的执行
CompactSplitThread.requestSystemCompaction--> requestCompactionInternal(region)
public synchronized void requestSystemCompaction(
final HRegion r, final String why) throws IOException {
requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
}
CompactSplitThread.requestCompactionInternal(Region)-->requestCompactionInternal(Store)
private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
intp, List<Pair<CompactionRequest, Store>> requests, booleanselectNow) throws IOException {
// not a special compaction request, so make our own list
List<CompactionRequest> ret = null;
if (requests == null) {
ret = selectNow ? newArrayList<CompactionRequest>(r.getStores().size()) : null;
for (Stores : r.getStores().values()) {
迭代发起针对store的compaction操作,传入的priority=Store.NO_PRIORITY,可参见非major的compact处理流程
CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
if (selectNow) ret.add(cr);
}
} else {
Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
ret = newArrayList<CompactionRequest>(requests.size());
for (Pair<CompactionRequest, Store> pair : requests) {
ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
}
}
returnret;
}
定时线程执行的compact流程
定期线程执行通过HRegionServer.CompactionChecker执行,
CompactionChecker线程主要作用:
生成通过hbase.server.thread.wakefrequency(10*1000ms)配置的定期检查region是否需要compact的检查线程,
如果需要进行compact,会在此处通过compact的线程触发compcat的请求
此实例中通过hbase.server.thread.wakefrequency(10*1000ms)配置major compact的优先级,
如果major compact的优先级大过此值,把compact的优先级设置为此值.
Store中通过hbase.server.compactchecker.interval.multiplier配置多少时间需要进行compact检查的间隔
默认为1000ms,
compactionChecker的检查周期为wakefrequency*multiplier ms,
也就是默认情况下线程调用1000次执行一次compact检查
a.compaction检查时发起compact的条件是
如果一个store中所有的file个数减去在做(或发起compact请求)的个数,大于或等于
hbase.hstore.compaction.min配置的值,
老版本使用hbase.hstore.compactionThreshold进行配置,默认值为3
b.major compact的条件检查
通过hbase.hregion.majorcompaction配置major的检查周期,default=1000*60*60*24
通过hbase.hregion.majorcompaction.jitter配置major的浮动时间,默认为0.2,
也就是major的时间上下浮动4.8小时
b2.检查(当前时间-major配置时间>store最小的文件生成时间)表示需要major,
b2.1>store下是否只有一个文件,同时这个文件已经到了major的时间,
b2.1>检查ttl时间是否达到(intager.max表示没配置),达到ttl时间需要major,否则不做
b2.2>文件个数大于1,到达major的时间,需要major
protected void chore() {
for (HRegion r : this.instance.onlineRegions.values()) {
if (r == null)
continue;
for (Stores : r.getStores().values()) {
try {
longmultiplier = s.getCompactionCheckMultiplier();
assertmultiplier > 0;
if (iteration % multiplier != 0) continue;
检查是否需要system的compact,当前所有的storefile个数减去正在做compact的storefile个数,
大于或等于hbase.hstore.compaction.min配置的值,表示需要compact
if (s.needsCompaction()) {
// Queue a compaction. Will recognize if major is needed.
发起系统的compact操作,见flush时的coompaction
this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
-
" requests compaction");
b2.或者以下几个条件检查通过:
b2.1.可选的storefile列表中修改时间最老的一个storefile的时间达到了间隔的major compact时间
b2.2.如果可选的storefile列表中只有一个storefile,同时此storefile的最老的一条数据的时间已经达到ttl时间
同时此storefile的时间达到了间隔的major时间间隔
b2.3.如果可选的storefile列表中有多少storefile,同时更新时间最老的一个storefile达到了major的时间间隔
b2.4.也就是storefile列表中最老的更新时间的一个storefile的时间达到了间隔的major时间,
但是可选的storefile个数只有一个,同时此storefile已经做过major(StoreFile.majorCompaction==true)
同时ttl时间没有配置或者ttl还没有过期那么此时这个storefile是不做major compact
通过hbase.hregion.majorcompaction配置major的间隔时间,
通过hbase.hregion.majorcompaction.jitter配置major的间隔的左右差
如:major的配置时间为24小时,同时间隔的左右差是0.2f,那么default = 20% = +/- 4.8 hrs
} elseif (s.isMajorCompaction()) {
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority > r.getCompactPriority()) {
发起requestCompaction操作,见下面说明A
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use default priority", null);
} else {
发起requestCompaction操作,见下面说明B
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use configured priority",
this.majorCompactPriority, null);
}
}
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
}
}
}
iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}
}
说明A:
CompactSplitThread.requestCompaction-->
requestCompaction(r, s, why, Store.NO_PRIORITY, request);
-->requestCompactionInternal(r, s, why, priority, request, true);此时设置selectNow为true
说明B:
CompactSplitThread.requestCompaction-->
requestCompactionInternal(r, s, why, priority, request, true);此时设置selectNow为true
-------------------------------------------------------------
requestCompactionInternal处理流程:
private synchronized CompactionRequest requestCompactionInternal(final HRegion r,
final Store s,
final String why, intpriority, CompactionRequest request, booleanselectNow)
针对store的compaction request处理流程
如果要对一个HBASE的表禁用掉compaction操作,可以通过create table时配置COMPACTION_ENABLED属性
private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
final String why, intpriority, CompactionRequest request, booleanselectNow)
throws IOException {
if (this.server.isStopped()
|| (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
returnnull;
}
CompactionContextcompaction = null;
此时的调用selectNow为true,(如果是系统调用,此时的selectNow为false,)
也就是在发起request到CompactSplitThread.CompactionRunner线程执行时,
如果是系统调用,传入的CompactionContext的实例为null,否则是用户发起的调用在这个地方得到compaction实例
if (selectNow) {
通过HStore.requestCompaction得到一个compactionContext,计算要进行compact的storefile
并设置其request.priority为Store.PRIORITY_USER表示用户发起的request
如果是flush时发起的compact,
并设置其request.priority为hbase.hstore.blockingStoreFiles配置的值减去storefile的个数,
表示系统发起的request,
如果hbase.hstore.blockingStoreFiles配置的值减去storefile的个数==PRIORITY_USER
那么priority的值为PRIORITY_USER+1
compaction = selectCompaction(r, s, priority, request);
if (compaction == null) returnnull; // message logged inside
}
// We assume that most compactions are small. So, put system compactions into small
// pool; we will do selection there, and move to large pool if necessary.
longsize = selectNow ? compaction.getRequest().getSize() : 0;
此时好像一直就得不到largeCompactions的实例,因为selectNow==false时,size的大小为0
不可能大于hbase.regionserver.thread.compaction.throttle配置的值
此配置的默认值是hbase.hstore.compaction.max*2*memstoresize
ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
? largeCompactions : smallCompactions;
通过smallCompactions的线程池生成CompactionRunner线程并执行,见执行Compaction的处理线程
pool.execute(newCompactionRunner(s, r, compaction, pool));
if (LOG.isDebugEnabled()) {
String type = (pool == smallCompactions) ? "Small " : "Large ";
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
}
returnselectNow ? compaction.getRequest() : null;
}
相关推荐
"COMPACT.2.0.zip聚类分析工具包"是一个专为进行聚类分析而设计的MATLAB软件包。这个工具包集成了多种聚类算法的实现,旨在帮助用户理解和比较不同聚类方法的性能。聚类分析是数据挖掘中的一个重要分支,其目标是...
在本项目中,可能使用LabVIEW的File I/O函数将采集到的数据写入文件,以便后续的分析和处理。 总的来说,"Compact RIO N9222数据采集及保存Vi"项目展示了如何利用LabVIEW和N9222模块进行高效、精准的数据采集和存储...
在实际应用中,cFP广泛应用于过程控制、测试测量、环境监控等领域。例如,在制造业中,cFP可以用于生产线的自动化控制,实时监测和调整生产参数;在科研实验室,它可以作为数据采集系统,进行精确的实验数据记录和...
这款设备专为工业自动化领域的控制和监控任务设计,旨在提供直观、高效的用户交互界面,帮助操作人员更好地理解和控制生产过程。 在描述中提到的"ABB compact HMI800人机操作系统",是指该设备运行的操作系统,它是...
- 数据记录与分析:可以记录生产数据,帮助优化生产过程,进行故障诊断和预防性维护。 - 用户权限管理:支持多级用户权限设置,确保只有授权人员能访问和修改系统设置。 - 图形化编程:使用内置的图形化编程工具...
总之,西门子Compact L C臂机的故障分析与维修需要深入理解设备的硬件结构、工作原理以及软件控制系统。通过精确的错误代码解析和系统性检查,可以有效地定位和解决设备问题,保障医疗设备的正常运行,从而更好地...
然而,PLC的处理能力和存储空间有限,用户界面和编程环境相对传统,特别是在需要进行高级数据分析和图形界面交互时,PLC通常难以胜任。 随着这种需求的出现,业界出现了新的产品,比如美国国家仪器(NI)公司推出的...
5. **易用性**:HTML Compact v2.6的操作流程简单,只需几步就能完成整个压缩过程,即便是初级用户也能快速掌握。 6. **文件保护**:在压缩过程中,软件会保留原始文件,避免误操作导致的损失。用户可以随时对比...
这一过程不仅涵盖了Lex和Yacc的基本应用,还涉及到更复杂的编程技巧,如嵌入式动作、继承属性和错误处理。 ### 更多Lex与Yacc的高级话题 - **字符串处理与保留字识别**:讨论了如何在Lex中处理字符串匹配和如何...
根据提供的文件信息,文章标题为"Learning Compact Geometric Features (ICCV 2017).pdf",内容涉及深度学习和特征提取,特别是在点云数据上的应用。文章介绍了如何从数据中学习能够代表点云中单个点的局部几何特征...
本书可能涵盖的内容包括.NET Compact Framework的架构、开发流程、性能调优、数据库交互、UI设计原则以及实际案例分析。通过学习此书,开发者将能深入了解如何在资源有限的环境下构建高效、安全的.NET应用程序,为...
10. **技术支持**:使用Safenet的ProteQ Compact 500 Dongle SDK开发的软件项目,通常可以享受到Safenet提供的技术支持,帮助开发者解决在开发过程中遇到的问题。 总之,ProteQ Compact 500 Dongle SDK是针对...
通过运行和分析这个项目,我们可以深入理解遗传算法如何处理旅行商问题,观察算法在不同参数设置下的表现,比如种群大小、交叉概率、变异概率等,进一步探索如何优化算法性能。此外,此项目也可以作为基础,扩展到...
`calc1.y`、`calc2.y` 和 `calc3.y` 是不同阶段的语法分析器源代码,展示了如何构建和优化解析过程,比如处理算术表达式的计算规则,可能包括运算符优先级、左递归处理等。 3. **编译器构造**: 这些源代码展示了...
在图像处理领域,CVI(Compact Vision Interface)是一种强大的开发平台,主要用于构建视觉应用系统。这个数字图像处理系统是基于CVI技术实现的,具备多种功能,如图像显示、边缘检测、灰度化以及图像的放大缩小等。...
SQL CE查询分析器,全称为SQL Server Compact Edition Query Analyzer,是一款用于处理SQL Server Compact Edition数据库的工具。SQL Server Compact Edition(SQL CE)是微软提供的一款轻量级、嵌入式的关系型...
- **趋势分析**:帮助用户理解如何利用这些数据来跟踪自己的血糖变化趋势。 #### 四、维护与保养 - **日常维护**:提供关于如何保持血糖仪性能良好的建议。 - **故障排查**:列出可能遇到的技术问题及其解决办法。 ...
### 知识点详解 #### 一、引言与文档结构 **1.1 工业机器人文献** ...通过这份详尽的指南,用户不仅可以深入了解KR C4 compact控制柜备选接口的相关信息,还能获得实际安装与操作过程中的宝贵指导。