整体思路说明:
对于并发限制请求,会统计当前的并发数,并发数统计原理:1次请求进入到限流模块incr 1;等请求结束退出时decr 1,当前正在处理的请求数就是并发数
对于QPS限制请求,统计QPS不能按照秒统计(第1s系统就可能就被打挂了),所以QPS得按照毫秒级别去统计,统计的级别越小,性能损耗越大,所以定在10ms-100ms的级别去统计,基本逻辑如下将1s中切成10份,每一份100ms,一个请求进来肯定会落在某一份上,这一份的计数值++,计算当前的QPS,只需要将当前时间所在份的计数和前面9份的技数相加;内存里面需要维护当前秒和前面2秒的数据,数据结构以环形数组为基础EndFragment
简单实例:
初始化上下文(第一次进入的时候才有效,后续都已经有了返回)
ContextUtil.enter("xxxxxx", this.getRequestPlatForm());
初始化
Entry entry = EntryUtil.entry("xxxx);
获取对应的value,然后根据调用链一次执行,修改计数。
@Override
public Entry entry(ResourceWrapper resourceWrapper) throws BlockException {
Context context = ContextUtil.getContext();
if(ContextUtil.isNullContext(context)) {
//空 不需要判断
return new CtEntry(null , context);
}
if(context == null) {
//创建默认
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", this);
}
if(!Constants.ON) {
return new CtEntry(null, context); // 开关不生效
}
IProcessorValve valves = getProcessorValves(resourceWrapper);
if(valves == null) {
return new CtEntry(null, context);
}
Entry entry = new CtEntry(valves, context);
try{
//规则判断,计数修改
valves.entry(resourceWrapper, context, null);
}catch(BlockException e){
//阻塞的总计数修改
Tracer.trace(BlockException.BLOCK);
entry.exit();
throw e;
}catch(Throwable e) {
//理论上不会有这个异常
log.error("unknow exception", e);
}
return entry;
}
最后一步执行统计流程:
@Override
public void entry(ResourceWrapper resourceWrapper, Context context, DefaultDom dom) throws Throwable {
// 设置originDom
if (!context.getOrigin().equals("")) {
Dom originDom = dom.getResourceDom().getOriginDom(context.getOrigin());
context.getCurEntry().setOriginDom(originDom);
}
try {
// 先执行其他的Valve
super.fireEntry(resourceWrapper, context, dom);
dom.increasePassRequest();
dom.increaseThreadNum();
dom.add();//增加对应的统计数据
// 一条链路,或者说一个context对应一个originDom,有可能originDom不存在(比如:origin为"")
if (context.getCurEntry().getOriginDom() != null) {
context.getCurEntry().getOriginDom().increasePassRequest();
context.getCurEntry().getOriginDom().increaseThreadNum();
context.getCurEntry().getOriginDom().add();
}
} catch (Throwable ex) {
if (ex instanceof BlockException) {
dom.increaseBlockedRequest();
if (context.getCurEntry().getOriginDom() != null) {
context.getCurEntry().getOriginDom().increaseBlockedRequest();
}
}
context.getCurEntry().setError(ex);
throw ex;
}
}
数据增加,最终算法:
public void add(int value) {
if (value > Constants.MAX_TIME_VALUE){
value = Constants.MAX_TIME_VALUE;
}
//如 xxx1111毫秒
long currentTime = TickUtil.currentTimeMillis();
//通过进度计算,对应到具体的精度值。如精度为100ms,xxx11为进度值
long timeGranularity = currentTime / precision;
//根据具体的时间节点值,映射到具体的时间环分片。30个时间分片(保留3s的数据),对应到的分片为11分片。
int index = (int) (timeGranularity % time.length);
do{
int recordPassCnt = passCnt[index].get();
int recordBlockCnt = blockCnt[index].get();
int recordRt = rt[index].get();
long recordTime = time[index].get();
if ( timeGranularity == recordTime ){
//对应分片的统计数据增加
if (value < 0){
if (blockCnt[index].compareAndSet(recordBlockCnt,recordBlockCnt + 1)){
break;
}
}else{
boolean result = rt[index].compareAndSet(recordRt, recordRt + value);
result = result && passCnt[index].compareAndSet(recordPassCnt, recordPassCnt + 1);
if (result || time[index].get() != timeGranularity){
break;
}
}
} else if(timeGranularity > recordTime){//如果超过时间环一圈,如,41对应的分片也为11,需要先清空分片数据,然后再重新统计。
synchronized (time[index]) {
if (timeGranularity > time[index].get()) {
time[index].set(-1);
passCnt[index].set(-1);
blockCnt[index].set(-1);
rt[index].set(-1);
time[index].set(timeGranularity);
if (value < 0) {
passCnt[index].addAndGet(1);
blockCnt[index].addAndGet(2);
rt[index].addAndGet(1);
} else {
passCnt[index].addAndGet(2);
blockCnt[index].addAndGet(1);
rt[index].addAndGet(1 + value);
}
break;
}
}
}else {
break;
}
Thread.yield();
}while(true);
}
数据统计算法:
@Override
public int[] getAvgQpsAndRt() {
long currentTime = TickUtil.currentTimeMillis();
long endTimeGranularity = currentTime / precision;
int index = (int) (endTimeGranularity % time.length);
long startTimeGranularity = endTimeGranularity - sampleCnt;
long totalPassCnt = 0;
long totalBlockCnt = 0;
long totalRt = 0;
//向前统计N个时间片的数据,比如可以统计1s的数据,精度为100ms,则samplCnt值为10,共取10个分片数据,计算qps和rt
for (int i = 0; i < sampleCnt; i++){
long recordTime = time[index].get();
if (recordTime <= endTimeGranularity && recordTime > startTimeGranularity){
int recordPass = passCnt[index].get();
int recordBlock = blockCnt[index].get();
int recordRt = rt[index].get();
if (recordTime == time[index].get()) {
totalPassCnt += recordPass;
totalBlockCnt += recordBlock;
totalRt += recordRt;
}else{
startTimeGranularity = recordTime - 1;
break;
}
} else if (recordTime > endTimeGranularity){
startTimeGranularity = recordTime - 1;
break;
}
index = (index -1 + time.length) % time.length;
}
long duration = precision * sampleCnt;
int[] avgResult = new int[]{0,0,0}; //passCnt, blockCnt, rt
if (duration != 0){
avgResult[0] = (int) (totalPassCnt * 1000 / duration);
avgResult[1] = (int) (totalBlockCnt * 1000 / duration);
avgResult[2] = (int) Math.ceil((double)totalRt / (totalPassCnt == 0 ? 1 : totalPassCnt));
}
return avgResult;
数据限流算法:
执行最终的计数算法之前,首先要执行规则检查:
@Override
public void entry(ResourceWrapper resourceWrapper, Context context, DefaultDom dom) throws Throwable {
// 设置originDom
if (!context.getOrigin().equals("")) {
Dom originDom = dom.getResourceDom().getOriginDom(context.getOrigin());
context.getCurEntry().setOriginDom(originDom);
}
try {
// 先执行其他的Valve,限制性:qps、并发限流等规则检查(详见下:)
super.fireEntry(resourceWrapper, context, dom);
dom.increasePassRequest();
dom.increaseThreadNum();
dom.add();
// 一条链路,或者说一个context对应一个originDom,有可能originDom不存在(比如:origin为"")
if (context.getCurEntry().getOriginDom() != null) {
context.getCurEntry().getOriginDom().increasePassRequest();
context.getCurEntry().getOriginDom().increaseThreadNum();
context.getCurEntry().getOriginDom().add();
}
} catch (Throwable ex) {
if (ex instanceof BlockException) {
dom.increaseBlockedRequest();
if (context.getCurEntry().getOriginDom() != null) {
context.getCurEntry().getOriginDom().increaseBlockedRequest();
}
}
context.getCurEntry().setError(ex);
throw ex;
}
}
//该方法为对应的检查规则的方法
@Override
public boolean checkRule(Context context, DefaultDom dom) {
// 获取此规则的受限应用(可能是来源,本身,或者去向)
String limitApp = this.getLimitApp();
// 如果此规则无限制应用,立即通过
if (limitApp == null) {
return true;
}
// 统计值
long count = -1L;
// 来源限流
String origin = context.getOrigin();
// 按照流向计算
switch (type) {
case FlowConstant.TYPE_RESOURCE:
count = getResourceCount(dom);
break;// 此资源本身的总限流
case FlowConstant.TYPE_ORIGIN:
count = getOriginCount(limitApp, origin, context, dom);
break;// 此资源来源的限流
case FlowConstant.TYPE_DESTINATION:
count = getDestinationCount(limitApp, context, dom);// 此资源去向的限流
}
// 如果当前的值已经=或>阀值,则return false
if (count >= threshold) {
return false;
}
return true;
}
//检查是否超过了qps限流值
private long getOriginCount(final String limitApp, final String origin, final Context context, final DefaultDom dom) {
long count = -1L;
if (limitApp.equals(origin)) {// limitApp与来源相同的限流
count = strategy == FlowConstant.STRATEGY_QPS ? context.getOriginPassedReqQps() : context.getOriginCurThreadNum();
} else if (limitApp.equals(FlowConstant.APP_DEFAULT)) {// 所有的来源,即资源的总限流(也可以设置type字段为resource的类型获得相同结果)
count = strategy == FlowConstant.STRATEGY_QPS ? dom.getResourceDom().passReqQps() : dom.getResourceDom().curThreadNum();
} else if (limitApp.equals(FlowConstant.APP_OTHER) && FlowManager.isOtherOrigin(getIdentity(), origin)) {// 其他的来源限流
count = strategy == FlowConstant.STRATEGY_QPS ? context.getOriginPassedReqQps() : context.getOriginCurThreadNum();
}
return count;
}
相关推荐
【基于Redis实现分布式应用限流的方法】 限流是保护系统免受高并发访问或恶意攻击的重要手段,通过限制系统的处理速度或在特定时间窗口内处理的请求数量,防止系统资源耗尽导致服务崩溃。Redis,作为一款高效且广泛...
文件"应用于降压变换器的限流保护电路及限流保护方法.pdf"可能详细阐述了上述理论,并可能提供了一些具体的电路设计实例、参数计算方法以及实际应用中的优化技巧。阅读这份资料将有助于深入理解降压变换器的限流保护...
限流是一种控制服务处理速率的技术,目的是限制系统的输入或输出速率,以防止过大的流量导致服务器过载,从而维护系统稳定性和可用性。它通常应用于API接口、数据库访问、消息队列消费等场景。 Redis 是一个高性能...
在Java编程中,限流是一种重要的技术手段,用于控制系统的吞吐量,防止过多的请求导致服务过载或崩溃。本文将深入探讨如何利用Java实现限流,并关注于使用Semaphore作为令牌桶和漏桶算法的实现,以及在数据线程数量...
### 限流电阻速算与应用 在电子设备设计中,限流电阻的应用十分广泛,尤其是在涉及发光二极管(LED)的电路中。本文将详细介绍如何根据不同的电路配置(串联或并联)来计算所需的限流电阻值,并提供一些实际应用...
电子政务文档《一种用于电脑的限流保护调节电路》可能详细阐述了上述概念,并提供了具体的设计方案和实例。该文档可能涵盖了电路图、元件选择、控制策略以及实际应用案例等内容,对于电子工程师来说,是一份宝贵的...
然后,在各个服务节点上,通过Guava的RateLimiter实例与Zookeeper交互,获取最新的限流配置。当新的请求到来时,服务节点会通过RateLimiter检查当前是否可以处理该请求,如果超过限制则阻塞或返回错误。 关于"动态...
- **实例**:假设我们要点亮一个LED灯,已知其额定电流为20mA,而电源电压为5V,LED的正向电压降为2V,则可以通过计算得出需要串联一个150Ω的限流电阻,以确保流经LED的电流不超过20mA。 2. **分压电阻**: - **...
而"行业分类-电子政务-一种抑制电力系统短路故障的限流装置.pdf"这份文档,很可能是深入探讨这一主题的专业资料,包括最新的研究成果、技术应用实例和可能的未来发展趋势。对于从事电力行业的专业人士而言,阅读此类...
在IT行业中,限流是一种常见的系统保护策略,用于限制在特定时间内系统处理的请求速率,以防止过载或资源耗尽。在这个场景中,我们关注的是一个基于Python和Redis实现的限流器。Redis是一个高性能的键值数据库,常被...
在IT行业中,限流是一种非常重要的技术,用于保护系统免受过高的请求负载,防止服务崩溃。本篇文章将深入探讨Go语言中基于Token Bucket算法实现的RateLimiter模块,这是一款用于API接口限流的工具。 首先,让我们...
此时,每个LED的电流仍然是独立的,所以每个LED的限流电阻的计算方法与单个LED相同。例如,如果有两个LED并联,每个LED都需要一个500欧姆的限流电阻。 ### 四、注意事项 1. **电流分布**:在实际应用中,由于制造...
限流是分布式系统中的重要组成部分,而Redis提供了一种高效的方式来实现简单的限流策略。通过合理地使用zset和时间窗口,我们能够有效地控制用户的请求频率,保护系统不被过多的请求压垮。文章通过一个具体的例子,...
在 Sentinel 中,集群限流是一种解决多实例应用中全局流量控制的策略,以确保整体服务的稳定性和性能。本文将深入探讨 Sentinel 集群限流的概念、原理以及部署方式。 集群限流的目标是克服单机限流的局限性,当多个...
"按并发阈值限流执行的异步函数"是一种策略,用于优化系统资源使用,防止过载,并确保系统的稳定性和响应性。这种策略在处理大量并发请求时尤其有用,例如在爬虫、API调用或者数据处理等场景。 首先,我们需要理解...
带限流的恒流电路是一种在电源设计中广泛应用的技术,它旨在确保设备在各种负载条件下都能得到恒定的电流供应,同时限制过大的电流以防止损坏硬件。本文将深入探讨这种电路的工作原理、设计方法以及在电子政务系统中...
标题中的“行业资料-电子功用-具有限流功率输出的集成电路和有关方法的说明分析”表明,这份资料主要探讨的是电子工程领域中的一个重要主题——具有限流功率输出的集成电路(Integrated Circuit, IC)的设计和应用。...
限流保护电路设计是电子工程领域中的一个重要主题...在"资料-限流保护电路设计.pdf"中,可能会涵盖这些内容的详细技术分析、实例电路图、设计计算以及实际应用案例,对于理解和设计限流保护电路来说是非常宝贵的资源。
为应对这一问题,科研人员提出了一种故障限流器优化配置方法,其目的是在保证电网可靠性的同时,合理控制短路电流的影响,减少由此造成的负荷损失。本文将详细探讨这一方法的理论依据、实现方式以及实际应用效果。 ...