HBase Memstore Flush Code Reading Notes - 1: Flushes Triggered by lowerlimit and upperlimit

 
When a write request (put or delete) reaches the server, the server (HRegionServer) groups the requests by Region and hands them to the corresponding Region instance. The Region first splits off append and increment requests and handles them separately, then processes the puts and deletes together as one batch. Before processing the batch, it checks the total memstore size.
protected void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
                             final List<ClientProtos.Action> mutations, final CellScanner cells) {
        Mutation[] mArray = new Mutation[mutations.size()];
        long before = EnvironmentEdgeManager.currentTimeMillis();
        boolean batchContainsPuts = false, batchContainsDelete = false;
        try {
            int i = 0;
            for (ClientProtos.Action action : mutations) {
                MutationProto m = action.getMutation();
                Mutation mutation;
                if (m.getMutateType() == MutationType.PUT) {
                    mutation = ProtobufUtil.toPut(m, cells);
                    batchContainsPuts = true;
                } else {
                    mutation = ProtobufUtil.toDelete(m, cells);
                    batchContainsDelete = true;
                }
                mArray[i++] = mutation;
            }

            requestCount.add(mutations.size());
            if (!region.getRegionInfo().isMetaTable()) {
                cacheFlusher.reclaimMemStoreMemory(); // check the global memstore size and decide whether to flush
            }

            OperationStatus codes[] = region.batchMutate(mArray, false);
            for (i = 0; i < codes.length; i++) {
                int index = mutations.get(i).getIndex();
                Exception e = null;
                switch (codes[i].getOperationStatusCode()) {
                    case BAD_FAMILY:
                        e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
                        builder.addResultOrException(getResultOrException(e, index));
                        break;

                    case SANITY_CHECK_FAILURE:
                        e = new FailedSanityCheckException(codes[i].getExceptionMsg());
                        builder.addResultOrException(getResultOrException(e, index));
                        break;

                    default:
                        e = new DoNotRetryIOException(codes[i].getExceptionMsg());
                        builder.addResultOrException(getResultOrException(e, index));
                        break;

                    case SUCCESS:
                        builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
                        break;
                }
            }
        } catch (IOException ie) {
            for (int i = 0; i < mutations.size(); i++) {
                builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
            }
        }
        long after = EnvironmentEdgeManager.currentTimeMillis();
        if (batchContainsPuts) {
            metricsRegionServer.updatePut(after - before);
        }
        if (batchContainsDelete) {
            metricsRegionServer.updateDelete(after - before);
        }
    }
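
Before reading reclaimMemStoreMemory, note that it relies on two water-mark checks against the region server's global memstore size. As far as I can tell they look roughly like the sketch below (the field and accessor names are taken from the snippets that follow, but treat this as an approximation, not the exact source):

private boolean isAboveHighWaterMark() {
    // total memstore usage across all regions on this RS vs. the blocking limit
    return server.getRegionServerAccounting().getGlobalMemstoreSize() >= globalMemStoreLimit;
}

private boolean isAboveLowWaterMark() {
    // same usage vs. the lower, flush-triggering limit
    return server.getRegionServerAccounting().getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
}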


public void reclaimMemStoreMemory() {
        TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
        if (isAboveHighWaterMark()) { // above the upper limit: block writes until memory drops
            if (Trace.isTracing()) {
                scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
            }
            long start = System.currentTimeMillis();
            synchronized (this.blockSignal) { // the MemStoreFlusher is a singleton per RS process, so every write request on this RS blocks here
                boolean blocked = false;
                long startTime = 0;
                while (isAboveHighWaterMark() && !server.isStopped()) {
                    if (!blocked) {
                        startTime = EnvironmentEdgeManager.currentTimeMillis();
                        LOG.info("Blocking updates on " + server.toString() +
                                ": the global memstore size " +
                                StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
                                " is >= than blocking " +
                                StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
                    }
                    blocked = true;
                    wakeupFlushThread();
                    try {
                        // we should be able to wait forever, but we've seen a bug where
                        // we miss a notify, so put a 5 second bound on it at least.
                        blockSignal.wait(5 * 1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                    long took = System.currentTimeMillis() - start;
                    LOG.warn("Memstore is above high water mark and block " + took + "ms");
                }
                if (blocked) {
                    final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
                    if (totalTime > 0) {
                        this.updatesBlockedMsHighWater.add(totalTime);
                    }
                    LOG.info("Unblocking updates for server " + server.toString());
                }
            }
        } else if (isAboveLowWaterMark()) {
            wakeupFlushThread(); // only above the lower limit: just wake the flush thread, without blocking requests
        }
        scope.close();
    }
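
Where do the two water marks come from? They are fractions of the region server heap, read from configuration. A minimal sketch of the derivation, assuming the 0.9x-era keys hbase.regionserver.global.memstore.upperLimit / lowerLimit (later versions renamed these keys; the helper name and the defaults below are illustrative, check your version's hbase-default.xml):

import java.lang.management.ManagementFactory;
import org.apache.hadoop.conf.Configuration;

// Hypothetical helper: derive the blocking (high) and flush-trigger (low)
// water marks from the max heap size and the configured fractions.
static long[] computeWaterMarks(Configuration conf) {
    long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    float upper = conf.getFloat("hbase.regionserver.global.memstore.upperLimit", 0.4f);
    float lower = conf.getFloat("hbase.regionserver.global.memstore.lowerLimit", 0.38f);
    long high = (long) (maxHeap * upper); // isAboveHighWaterMark(): block writes
    long low = (long) (maxHeap * lower);  // isAboveLowWaterMark(): wake the flush thread
    return new long[] { high, low };
}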


The wakeupFlushThread method is trivial: it just drops a wakeup token into the flush request queue to wake the flush thread. As I read the 0.96-era source, it looks roughly like this (the wakeupPending flag keeps duplicate tokens out of the queue):
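
private void wakeupFlushThread() {
    // only enqueue a token if one is not already pending
    if (wakeupPending.compareAndSet(false, true)) {
        flushQueue.add(new WakeupFlushThread());
    }
}

Now let's see what the flush thread does once it wakes up.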

private class FlushHandler extends HasThread {
        @Override
        public void run() {
            while (!server.isStopped()) {
                FlushQueueEntry fqe = null;
                try {
                    wakeupPending.set(false); // allow someone to wake us up again
                    fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
                    if (fqe == null || fqe instanceof WakeupFlushThread) { // poll timed out with no request, or the entry is a wakeup token; here it is clearly the latter
                        if (isAboveLowWaterMark()) { // above the lower limit: start flushing
                            LOG.debug("Flush thread woke up because memory above low water="
                                    + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
                            if (!flushOneForGlobalPressure()) { // perform the flush and report success; this picks the single most flush-worthy region rather than flushing every memstore on the RS (details below)
                                Thread.sleep(1000);
                                wakeUpIfBlocking();
                            }
                            // Enqueue another one of these tokens so we'll wake up again
                            wakeupFlushThread(); // self-wakeup: each pass flushes only one region, so with many regions one flush may not clear the memory pressure; enqueue another token to re-check whether more flushing is needed
                        }
                        continue;
                    }
                    FlushRegionEntry fre = (FlushRegionEntry) fqe; // not a WakeupFlushThread token, so this is a request to flush a specific region; flush it directly
                    if (!flushRegion(fre)) {
                        break;
                    }
                } catch (InterruptedException ex) {
                    continue;
                } catch (ConcurrentModificationException ex) {
                    continue;
                } catch (Exception ex) {
                    LOG.error("Cache flusher failed for entry " + fqe, ex);
                    if (!server.checkFileSystem()) {
                        break;
                    }
                }
            }
            synchronized (regionsInQueue) {
                regionsInQueue.clear();
                flushQueue.clear();
            }

            // Signal anyone waiting, so they see the close flag
            wakeUpIfBlocking();
            LOG.info(getName() + " exiting");
        }
    }
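
A note on the queue the handler polls: as far as I can tell, flushQueue is a DelayQueue of FlushQueueEntry objects, so a specific region's flush request can be deliberately delayed (for example, to wait out a store-file pileup) while wakeup tokens become available immediately. A sketch of the token type under that assumption (FlushQueueEntry extending java.util.concurrent.Delayed):

// Sketch: the wakeup token is an "always ready" queue entry.
static class WakeupFlushThread implements FlushQueueEntry {
    @Override
    public long getDelay(TimeUnit unit) {
        return 0; // never delayed: available as soon as the handler polls
    }
    @Override
    public int compareTo(Delayed other) {
        return -1; // sort ahead of delayed region flush requests
    }
}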


flushOneForGlobalPressure picks two candidate regions from all memstores: A, the region with the largest memstore among regions whose store-file count does not exceed the limit (7 by default), and B, the region with the largest memstore overall. If B's memstore is more than twice the size of A's, B is flushed; otherwise A is.
private boolean flushOneForGlobalPressure() {
        SortedMap<Long, HRegion> regionsBySize =
                server.getCopyOfOnlineRegionsSortedBySize();

        Set<HRegion> excludedRegions = new HashSet<HRegion>();

        boolean flushedOne = false;
        while (!flushedOne) { // keep trying until one region flushes successfully, then we are done
            // Find the biggest region that doesn't have too many storefiles
            // (might be null!)
            HRegion bestFlushableRegion = getBiggestMemstoreRegion(
                    regionsBySize, excludedRegions, true);
            // Find the biggest region, total, even if it might have too many flushes.
            HRegion bestAnyRegion = getBiggestMemstoreRegion(
                    regionsBySize, excludedRegions, false);

            if (bestAnyRegion == null) {
                LOG.error("Above memory mark but there are no flushable regions!");
                return false;
            }

            HRegion regionToFlush;
            if (bestFlushableRegion != null &&
                    bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Under global heap pressure: " +
                            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
                            "store files, but is " +
                            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
                            " vs best flushable region's " +
                            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
                            ". Choosing the bigger.");
                }
                regionToFlush = bestAnyRegion;
            } else {
                if (bestFlushableRegion == null) {
                    regionToFlush = bestAnyRegion;
                } else {
                    regionToFlush = bestFlushableRegion;
                }
            }

            Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);

            LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
            flushedOne = flushRegion(regionToFlush, true);
            if (!flushedOne) {
                LOG.info("Excluding unflushable region " + regionToFlush +
                        " - trying to find a different region to flush.");
                excludedRegions.add(regionToFlush); // this flush attempt failed; exclude the region and pick another
            }
        }
        return true;
    }
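
For completeness: getBiggestMemstoreRegion walks the size-sorted map from the largest memstore down and returns the first region that is not excluded and, when checkStoreFileCount is set, does not already have too many store files. A sketch along the lines of the real code, assuming a helper isTooManyStoreFiles:

private HRegion getBiggestMemstoreRegion(SortedMap<Long, HRegion> regionsBySize,
        Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
    // regionsBySize is ordered with the largest memstore first
    for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
            continue; // already failed to flush this one
        }
        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
            continue; // skip regions whose store-file count is over the limit
        }
        return region;
    }
    return null; // every candidate was excluded or overloaded
}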