- 浏览: 27560 次
- 性别:
- 来自: 北京
-
最新评论
-
pinkmoon:
HBase 0.96配置 snappy(绝对有效哦亲) -
pinkmoon:
记一次痛苦的 hadoop 2编译过程 -
半点玻璃心:
dsx1013 写道你好,我有snappy 源码安装,没有指定 ...
HBase 0.96配置 snappy(绝对有效哦亲) -
dsx1013:
你好,我有snappy 源码安装,没有指定安装目录,默认安装路 ...
HBase 0.96配置 snappy(绝对有效哦亲) -
半点玻璃心:
推文7 写道你好,我也遇到了这个问题,能否麻烦把您编译的had ...
HBase 0.96配置 snappy(绝对有效哦亲)
在写请求(put,delete)到达服务端时,服务端(HRegionServer)会将请求按 Region 聚合,并交给具体的 Region 实例进行处理。Region 收到请求后,会剥离 append 请求和 increase 请求单独处理,然后将 put 和 delete 揉一起按批处理。处理之前,会检查整个memstore 的大小。
wakeupFlushThread方法很简单,就是往 flush 请求队列里面写入一个对象,唤醒 flush 线程。看看 flush 线程唤醒后,都干了些什么
flushOneForGlobalPressure会从所有的 memstore 中选出两个 region,一个是storefile 数量没有超过熟练限制(默认7个)的 region 中 memstore 最大的,假定为 A,一个是所有 region 中 memstore 最大的,假定为 B。如果B 的 memstore > 2*A.memstore,则 flushB,否则 flush A。
protected void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, final List<ClientProtos.Action> mutations, final CellScanner cells) { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTimeMillis(); boolean batchContainsPuts = false, batchContainsDelete = false; try { int i = 0; for (ClientProtos.Action action : mutations) { MutationProto m = action.getMutation(); Mutation mutation; if (m.getMutateType() == MutationType.PUT) { mutation = ProtobufUtil.toPut(m, cells); batchContainsPuts = true; } else { mutation = ProtobufUtil.toDelete(m, cells); batchContainsDelete = true; } mArray[i++] = mutation; } requestCount.add(mutations.size()); if (!region.getRegionInfo().isMetaTable()) { cacheFlusher.reclaimMemStoreMemory();//这里检查 memstore 的大小,并决定是否flush } OperationStatus codes[] = region.batchMutate(mArray, false); for (i = 0; i < codes.length; i++) { int index = mutations.get(i).getIndex(); Exception e = null; switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); builder.addResultOrException(getResultOrException(e, index)); break; case SANITY_CHECK_FAILURE: e = new FailedSanityCheckException(codes[i].getExceptionMsg()); builder.addResultOrException(getResultOrException(e, index)); break; default: e = new DoNotRetryIOException(codes[i].getExceptionMsg()); builder.addResultOrException(getResultOrException(e, index)); break; case SUCCESS: builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(), index)); break; } } } catch (IOException ie) { for (int i = 0; i < mutations.size(); i++) { builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex())); } } long after = EnvironmentEdgeManager.currentTimeMillis(); if (batchContainsPuts) { metricsRegionServer.updatePut(after - before); } if (batchContainsDelete) { metricsRegionServer.updateDelete(after - before); } }
public void reclaimMemStoreMemory() { TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory"); if (isAboveHighWaterMark()) {//如果大于上限,则直接阻塞。 if (Trace.isTracing()) { scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark."); } long start = System.currentTimeMillis(); synchronized (this.blockSignal) {//memstoreFlusher 是进程单例的,所以所有此时该 RS 上的写请求都会 block 在这里 boolean blocked = false; long startTime = 0; while (isAboveHighWaterMark() && !server.isStopped()) { if (!blocked) { startTime = EnvironmentEdgeManager.currentTimeMillis(); LOG.info("Blocking updates on " + server.toString() + ": the global memstore size " + StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) + " is >= than blocking " + StringUtils.humanReadableInt(globalMemStoreLimit) + " size"); } blocked = true; wakeupFlushThread(); try { // we should be able to wait forever, but we've seen a bug where // we miss a notify, so put a 5 second bound on it at least. blockSignal.wait(5 * 1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } long took = System.currentTimeMillis() - start; LOG.warn("Memstore is above high water mark and block " + took + "ms"); } if (blocked) { final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; if (totalTime > 0) { this.updatesBlockedMsHighWater.add(totalTime); } LOG.info("Unblocking updates for server " + server.toString()); } } } else if (isAboveLowWaterMark()) { wakeupFlushThread();//如果是大于 lowerlimit,则只启动 flush 线程,并不阻塞请求 } scope.close(); }
wakeupFlushThread方法很简单,就是往 flush 请求队列里面写入一个对象,唤醒 flush 线程。看看 flush 线程唤醒后,都干了些什么
private class FlushHandler extends HasThread { @Override public void run() { while (!server.isStopped()) { FlushQueueEntry fqe = null; try { wakeupPending.set(false); // allow someone to wake us up again fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (fqe == null || fqe instanceof WakeupFlushThread) {//超时后如果没有 flush 请求,或者 flush 请求是 wakeup 请求,这里显然是后者 if (isAboveLowWaterMark()) {//如果超过 lowerlimit,则开始 flush LOG.debug("Flush thread woke up because memory above low water=" + StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); if (!flushOneForGlobalPressure()) {//这里开始执行 flush,并返回执行结果。这里的 flush 操作是挑选一个最值得 flush region 执行。并不会 flush 整个 rs 的所有memstore,具体执行往后看 Thread.sleep(1000); wakeUpIfBlocking(); } // Enqueue another one of these tokens so we'll wake up again wakeupFlushThread();//自我唤醒。因为每次执行只 flush 一个 region,如果你的 region 很多,可能 flush 其中一个并不能解除你的内存报警,需要自己唤醒,以再次检查是否需要继续 flush } continue; } FlushRegionEntry fre = (FlushRegionEntry) fqe;//如果不是被WakeupFlushThread唤醒的,那么就是特定的 region flush 请求,直接 flush 指定的 region if (!flushRegion(fre)) { break; } } catch (InterruptedException ex) { continue; } catch (ConcurrentModificationException ex) { continue; } catch (Exception ex) { LOG.error("Cache flusher failed for entry " + fqe, ex); if (!server.checkFileSystem()) { break; } } } synchronized (regionsInQueue) { regionsInQueue.clear(); flushQueue.clear(); } // Signal anyone waiting, so they see the close flag wakeUpIfBlocking(); LOG.info(getName() + " exiting"); } }
flushOneForGlobalPressure会从所有的 memstore 中选出两个 region,一个是storefile 数量没有超过熟练限制(默认7个)的 region 中 memstore 最大的,假定为 A,一个是所有 region 中 memstore 最大的,假定为 B。如果B 的 memstore > 2*A.memstore,则 flushB,否则 flush A。
private boolean flushOneForGlobalPressure() { SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize(); Set<HRegion> excludedRegions = new HashSet<HRegion>(); boolean flushedOne = false; while (!flushedOne) {//一旦成功 flush 某个 region,就 say 88 // Find the biggest region that doesn't have too many storefiles // (might be null!) HRegion bestFlushableRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, true); // Find the biggest region, total, even if it might have too many flushes. HRegion bestAnyRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, false); if (bestAnyRegion == null) { LOG.error("Above memory mark but there are no flushable regions!"); return false; } HRegion regionToFlush; if (bestFlushableRegion != null && bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) { if (LOG.isDebugEnabled()) { LOG.debug("Under global heap pressure: " + "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is " + StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) + " vs best flushable region's " + StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) + ". Choosing the bigger."); } regionToFlush = bestAnyRegion; } else { if (bestFlushableRegion == null) { regionToFlush = bestAnyRegion; } else { regionToFlush = bestFlushableRegion; } } Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); flushedOne = flushRegion(regionToFlush, true); if (!flushedOne) { LOG.info("Excluding unflushable region " + regionToFlush + " - trying to find a different region to flush."); excludedRegions.add(regionToFlush);//如果当前的 flush 失败,则把该 region 加入排除列表 } } return true; }
发表评论
-
hbase MemStoreLAB代码浅析-1
2014-09-30 17:21 1174本文基于 hbase 0.98x,如果发现源码与你的副本不符合 ... -
HBase 0.96 服务端写流程代码阅读笔记
2014-02-24 15:36 0private long doMiniBatchMutati ... -
HBase 0.96配置 snappy(绝对有效哦亲)
2014-02-12 14:10 4078通常情况下,snappy压缩算法无非是hbase 最好的伴侣, ... -
HBase Memstore flush代码阅读笔记-2 -由 XXX 触发的 flush
2014-01-22 18:44 0之前看到在执行 mutate 操作之前,RS 会检查 mems ... -
HBase Memstore flush代码阅读笔记-2-由单个 memstore大小超过限制触发的 flush
2014-01-23 15:15 3574本代码基于0.96.1.1:http://svn.apache ... -
HBase Memstore配置
2014-01-21 11:09 503... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.96-hadoop2)
2013-11-11 19:13 891看看MultiServerCallable的核心方法,call ... -
HBASE 代码阅读笔记-1 - PUT-2-定位RS和R-1(0.96-HADOOP2)
2013-11-08 19:54 2064按照94的阅读进度,这里该看如何定位RS和Region了 先回 ... -
HBASE 代码阅读笔记-1 - PUT操作客户端主流程(基于0.96.0-hadoop2)
2013-11-08 19:23 4501又回来了,还是看put,不过版本号变了,希望看0.94的童靴移 ... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.94.12)
2013-11-08 12:55 163上一篇把提交任务的主流程整理了下,遗留了连接、发送请求、处理响 ... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务1(基于0.94.12)
2013-11-07 19:44 1103终于把RS的定位问题搞清楚了些些,时间不等人,马上看看conn ... -
HBase Memstore配置
2013-11-07 15:47 2937HBase Memstore配置 本文为翻译,原英文地址:ht ... -
HBASE 代码阅读笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)
2013-11-07 17:01 193上一篇http://dennis-lee-gammy.itey ... -
HBASE 代码阅读笔记-1 - PUT-1-主流程(基于0.94.12)
2013-11-06 19:57 342最近闲来无事看看hbase ...
相关推荐
3. **内存管理**:`hbase.regionserver.global.memstore.upperLimit`和`hbase.regionserver.global.memstore.lowerLimit`分别设置了全局MemStore占用RegionServer内存的最大和最小比例,以防止数据溢出到磁盘。...
`hbase.regionserver.global.memstore.upperLimit` 和 `hbase.regionserver.global.memstore.lowerLimit` - **含义**:分别定义了RegionServer所有MemStore占用内存的比例上限和下限。 - **默认值**:分别为0.4和...
7. `hbase.regionserver.global.memstore.lowerLimit` 和 `hbase.regionserver.global.memstore.upperLimit`: 控制MemStore占用区域服务器总内存的比例,防止内存溢出。 8. `hbase.hregion.majorcompaction.interval...
`hbase.regionserver.global.memstore.upperLimit`和`hbase.regionserver.global.memstore.lowerLimit`分别设置了全局MemStore的最大和最小占用比例,防止内存溢出。`hbase.hregion.memstore.flush.size`定义了触发...
资源内项目源码是来自个人的毕业设计,代码都测试ok,包含源码、数据集、可视化页面和部署说明,可产生核心指标曲线图、混淆矩阵、F1分数曲线、精确率-召回率曲线、验证集预测结果、标签分布图。都是运行成功后才上传资源,毕设答辩评审绝对信服的保底85分以上,放心下载使用,拿来就能用。包含源码、数据集、可视化页面和部署说明一站式服务,拿来就能用的绝对好资源!!! 项目备注 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、大作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.txt文件,仅供学习参考, 切勿用于商业用途。
wrf转mp4播放器1.1.1
内容概要:本文档详细介绍了如何在Simulink中设计一个满足特定规格的音频带ADC(模数转换器)。首先选择了三阶单环多位量化Σ-Δ调制器作为设计方案,因为这种结构能在音频带宽内提供高噪声整形效果,并且多位量化可以降低量化噪声。接着,文档展示了具体的Simulink建模步骤,包括创建模型、添加各个组件如积分器、量化器、DAC反馈以及连接它们。此外,还进行了参数设计与计算,特别是过采样率和信噪比的估算,并引入了动态元件匹配技术来减少DAC的非线性误差。性能验证部分则通过理想和非理想的仿真实验评估了系统的稳定性和各项指标,最终证明所设计的ADC能够达到预期的技术标准。 适用人群:电子工程专业学生、从事数据转换器研究或开发的技术人员。 使用场景及目标:适用于希望深入了解Σ-Δ调制器的工作原理及其在音频带ADC应用中的具体实现方法的人群。目标是掌握如何利用MATLAB/Simulink工具进行复杂电路的设计与仿真。 其他说明:文中提供了详细的Matlab代码片段用于指导读者完成整个设计流程,同时附带了一些辅助函数帮助分析仿真结果。
国网台区终端最新规范
《基于YOLOv8的智慧农业水肥一体化控制系统》(包含源码、可视化界面、完整数据集、部署教程)简单部署即可运行。功能完善、操作简单,适合毕设或课程设计
GSDML-V2.33-LEUZE-AMS3048i-20170622.xml
微信小程序项目课程设计,包含LW+ppt
微信小程序项目课程设计,包含LW+ppt
终端运行进度条脚本
幼儿园预防肺结核教育培训课件资料
python,python相关资源
《基于YOLOv8的智慧校园电动车充电桩状态监测系统》(包含源码、可视化界面、完整数据集、部署教程)简单部署即可运行。功能完善、操作简单,适合毕设或课程设计
deepseek 临床之理性软肋.pdf
SM2258XT量产工具(包含16种程序),固态硬盘量产工具使用
RecyclerView.zip
水务大脑让水务运营更智能(23页)