public class TrafficShapingFilter extends IoFilterAdapter {
protected static Logger log = LoggerFactory.getLogger(TrafficShapingFilter.class);
private final AttributeKey STATE = new AttributeKey(getClass(), "state");
//定时器服务,用来创建定时任务的定时器服务类
private final ScheduledExecutorService scheduledExecutor;
//用来估算对象占据JVM内存的大小的类,计算出一个对象实际占用JVM内存的大小
private final MessageSizeEstimator messageSizeEstimator;
private volatile int maxReadThroughput;
private volatile int maxWriteThroughput;
private volatile int poolSize = 1;
/**
* 构造方法
* @param maxReadThroughput 最大读取字节大小(单位:秒)
* @param maxWriteThroughput 最大写出的字节大小(单位:秒)
*/
public TrafficShapingFilter(int maxReadThroughput, int maxWriteThroughput) {
this(null, null, maxReadThroughput, maxWriteThroughput);
}
public TrafficShapingFilter(ScheduledExecutorService scheduledExecutor, int maxReadThroughput,
int maxWriteThroughput) {
this(scheduledExecutor, null, maxReadThroughput, maxWriteThroughput);
}
public TrafficShapingFilter(ScheduledExecutorService scheduledExecutor, MessageSizeEstimator messageSizeEstimator,
int maxReadThroughput, int maxWriteThroughput) {
log.debug("ctor - executor: {} estimator: {} max read: {} max write: {}", new Object[] { scheduledExecutor,
messageSizeEstimator, maxReadThroughput, maxWriteThroughput });
if (scheduledExecutor == null) {
//实例化一个定时器对象,线程池的默认数量是1.
scheduledExecutor = new ScheduledThreadPoolExecutor(poolSize);
//throw new NullPointerException("scheduledExecutor");
}
if (messageSizeEstimator == null) {
//实例化默认的估算消息大小的类
messageSizeEstimator = new DefaultMessageSizeEstimator() {
@Override
public int estimateSize(Object message) {
if (message instanceof IoBuffer) {
return ((IoBuffer) message).remaining();
}
return super.estimateSize(message);
}
};
}
this.scheduledExecutor = scheduledExecutor;
this.messageSizeEstimator = messageSizeEstimator;
//设置最大读取字节长度(单位:秒)
setMaxReadThroughput(maxReadThroughput);
//设置最大写出字节长度(单位:秒)
setMaxWriteThroughput(maxWriteThroughput);
}
public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}
public MessageSizeEstimator getMessageSizeEstimator() {
return messageSizeEstimator;
}
public int getMaxReadThroughput() {
return maxReadThroughput;
}
public void setMaxReadThroughput(int maxReadThroughput) {
if (maxReadThroughput < 0) {
maxReadThroughput = 0;
}
this.maxReadThroughput = maxReadThroughput;
}
public int getMaxWriteThroughput() {
return maxWriteThroughput;
}
public void setMaxWriteThroughput(int maxWriteThroughput) {
if (maxWriteThroughput < 0) {
maxWriteThroughput = 0;
}
this.maxWriteThroughput = maxWriteThroughput;
}
public int getPoolSize() {
return poolSize;
}
public void setPoolSize(int poolSize) {
if (poolSize < 1) {
poolSize = 1;
}
this.poolSize = poolSize;
}
@Override
public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
if (parent.contains(this)) {
throw new IllegalArgumentException(
"You can't add the same filter instance more than once. Create another instance and add it.");
}
//给每一个session添加一个属性 STATE 属性,关联一个State对象。
parent.getSession().setAttribute(STATE, new State());
//调节会话sessiion读取buffer大小
adjustReadBufferSize(parent.getSession());
}
@Override
public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
//写完毕,关闭会话移除关联对象State
parent.getSession().removeAttribute(STATE);
}
@Override
public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) throws Exception {
int maxReadThroughput = this.maxReadThroughput;
//process the request if our max is greater than zero
if (maxReadThroughput > 0) {
final State state = (State) session.getAttribute(STATE);
long currentTime = System.currentTimeMillis();
long suspendTime = 0;
boolean firstRead = false;
synchronized (state) {
//估算当前已经读取字节数组总量
state.readBytes += messageSizeEstimator.estimateSize(message);
//如果读取回话挂起,suspendedRead=true
if (!state.suspendedRead) {
if (state.readStartTime == 0) {//表示第一次读取会话的数据
firstRead = true;
//设置会话开始读取数据的时间
state.readStartTime = currentTime - 1000;
}
//估算当前平均每秒读取字节流量大小
long throughput = (state.readBytes * 1000 / (currentTime - state.readStartTime));
if (throughput >= maxReadThroughput) {//如果平均读取流量大于设置的数值,
//计算需要挂起的时间,((state.readBytes/maxReadThroughput)*1000)计算如果按照规定流量读取数据需要多少秒,然后再减去已经读取的时间差,计算出需要挂起的时间
suspendTime = Math.max(0, (state.readBytes * 1000 / maxReadThroughput)
- (firstRead ? 0 : currentTime - state.readStartTime));
state.readBytes = 0;
state.readStartTime = 0;
state.suspendedRead = suspendTime != 0;
//调整会话状态,设置会话不在读取数据,利用定时器,挂起会话,
adjustReadBufferSize(session);
}
}
}
if (suspendTime != 0) {
session.suspendRead();
scheduledExecutor.schedule(new Runnable() {
public void run() {
synchronized (state) {
state.suspendedRead = false;
}
session.resumeRead();
}
}, suspendTime, TimeUnit.MILLISECONDS);
}
}
nextFilter.messageReceived(session, message);
}
/**
*
* 调整session默认设置最大读取字节数组长度
* @param session
*/
private void adjustReadBufferSize(IoSession session) {
int maxReadThroughput = this.maxReadThroughput;
if (maxReadThroughput == 0) {//如果不限制读取流量返回
return;
}
IoSessionConfig config = session.getConfig();
if (config.getReadBufferSize() > maxReadThroughput) {
config.setReadBufferSize(maxReadThroughput);
}
if (config.getMaxReadBufferSize() > maxReadThroughput) {
config.setMaxReadBufferSize(maxReadThroughput);
}
}
@Override
public void messageSent(NextFilter nextFilter, final IoSession session, WriteRequest writeRequest) throws Exception {
//得到当前系统设置最大写出字节
int maxWriteThroughput = this.maxWriteThroughput;
//process the request if our max is greater than zero
if (maxWriteThroughput > 0) {
final State state = (State) session.getAttribute(STATE);
//得到系统时间
long currentTime = System.currentTimeMillis();
//挂起时间长度0
long suspendTime = 0;
boolean firstWrite = false;
synchronized (state) {
state.writtenBytes += messageSizeEstimator.estimateSize(writeRequest.getMessage());
if (!state.suspendedWrite) {
if (state.writeStartTime == 0) {
firstWrite = true;
//初始化写时间
state.writeStartTime = currentTime - 1000;
}
//计算平均写字节数组流量
long throughput = (state.writtenBytes * 1000 / (currentTime - state.writeStartTime));
if (throughput >= maxWriteThroughput) {//写流量超出系统设置,会话挂起操作
//计算会话需要挂起的时间
suspendTime = Math.max(0, state.writtenBytes * 1000 / maxWriteThroughput
- (firstWrite ? 0 : currentTime - state.writeStartTime));
state.writtenBytes = 0;
state.writeStartTime = 0;
state.suspendedWrite = suspendTime != 0;
}
}
}
if (suspendTime != 0) {
log.trace("Suspending write");
//挂起会话
session.suspendWrite();
//定时器执行定时挂起操作
scheduledExecutor.schedule(new Runnable() {
public void run() {
synchronized (state) {
state.suspendedWrite = false;
}
//挂起唤醒会话
session.resumeWrite();
log.trace("Resuming write");
}
}, suspendTime, TimeUnit.MILLISECONDS);
}
}
//串模式,执行下一个过滤器
nextFilter.messageSent(session, writeRequest);
}
/**
* 状态标志
* @author Administrator
*
*/
private static class State {
/**
* 开始读取数据的时间
*/
private long readStartTime;
/**
* 开始写数据的时间
*/
private long writeStartTime;
/**
* 读数据是否已经被挂起,true:挂起,false:未挂起
*/
private boolean suspendedRead;
/**
* 是否写操作被挂起,true:被挂起,false:未被挂起
*/
private boolean suspendedWrite;
/**
* 该会话总共被读取数据字节长度
*/
private long readBytes;
/**
* 总共被写的数据的长度
*/
private long writtenBytes;
}
}
分享到:
相关推荐
Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)
在Mina2.0的示例代码中,客户端和服务器端通常包括以下步骤: 1. **配置服务端**:创建Acceptor,设置监听端口,添加必要的过滤器和处理器。 2. **启动服务端**:启动Acceptor,开始监听客户端连接。 3. **配置...
MINA 2.0 User Guide Part I - Basics Chapter 1 - Getting Started Chapter 2 - Basics Chapter 3 - Service Chapter 4 - Session Chapter 5 - Filters Chapter 6 - Transports Chapter 7 - Handler Part II - ...
**标题:“mina2.0案例”** **描述:**该案例是关于Java网络编程中使用的异步传输框架——Mina 2.0的实践应用,包括了服务端(server)和客户端(client)的实现。 **知识点详解:** 1. **Mina 2.0框架**:Mina...
《Mina2.0框架源码剖析》 Apache Mina是一个高性能、轻量级的网络通信框架,常用于构建基于TCP/IP和UDP/IP协议的应用,如服务器端的开发。Mina2.0作为其一个重要版本,引入了许多优化和改进,为开发者提供了更强大...
在 Mina 2.0 中,FilterChain 是一种非常重要的机制,它允许开发者插入一系列过滤器来处理入站和出站的数据流。这些过滤器可以用来执行日志记录、编解码、压缩等多种功能。 **2.3 ProtocolCodecFilter** `...
在官方提供的信息中,“mina2.0源码svn地址”指的是获取Mina2.0源代码的Subversion(SVN)仓库地址。具体来说,该地址为:`https://svn.apache.org/repos/asf/mina`。通过这个地址,开发者可以直接访问到Mina项目的SVN...
Mina2.0框架源码剖析 Mina2.0是一个基于Java的网络应用框架,提供了一个简洁、灵活的API,帮助开发者快速构建高性能的网络应用程序。下面是Mina2.0框架源码剖析的相关知识点: 一、Mina2.0框架概述 Mina2.0是一个...
Mina2.0作为其更新版本,提供了更丰富的功能和优化的性能,使得开发者能更高效地构建网络服务。 **入门** 1. **下载使用的Jar包**:在开始Mina2.0的学习之前,你需要从Apache官网或其他可靠的源下载Mina的最新版本...
Mina2.0 快速入门与源码剖析 Mina2.0 是一个基于 Java 的网络应用框架,提供了一个高效、可扩展的网络通信解决方案。下面是 Mina2.0 快速入门与源码剖析的知识点总结: 一、Mina2.0 快速入门 Mina2.0 的快速入门...
《Mina2.0自学手册》是一本针对初学者设计的教程,旨在详细指导读者如何使用mina框架来实现网络通信。mina是一个基于Java的网络应用框架,它使用Java NIO(New Input/Output)技术来实现高吞吐量和低延迟的网络通信...
### Mina2.0学习笔记核心知识点概览 #### 一、Mina入门与环境搭建 **Mina简介** Mina是Apache旗下的一款强大的网络应用框架,专为高性能和高可扩展性网络应用设计,其核心是利用Java NIO技术提供事件驱动的异步API...
自己整理的一些mina学习资料,内含MINA官方教程(中文版).docx,MINA-2.0.0-M4.chm(英文版),Apache_Mina_Server_2.0中文参考手册V1.0.pdf, 还有mina的包
过滤器可以用来实现数据编码解码、安全验证、流量控制等功能。通过IoFilterChain,多个过滤器可以串联起来,形成一个处理链路,每个过滤器都可以对数据进行预处理或后处理。 **4. IoHandler接口** IoHandler接口...
MINA2.0 用户手册中文随笔翻译 MINA 是一个基于 NIO(Non-Blocking I/O)的网络框架,提供了统一的接口来处理 TCP、UDP 和其他机制的通信。MINA 的主要特点是能够处理大量的 socket 连接,并提供了一个高层接口来...
本资源包含两个 pdf 文档,一本根据官方最新文档 (http://mina.apache.org/mina-project/userguide/user-guide-toc.html) 整理的 mina_2.0_user_guide_en.pdf,一个中文翻译的 mina_2.0_user_guide_cn.pdf。...
### Mina2.0完全剖析,完全自学手册 #### Apache Mina 概述 Apache Mina(Multipurpose Infrastructure Networked Applications)是一个强大的网络应用框架,主要用于帮助开发者构建高性能且易于扩展的网络应用程序...