`
yuren1hao
  • 浏览: 2324 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

一种应用限流方法实例

阅读更多

整体思路说明:

    对于并发限制请求,会统计当前的并发数,并发数统计原理:1次请求进入到限流模块incr 1;等请求结束退出时decr 1,当前正在处理的请求数就是并发数

 

对于QPS限制请求,统计QPS不能按照秒统计(第1s系统就可能就被打挂了),所以QPS得按照毫秒级别去统计,统计的级别越小,性能损耗越大,所以定在10ms-100ms的级别去统计,基本逻辑如下1s中切成10份,每一份100ms,一个请求进来肯定会落在某一份上,这一份的计数值++,计算当前的QPS,只需要将当前时间所在份的计数和前面9份的技数相加;内存里面需要维护当前秒和前面2秒的数据,数据结构以环形数组为基础EndFragment

 

简单实例:

 

初始化上下文(第一次进入的时候才有效,后续都已经有了返回)

ContextUtil.enter("xxxxxx", this.getRequestPlatForm());

初始化

Entry entry = EntryUtil.entry("xxxx);

获取对应的value,然后根据调用链一次执行,修改计数。

    @Override

    public Entry entry(ResourceWrapper resourceWrapper) throws BlockException {

        Context context = ContextUtil.getContext();

        if(ContextUtil.isNullContext(context)) {

            //空 不需要判断

            return new CtEntry(null , context);

        }

        if(context == null) {

            //创建默认

            context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", this);

        }

        if(!Constants.ON) {

            return new CtEntry(null, context); // 开关不生效

        }

 

        IProcessorValve valves = getProcessorValves(resourceWrapper);

        if(valves == null) {

            return new CtEntry(null, context);

        }

        Entry entry = new CtEntry(valves, context);

        try{

            //规则判断,计数修改

            valves.entry(resourceWrapper, context, null);

        }catch(BlockException e){

            //阻塞的总计数修改

            Tracer.trace(BlockException.BLOCK);

            entry.exit();

            throw e;

        }catch(Throwable e) {

            //理论上不会有这个异常

            log.error("unknow exception", e);

        }

 

        return entry;

    }

最后一步执行统计流程:

@Override

public void entry(ResourceWrapper resourceWrapper, Context context, DefaultDom dom) throws Throwable {

// 设置originDom

if (!context.getOrigin().equals("")) {

Dom originDom = dom.getResourceDom().getOriginDom(context.getOrigin());

context.getCurEntry().setOriginDom(originDom);

}

try {

// 先执行其他的Valve

super.fireEntry(resourceWrapper, context, dom);

dom.increasePassRequest();

dom.increaseThreadNum();

dom.add();//增加对应的统计数据

// 一条链路,或者说一个context对应一个originDom,有可能originDom不存在(比如:origin为"")

if (context.getCurEntry().getOriginDom() != null) {

context.getCurEntry().getOriginDom().increasePassRequest();

context.getCurEntry().getOriginDom().increaseThreadNum();

context.getCurEntry().getOriginDom().add();

}

} catch (Throwable ex) {

if (ex instanceof BlockException) {

dom.increaseBlockedRequest();

if (context.getCurEntry().getOriginDom() != null) {

context.getCurEntry().getOriginDom().increaseBlockedRequest();

}

}

context.getCurEntry().setError(ex);

throw ex;

}

}

 

 

数据增加,最终算法:

 public void add(int value) {

 

        if (value > Constants.MAX_TIME_VALUE){

            value = Constants.MAX_TIME_VALUE;

        }

        //如 xxx1111毫秒

        long currentTime = TickUtil.currentTimeMillis();

        //通过进度计算,对应到具体的精度值。如精度为100ms,xxx11为进度值

        long timeGranularity = currentTime / precision;

        //根据具体的时间节点值,映射到具体的时间环分片。30个时间分片(保留3s的数据),对应到的分片为11分片。

        int index = (int) (timeGranularity % time.length);

 

        do{

            int recordPassCnt = passCnt[index].get();

            int recordBlockCnt = blockCnt[index].get();

            int recordRt = rt[index].get();

            long recordTime = time[index].get();

 

             if ( timeGranularity == recordTime ){

             //对应分片的统计数据增加

                if (value < 0){

                    if (blockCnt[index].compareAndSet(recordBlockCnt,recordBlockCnt + 1)){

                        break;

                    }

                }else{

                    boolean result = rt[index].compareAndSet(recordRt, recordRt +  value);

                    result = result && passCnt[index].compareAndSet(recordPassCnt, recordPassCnt + 1);

                    if (result || time[index].get() != timeGranularity){

                        break;

                    }

                }

            } else if(timeGranularity > recordTime){//如果超过时间环一圈,如,41对应的分片也为11,需要先清空分片数据,然后再重新统计。

                synchronized (time[index]) {

                    if (timeGranularity > time[index].get()) {

                        time[index].set(-1);

                        passCnt[index].set(-1);

                        blockCnt[index].set(-1);

                        rt[index].set(-1);

 

                        time[index].set(timeGranularity);

 

                        if (value < 0) {

                            passCnt[index].addAndGet(1);

                            blockCnt[index].addAndGet(2);

                            rt[index].addAndGet(1);

                        } else {

                            passCnt[index].addAndGet(2);

                            blockCnt[index].addAndGet(1);

                            rt[index].addAndGet(1 + value);

                        }

                        break;

                    }

                }

            }else {

                 break;

            }

            Thread.yield();

        }while(true);

    }

 

 

数据统计算法:

 

    @Override

    public int[] getAvgQpsAndRt() {

        long currentTime = TickUtil.currentTimeMillis();

        long endTimeGranularity = currentTime / precision;

        int index = (int) (endTimeGranularity % time.length);

        long startTimeGranularity = endTimeGranularity - sampleCnt;

        long totalPassCnt = 0;

        long totalBlockCnt = 0;

        long totalRt = 0;

        //向前统计N个时间片的数据,比如可以统计1s的数据,精度为100ms,则samplCnt值为10,共取10个分片数据,计算qps和rt

        for (int i = 0; i < sampleCnt; i++){

            long recordTime = time[index].get();

            if (recordTime <= endTimeGranularity && recordTime > startTimeGranularity){

                int recordPass =  passCnt[index].get();

                int recordBlock = blockCnt[index].get();

                int recordRt = rt[index].get();

                if (recordTime == time[index].get()) {

                    totalPassCnt += recordPass;

                    totalBlockCnt += recordBlock;

                    totalRt += recordRt;

                }else{

                    startTimeGranularity = recordTime - 1;

                    break;

                }

            } else if (recordTime > endTimeGranularity){

                startTimeGranularity = recordTime - 1;

                break;

            }

            index = (index -1 + time.length) % time.length;

        }

        long duration = precision * sampleCnt;

        int[] avgResult = new int[]{0,0,0}; //passCnt, blockCnt, rt

        if (duration != 0){

            avgResult[0] = (int) (totalPassCnt * 1000 / duration);

            avgResult[1] = (int) (totalBlockCnt * 1000 / duration);

            avgResult[2] = (int) Math.ceil((double)totalRt / (totalPassCnt == 0 ? 1 : totalPassCnt));

        }

        return avgResult;

 

数据限流算法:

 

执行最终的计数算法之前,首先要执行规则检查:

    @Override

public void entry(ResourceWrapper resourceWrapper, Context context, DefaultDom dom) throws Throwable {

// 设置originDom

if (!context.getOrigin().equals("")) {

Dom originDom = dom.getResourceDom().getOriginDom(context.getOrigin());

context.getCurEntry().setOriginDom(originDom);

}

try {

// 先执行其他的Valve,限制性:qps、并发限流等规则检查(详见下:)

super.fireEntry(resourceWrapper, context, dom);

dom.increasePassRequest();

dom.increaseThreadNum();

dom.add();

// 一条链路,或者说一个context对应一个originDom,有可能originDom不存在(比如:origin为"")

if (context.getCurEntry().getOriginDom() != null) {

context.getCurEntry().getOriginDom().increasePassRequest();

context.getCurEntry().getOriginDom().increaseThreadNum();

context.getCurEntry().getOriginDom().add();

}

} catch (Throwable ex) {

if (ex instanceof BlockException) {

dom.increaseBlockedRequest();

if (context.getCurEntry().getOriginDom() != null) {

context.getCurEntry().getOriginDom().increaseBlockedRequest();

}

}

context.getCurEntry().setError(ex);

throw ex;

}

}

 

//该方法为对应的检查规则的方法

@Override

    public boolean checkRule(Context context, DefaultDom dom) {

        // 获取此规则的受限应用(可能是来源,本身,或者去向)

        String limitApp = this.getLimitApp();

        // 如果此规则无限制应用,立即通过

        if (limitApp == null) {

            return true;

        }

        // 统计值

        long count = -1L;

        // 来源限流

        String origin = context.getOrigin();

        // 按照流向计算

        switch (type) {

            case FlowConstant.TYPE_RESOURCE:

                count = getResourceCount(dom);

                break;// 此资源本身的总限流

            case FlowConstant.TYPE_ORIGIN:

                count = getOriginCount(limitApp, origin, context, dom);

                break;// 此资源来源的限流

            case FlowConstant.TYPE_DESTINATION:

                count = getDestinationCount(limitApp, context, dom);// 此资源去向的限流

        }

        // 如果当前的值已经=或>阀值,则return false

        if (count >= threshold) {

            return false;

        }

        return true;

    }

 

 

//检查是否超过了qps限流值

private long getOriginCount(final String limitApp, final String origin, final Context context, final DefaultDom dom) {

        long count = -1L;

        if (limitApp.equals(origin)) {// limitApp与来源相同的限流

            count = strategy == FlowConstant.STRATEGY_QPS ? context.getOriginPassedReqQps() : context.getOriginCurThreadNum();

        } else if (limitApp.equals(FlowConstant.APP_DEFAULT)) {// 所有的来源,即资源的总限流(也可以设置type字段为resource的类型获得相同结果)

            count = strategy == FlowConstant.STRATEGY_QPS ? dom.getResourceDom().passReqQps() : dom.getResourceDom().curThreadNum();

        } else if (limitApp.equals(FlowConstant.APP_OTHER) && FlowManager.isOtherOrigin(getIdentity(), origin)) {// 其他的来源限流

            count = strategy == FlowConstant.STRATEGY_QPS ? context.getOriginPassedReqQps() : context.getOriginCurThreadNum();

        }

        return count;

    }

分享到:
评论

相关推荐

    基于Redis实现分布式应用限流的方法

    【基于Redis实现分布式应用限流的方法】 限流是保护系统免受高并发访问或恶意攻击的重要手段,通过限制系统的处理速度或在特定时间窗口内处理的请求数量,防止系统资源耗尽导致服务崩溃。Redis,作为一款高效且广泛...

    电子功用-应用于降压变换器的限流保护电路及限流保护方法

    文件"应用于降压变换器的限流保护电路及限流保护方法.pdf"可能详细阐述了上述理论,并可能提供了一些具体的电路设计实例、参数计算方法以及实际应用中的优化技巧。阅读这份资料将有助于深入理解降压变换器的限流保护...

    基于redis限流系统

    限流是一种控制服务处理速率的技术,目的是限制系统的输入或输出速率,以防止过大的流量导致服务器过载,从而维护系统稳定性和可用性。它通常应用于API接口、数据库访问、消息队列消费等场景。 Redis 是一个高性能...

    java实现限流,封装的工具类

    在Java编程中,限流是一种重要的技术手段,用于控制系统的吞吐量,防止过多的请求导致服务过载或崩溃。本文将深入探讨如何利用Java实现限流,并关注于使用Semaphore作为令牌桶和漏桶算法的实现,以及在数据线程数量...

    限流電阻速算

    ### 限流电阻速算与应用 在电子设备设计中,限流电阻的应用十分广泛,尤其是在涉及发光二极管(LED)的电路中。本文将详细介绍如何根据不同的电路配置(串联或并联)来计算所需的限流电阻值,并提供一些实际应用...

    电子-一种用于电脑的限流保护调节电路

    电子政务文档《一种用于电脑的限流保护调节电路》可能详细阐述了上述概念,并提供了具体的设计方案和实例。该文档可能涵盖了电路图、元件选择、控制策略以及实际应用案例等内容,对于电子工程师来说,是一份宝贵的...

    基于Zookeeper和guava动态限流 源码

    然后,在各个服务节点上,通过Guava的RateLimiter实例与Zookeeper交互,获取最新的限流配置。当新的请求到来时,服务节点会通过RateLimiter检查当前是否可以处理该请求,如果超过限制则阻塞或返回错误。 关于"动态...

    限流电阻和分压电阻的区别

    - **实例**:假设我们要点亮一个LED灯,已知其额定电流为20mA,而电源电压为5V,LED的正向电压降为2V,则可以通过计算得出需要串联一个150Ω的限流电阻,以确保流经LED的电流不超过20mA。 2. **分压电阻**: - **...

    电子-一种抑制电力系统短路故障的限流装置

    而"行业分类-电子政务-一种抑制电力系统短路故障的限流装置.pdf"这份文档,很可能是深入探讨这一主题的专业资料,包括最新的研究成果、技术应用实例和可能的未来发展趋势。对于从事电力行业的专业人士而言,阅读此类...

    python基于redis的限流器.zip

    在IT行业中,限流是一种常见的系统保护策略,用于限制在特定时间内系统处理的请求速率,以防止过载或资源耗尽。在这个场景中,我们关注的是一个基于Python和Redis实现的限流器。Redis是一个高性能的键值数据库,常被...

    Go-RateLimiter基于TokenBucket算法实现的api限流模块

    在IT行业中,限流是一种非常重要的技术,用于保护系统免受过高的请求负载,防止服务崩溃。本篇文章将深入探讨Go语言中基于Token Bucket算法实现的RateLimiter模块,这是一款用于API接口限流的工具。 首先,让我们...

    12伏特下led并联所用限流电阻大小计算公式.pdf

    此时,每个LED的电流仍然是独立的,所以每个LED的限流电阻的计算方法与单个LED相同。例如,如果有两个LED并联,每个LED都需要一个500欧姆的限流电阻。 ### 四、注意事项 1. **电流分布**:在实际应用中,由于制造...

    7应用 6:断尾求生 —— 简单限流(3).md

    限流是分布式系统中的重要组成部分,而Redis提供了一种高效的方式来实现简单的限流策略。通过合理地使用zset和时间窗口,我们能够有效地控制用户的请求频率,保护系统不被过多的请求压垮。文章通过一个具体的例子,...

    Sentinel 实战-集群限流1

    在 Sentinel 中,集群限流是一种解决多实例应用中全局流量控制的策略,以确保整体服务的稳定性和性能。本文将深入探讨 Sentinel 集群限流的概念、原理以及部署方式。 集群限流的目标是克服单机限流的局限性,当多个...

    Node.js-用于处理按并发阈值限流执行的异步函数

    "按并发阈值限流执行的异步函数"是一种策略,用于优化系统资源使用,防止过载,并确保系统的稳定性和响应性。这种策略在处理大量并发请求时尤其有用,例如在爬虫、API调用或者数据处理等场景。 首先,我们需要理解...

    电子政务-带限流的恒流电路.zip

    带限流的恒流电路是一种在电源设计中广泛应用的技术,它旨在确保设备在各种负载条件下都能得到恒定的电流供应,同时限制过大的电流以防止损坏硬件。本文将深入探讨这种电路的工作原理、设计方法以及在电子政务系统中...

    行业资料-电子功用-具有限流功率输出的集成电路和有关方法的说明分析.rar

    标题中的“行业资料-电子功用-具有限流功率输出的集成电路和有关方法的说明分析”表明,这份资料主要探讨的是电子工程领域中的一个重要主题——具有限流功率输出的集成电路(Integrated Circuit, IC)的设计和应用。...

    资料-限流保护电路设计.zip

    限流保护电路设计是电子工程领域中的一个重要主题...在"资料-限流保护电路设计.pdf"中,可能会涵盖这些内容的详细技术分析、实例电路图、设计计算以及实际应用案例,对于理解和设计限流保护电路来说是非常宝贵的资源。

    大电网可靠性中的故障限流器优化配置研究.docx

    为应对这一问题,科研人员提出了一种故障限流器优化配置方法,其目的是在保证电网可靠性的同时,合理控制短路电流的影响,减少由此造成的负荷损失。本文将详细探讨这一方法的理论依据、实现方式以及实际应用效果。 ...

Global site tag (gtag.js) - Google Analytics