- 浏览: 27379 次
- 性别:
- 来自: 北京
最新评论
-
pinkmoon:
HBase 0.96配置 snappy(绝对有效哦亲) -
pinkmoon:
记一次痛苦的 hadoop 2编译过程 -
半点玻璃心:
dsx1013 写道你好,我有snappy 源码安装,没有指定 ...
HBase 0.96配置 snappy(绝对有效哦亲) -
dsx1013:
你好,我有snappy 源码安装,没有指定安装目录,默认安装路 ...
HBase 0.96配置 snappy(绝对有效哦亲) -
半点玻璃心:
推文7 写道你好,我也遇到了这个问题,能否麻烦把您编译的had ...
HBase 0.96配置 snappy(绝对有效哦亲)
在写请求(put,delete)到达服务端时,服务端(HRegionServer)会将请求按 Region 聚合,并交给具体的 Region 实例进行处理。Region 收到请求后,会剥离 append 请求和 increase 请求单独处理,然后将 put 和 delete 揉一起按批处理。处理之前,会检查整个memstore 的大小。
wakeupFlushThread方法很简单,就是往 flush 请求队列里面写入一个对象,唤醒 flush 线程。看看 flush 线程唤醒后,都干了些什么
flushOneForGlobalPressure会从所有的 memstore 中选出两个 region,一个是storefile 数量没有超过熟练限制(默认7个)的 region 中 memstore 最大的,假定为 A,一个是所有 region 中 memstore 最大的,假定为 B。如果B 的 memstore > 2*A.memstore,则 flushB,否则 flush A。
protected void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, final List<ClientProtos.Action> mutations, final CellScanner cells) { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTimeMillis(); boolean batchContainsPuts = false, batchContainsDelete = false; try { int i = 0; for (ClientProtos.Action action : mutations) { MutationProto m = action.getMutation(); Mutation mutation; if (m.getMutateType() == MutationType.PUT) { mutation = ProtobufUtil.toPut(m, cells); batchContainsPuts = true; } else { mutation = ProtobufUtil.toDelete(m, cells); batchContainsDelete = true; } mArray[i++] = mutation; } requestCount.add(mutations.size()); if (!region.getRegionInfo().isMetaTable()) { cacheFlusher.reclaimMemStoreMemory();//这里检查 memstore 的大小,并决定是否flush } OperationStatus codes[] = region.batchMutate(mArray, false); for (i = 0; i < codes.length; i++) { int index = mutations.get(i).getIndex(); Exception e = null; switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); builder.addResultOrException(getResultOrException(e, index)); break; case SANITY_CHECK_FAILURE: e = new FailedSanityCheckException(codes[i].getExceptionMsg()); builder.addResultOrException(getResultOrException(e, index)); break; default: e = new DoNotRetryIOException(codes[i].getExceptionMsg()); builder.addResultOrException(getResultOrException(e, index)); break; case SUCCESS: builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(), index)); break; } } } catch (IOException ie) { for (int i = 0; i < mutations.size(); i++) { builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex())); } } long after = EnvironmentEdgeManager.currentTimeMillis(); if (batchContainsPuts) { metricsRegionServer.updatePut(after - before); } if (batchContainsDelete) { metricsRegionServer.updateDelete(after - before); } }
public void reclaimMemStoreMemory() { TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory"); if (isAboveHighWaterMark()) {//如果大于上限,则直接阻塞。 if (Trace.isTracing()) { scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark."); } long start = System.currentTimeMillis(); synchronized (this.blockSignal) {//memstoreFlusher 是进程单例的,所以所有此时该 RS 上的写请求都会 block 在这里 boolean blocked = false; long startTime = 0; while (isAboveHighWaterMark() && !server.isStopped()) { if (!blocked) { startTime = EnvironmentEdgeManager.currentTimeMillis(); LOG.info("Blocking updates on " + server.toString() + ": the global memstore size " + StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) + " is >= than blocking " + StringUtils.humanReadableInt(globalMemStoreLimit) + " size"); } blocked = true; wakeupFlushThread(); try { // we should be able to wait forever, but we've seen a bug where // we miss a notify, so put a 5 second bound on it at least. blockSignal.wait(5 * 1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } long took = System.currentTimeMillis() - start; LOG.warn("Memstore is above high water mark and block " + took + "ms"); } if (blocked) { final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; if (totalTime > 0) { this.updatesBlockedMsHighWater.add(totalTime); } LOG.info("Unblocking updates for server " + server.toString()); } } } else if (isAboveLowWaterMark()) { wakeupFlushThread();//如果是大于 lowerlimit,则只启动 flush 线程,并不阻塞请求 } scope.close(); }
wakeupFlushThread方法很简单,就是往 flush 请求队列里面写入一个对象,唤醒 flush 线程。看看 flush 线程唤醒后,都干了些什么
private class FlushHandler extends HasThread { @Override public void run() { while (!server.isStopped()) { FlushQueueEntry fqe = null; try { wakeupPending.set(false); // allow someone to wake us up again fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (fqe == null || fqe instanceof WakeupFlushThread) {//超时后如果没有 flush 请求,或者 flush 请求是 wakeup 请求,这里显然是后者 if (isAboveLowWaterMark()) {//如果超过 lowerlimit,则开始 flush LOG.debug("Flush thread woke up because memory above low water=" + StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); if (!flushOneForGlobalPressure()) {//这里开始执行 flush,并返回执行结果。这里的 flush 操作是挑选一个最值得 flush region 执行。并不会 flush 整个 rs 的所有memstore,具体执行往后看 Thread.sleep(1000); wakeUpIfBlocking(); } // Enqueue another one of these tokens so we'll wake up again wakeupFlushThread();//自我唤醒。因为每次执行只 flush 一个 region,如果你的 region 很多,可能 flush 其中一个并不能解除你的内存报警,需要自己唤醒,以再次检查是否需要继续 flush } continue; } FlushRegionEntry fre = (FlushRegionEntry) fqe;//如果不是被WakeupFlushThread唤醒的,那么就是特定的 region flush 请求,直接 flush 指定的 region if (!flushRegion(fre)) { break; } } catch (InterruptedException ex) { continue; } catch (ConcurrentModificationException ex) { continue; } catch (Exception ex) { LOG.error("Cache flusher failed for entry " + fqe, ex); if (!server.checkFileSystem()) { break; } } } synchronized (regionsInQueue) { regionsInQueue.clear(); flushQueue.clear(); } // Signal anyone waiting, so they see the close flag wakeUpIfBlocking(); LOG.info(getName() + " exiting"); } }
flushOneForGlobalPressure会从所有的 memstore 中选出两个 region,一个是storefile 数量没有超过熟练限制(默认7个)的 region 中 memstore 最大的,假定为 A,一个是所有 region 中 memstore 最大的,假定为 B。如果B 的 memstore > 2*A.memstore,则 flushB,否则 flush A。
private boolean flushOneForGlobalPressure() { SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize(); Set<HRegion> excludedRegions = new HashSet<HRegion>(); boolean flushedOne = false; while (!flushedOne) {//一旦成功 flush 某个 region,就 say 88 // Find the biggest region that doesn't have too many storefiles // (might be null!) HRegion bestFlushableRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, true); // Find the biggest region, total, even if it might have too many flushes. HRegion bestAnyRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, false); if (bestAnyRegion == null) { LOG.error("Above memory mark but there are no flushable regions!"); return false; } HRegion regionToFlush; if (bestFlushableRegion != null && bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) { if (LOG.isDebugEnabled()) { LOG.debug("Under global heap pressure: " + "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is " + StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) + " vs best flushable region's " + StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) + ". Choosing the bigger."); } regionToFlush = bestAnyRegion; } else { if (bestFlushableRegion == null) { regionToFlush = bestAnyRegion; } else { regionToFlush = bestFlushableRegion; } } Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); flushedOne = flushRegion(regionToFlush, true); if (!flushedOne) { LOG.info("Excluding unflushable region " + regionToFlush + " - trying to find a different region to flush."); excludedRegions.add(regionToFlush);//如果当前的 flush 失败,则把该 region 加入排除列表 } } return true; }
发表评论
-
hbase MemStoreLAB代码浅析-1
2014-09-30 17:21 1155本文基于 hbase 0.98x,如果发现源码与你的副本不符合 ... -
HBase 0.96 服务端写流程代码阅读笔记
2014-02-24 15:36 0private long doMiniBatchMutati ... -
HBase 0.96配置 snappy(绝对有效哦亲)
2014-02-12 14:10 4064通常情况下,snappy压缩算法无非是hbase 最好的伴侣, ... -
HBase Memstore flush代码阅读笔记-2 -由 XXX 触发的 flush
2014-01-22 18:44 0之前看到在执行 mutate 操作之前,RS 会检查 mems ... -
HBase Memstore flush代码阅读笔记-2-由单个 memstore大小超过限制触发的 flush
2014-01-23 15:15 3564本代码基于0.96.1.1:http://svn.apache ... -
HBase Memstore配置
2014-01-21 11:09 491... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.96-hadoop2)
2013-11-11 19:13 883看看MultiServerCallable的核心方法,call ... -
HBASE 代码阅读笔记-1 - PUT-2-定位RS和R-1(0.96-HADOOP2)
2013-11-08 19:54 2052按照94的阅读进度,这里该看如何定位RS和Region了 先回 ... -
HBASE 代码阅读笔记-1 - PUT操作客户端主流程(基于0.96.0-hadoop2)
2013-11-08 19:23 4493又回来了,还是看put,不过版本号变了,希望看0.94的童靴移 ... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.94.12)
2013-11-08 12:55 163上一篇把提交任务的主流程整理了下,遗留了连接、发送请求、处理响 ... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务1(基于0.94.12)
2013-11-07 19:44 1092终于把RS的定位问题搞清楚了些些,时间不等人,马上看看conn ... -
HBase Memstore配置
2013-11-07 15:47 2921HBase Memstore配置 本文为翻译,原英文地址:ht ... -
HBASE 代码阅读笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)
2013-11-07 17:01 193上一篇http://dennis-lee-gammy.itey ... -
HBASE 代码阅读笔记-1 - PUT-1-主流程(基于0.94.12)
2013-11-06 19:57 342最近闲来无事看看hbase ...
相关推荐
hbase-hbck2-1.1.0-SNAPSHOT.jar
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
hbase hbck2修复工具hbase-operator-tools-1.0.0.1.0.0.0-618-bin.tar.gz,hbase1版本的hbck已经不支持修复命令,hbase2.1版本需要用这个新版的工具
赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
HBCK是HBase1.x中的命令,到了HBase2.x中,HBCK命令不适用,且它的写功能(-fix)已删除; HBCK2已经被剥离出HBase成为了一个单独的项目,如果你想要使用这个工具,需要根据自己HBase的版本,编译源码。其GitHub地址...
赠送jar包:phoenix-core-4.7.0-HBase-1.1.jar; 赠送原API文档:phoenix-core-4.7.0-HBase-1.1-javadoc.jar; 赠送源代码:phoenix-core-4.7.0-HBase-1.1-sources.jar; 赠送Maven依赖信息文件:phoenix-core-4.7.0...
赠送jar包:hbase-hadoop2-compat-1.4.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.4.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
《深入理解HBase:以hbase-0.98.7-hadoop2-bin.tar为例》 HBase,作为Apache软件基金会的重要项目之一,是构建在Hadoop生态系统之上的一款分布式、高性能、列式存储的NoSQL数据库。它为大规模数据集提供了实时读写...
赠送jar包:hbase-prefix-tree-1.1.3.jar; 赠送原API文档:hbase-prefix-tree-1.1.3-javadoc.jar; 赠送源代码:hbase-prefix-tree-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-prefix-tree-1.1.3.pom; ...
hbase-sdk是基于hbase-client和hbase-thrift的原生API封装的一款轻量级的HBase ORM框架。 针对HBase各版本API(1.x~2.x)间的差异,在其上剥离出了一层统一的抽象。并提供了以类SQL的方式来读写HBase表中的数据。对...
HBase(hbase-2.4.9-bin.tar.gz)是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System...
赠送jar包:hbase-annotations-1.1.2.jar; 赠送原API文档:hbase-annotations-1.1.2-javadoc.jar; 赠送源代码:hbase-annotations-1.1.2-sources.jar; 包含翻译后的API文档:hbase-annotations-1.1.2-javadoc-...
赠送jar包:flink-hbase_2.11-1.10.0.jar; 赠送原API文档:flink-hbase_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-hbase_2.11-...人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用。
赠送jar包:hbase-client-1.4.3.jar; 赠送原API文档:hbase-client-1.4.3-javadoc.jar; 赠送源代码:hbase-client-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-client-1.4.3.pom; 包含翻译后的API文档:...
总的来说,`phoenix-hbase-2.2-5.1.2-bin.tar.gz`是大数据存储和分析领域的重要工具,它结合了HBase的分布式存储能力和Phoenix的SQL查询能力,为大数据处理提供了一种高效、易用的解决方案。无论是对大数据新手还是...
HBase 元数据修复工具包。 ①修改 jar 包中的application.properties,重点是 zookeeper.address、zookeeper.nodeParent、hdfs....③开始修复 `java -jar -Drepair.tableName=表名 hbase-meta-repair-hbase-2.0.2.jar`
赠送jar包:hbase-hadoop-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.1.3....
赠送jar包:hbase-metrics-api-1.4.3.jar; 赠送原API文档:hbase-metrics-api-1.4.3-javadoc.jar; 赠送源代码:hbase-metrics-api-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-metrics-api-1.4.3.pom; ...