`
hougechuanqi
  • 浏览: 73137 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Mina2.0控制读取、写出流量代码(出自red5)

    博客分类:
  • MINA
 
阅读更多

public class TrafficShapingFilter extends IoFilterAdapter {

 protected static Logger log = LoggerFactory.getLogger(TrafficShapingFilter.class);

 private final AttributeKey STATE = new AttributeKey(getClass(), "state");

 //定时器服务,用来创建定时任务的定时器服务类
 private final ScheduledExecutorService scheduledExecutor;

 //用来估算对象占据JVM内存的大小的类,计算出一个对象实际占用JVM内存的大小
 private final MessageSizeEstimator messageSizeEstimator;

 private volatile int maxReadThroughput;

 private volatile int maxWriteThroughput;

 private volatile int poolSize = 1;

 /**
  * 构造方法
  * @param maxReadThroughput 最大读取字节大小(单位:秒)
  * @param maxWriteThroughput 最大写出的字节大小(单位:秒)
  */
 public TrafficShapingFilter(int maxReadThroughput, int maxWriteThroughput) {
  this(null, null, maxReadThroughput, maxWriteThroughput);
 }

 public TrafficShapingFilter(ScheduledExecutorService scheduledExecutor, int maxReadThroughput,
   int maxWriteThroughput) {
  this(scheduledExecutor, null, maxReadThroughput, maxWriteThroughput);
 }

 public TrafficShapingFilter(ScheduledExecutorService scheduledExecutor, MessageSizeEstimator messageSizeEstimator,
   int maxReadThroughput, int maxWriteThroughput) {

  log.debug("ctor - executor: {} estimator: {} max read: {} max write: {}", new Object[] { scheduledExecutor,
    messageSizeEstimator, maxReadThroughput, maxWriteThroughput });

  if (scheduledExecutor == null) {
   //实例化一个定时器对象,线程池的默认数量是1.
   scheduledExecutor = new ScheduledThreadPoolExecutor(poolSize);
   //throw new NullPointerException("scheduledExecutor");
  }

  if (messageSizeEstimator == null) {
   //实例化默认的估算消息大小的类
   messageSizeEstimator = new DefaultMessageSizeEstimator() {
    @Override
    public int estimateSize(Object message) {
     if (message instanceof IoBuffer) {
      return ((IoBuffer) message).remaining();
     }
     return super.estimateSize(message);
    }
   };
  }

  this.scheduledExecutor = scheduledExecutor;
  this.messageSizeEstimator = messageSizeEstimator;
  //设置最大读取字节长度(单位:秒)
  setMaxReadThroughput(maxReadThroughput);
  //设置最大写出字节长度(单位:秒)
  setMaxWriteThroughput(maxWriteThroughput);
 }

 public ScheduledExecutorService getScheduledExecutor() {
  return scheduledExecutor;
 }

 public MessageSizeEstimator getMessageSizeEstimator() {
  return messageSizeEstimator;
 }

 public int getMaxReadThroughput() {
  return maxReadThroughput;
 }

 public void setMaxReadThroughput(int maxReadThroughput) {
  if (maxReadThroughput < 0) {
   maxReadThroughput = 0;
  }
  this.maxReadThroughput = maxReadThroughput;
 }

 public int getMaxWriteThroughput() {
  return maxWriteThroughput;
 }

 public void setMaxWriteThroughput(int maxWriteThroughput) {
  if (maxWriteThroughput < 0) {
   maxWriteThroughput = 0;
  }
  this.maxWriteThroughput = maxWriteThroughput;
 }

 public int getPoolSize() {
  return poolSize;
 }

 public void setPoolSize(int poolSize) {
  if (poolSize < 1) {
   poolSize = 1;
  }
  this.poolSize = poolSize;
 }

 @Override
 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
  if (parent.contains(this)) {
   throw new IllegalArgumentException(
     "You can't add the same filter instance more than once.  Create another instance and add it.");
  }
  //给每一个session添加一个属性 STATE 属性,关联一个State对象。
  parent.getSession().setAttribute(STATE, new State());
  //调节会话sessiion读取buffer大小
  adjustReadBufferSize(parent.getSession());
 }

 @Override
 public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
  //写完毕,关闭会话移除关联对象State
  parent.getSession().removeAttribute(STATE);
 }

 @Override
 public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) throws Exception {

  int maxReadThroughput = this.maxReadThroughput;
  //process the request if our max is greater than zero
  if (maxReadThroughput > 0) {
   final State state = (State) session.getAttribute(STATE);
   long currentTime = System.currentTimeMillis();

   long suspendTime = 0;
   boolean firstRead = false;
   synchronized (state) {
    //估算当前已经读取字节数组总量
    state.readBytes += messageSizeEstimator.estimateSize(message);
                 //如果读取回话挂起,suspendedRead=true
    if (!state.suspendedRead) {
     if (state.readStartTime == 0) {//表示第一次读取会话的数据
      firstRead = true;
      //设置会话开始读取数据的时间
      state.readStartTime = currentTime - 1000;
     }
                    //估算当前平均每秒读取字节流量大小
     long throughput = (state.readBytes * 1000 / (currentTime - state.readStartTime));
     if (throughput >= maxReadThroughput) {//如果平均读取流量大于设置的数值,
      //计算需要挂起的时间,((state.readBytes/maxReadThroughput)*1000)计算如果按照规定流量读取数据需要多少秒,然后再减去已经读取的时间差,计算出需要挂起的时间
      suspendTime = Math.max(0, (state.readBytes * 1000 / maxReadThroughput)
        - (firstRead ? 0 : currentTime - state.readStartTime));

      state.readBytes = 0;
      state.readStartTime = 0;
      state.suspendedRead = suspendTime != 0;
                        //调整会话状态,设置会话不在读取数据,利用定时器,挂起会话,
      adjustReadBufferSize(session);
     }
    }
   }

   if (suspendTime != 0) {
    session.suspendRead();
    scheduledExecutor.schedule(new Runnable() {
     public void run() {
      synchronized (state) {
       state.suspendedRead = false;
      }
      session.resumeRead();
     }
    }, suspendTime, TimeUnit.MILLISECONDS);
   }
  }

  nextFilter.messageReceived(session, message);

 }

 /**
  *
  * 调整session默认设置最大读取字节数组长度
  * @param session
  */
 private void adjustReadBufferSize(IoSession session) {
  int maxReadThroughput = this.maxReadThroughput;
  if (maxReadThroughput == 0) {//如果不限制读取流量返回
   return;
  }
  IoSessionConfig config = session.getConfig();
  if (config.getReadBufferSize() > maxReadThroughput) {
   config.setReadBufferSize(maxReadThroughput);
  }
  if (config.getMaxReadBufferSize() > maxReadThroughput) {
   config.setMaxReadBufferSize(maxReadThroughput);
  }
 }

 @Override
 public void messageSent(NextFilter nextFilter, final IoSession session, WriteRequest writeRequest) throws Exception {
          //得到当前系统设置最大写出字节
  int maxWriteThroughput = this.maxWriteThroughput;
  //process the request if our max is greater than zero
  if (maxWriteThroughput > 0) {
   final State state = (State) session.getAttribute(STATE);
   //得到系统时间
   long currentTime = System.currentTimeMillis();
            //挂起时间长度0
   long suspendTime = 0;
   boolean firstWrite = false;
   synchronized (state) {
    state.writtenBytes += messageSizeEstimator.estimateSize(writeRequest.getMessage());
    if (!state.suspendedWrite) {
     if (state.writeStartTime == 0) {
      firstWrite = true;
      //初始化写时间
      state.writeStartTime = currentTime - 1000;
     }
                    //计算平均写字节数组流量
     long throughput = (state.writtenBytes * 1000 / (currentTime - state.writeStartTime));
     if (throughput >= maxWriteThroughput) {//写流量超出系统设置,会话挂起操作
      //计算会话需要挂起的时间
      suspendTime = Math.max(0, state.writtenBytes * 1000 / maxWriteThroughput
        - (firstWrite ? 0 : currentTime - state.writeStartTime));
      state.writtenBytes = 0;
      state.writeStartTime = 0;
      state.suspendedWrite = suspendTime != 0;
     }
    }
   }

   if (suspendTime != 0) {
    log.trace("Suspending write");
    //挂起会话
    session.suspendWrite();
    //定时器执行定时挂起操作
    scheduledExecutor.schedule(new Runnable() {
     public void run() {
      synchronized (state) {
       state.suspendedWrite = false;
      }
      //挂起唤醒会话
      session.resumeWrite();
      log.trace("Resuming write");
     }
    }, suspendTime, TimeUnit.MILLISECONDS);
   }
  }
        //串模式,执行下一个过滤器
  nextFilter.messageSent(session, writeRequest);

 }

 /**
  * 状态标志
  * @author Administrator
  *
  */
 private static class State {
  /**
   * 开始读取数据的时间
   */
  private long readStartTime;

  /**
   * 开始写数据的时间
   */
  private long writeStartTime;

  /**
   * 读数据是否已经被挂起,true:挂起,false:未挂起
   */
  private boolean suspendedRead;

  /**
   * 是否写操作被挂起,true:被挂起,false:未被挂起
   */
  private boolean suspendedWrite;

  /**
   * 该会话总共被读取数据字节长度
   */
  private long readBytes;

  /**
   * 总共被写的数据的长度
   */
  private long writtenBytes;
 }
}

分享到:
评论

相关推荐

    Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)

    Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)

    Android Java Socket框架 Mina2.0

    在Mina2.0的示例代码中,客户端和服务器端通常包括以下步骤: 1. **配置服务端**:创建Acceptor,设置监听端口,添加必要的过滤器和处理器。 2. **启动服务端**:启动Acceptor,开始监听客户端连接。 3. **配置...

    Mina 2.0 User Guide(Mina 2.0 用户指南)

    MINA 2.0 User Guide Part I - Basics Chapter 1 - Getting Started Chapter 2 - Basics Chapter 3 - Service Chapter 4 - Session Chapter 5 - Filters Chapter 6 - Transports Chapter 7 - Handler Part II - ...

    mina2.0案例

    **标题:“mina2.0案例”** **描述:**该案例是关于Java网络编程中使用的异步传输框架——Mina 2.0的实践应用,包括了服务端(server)和客户端(client)的实现。 **知识点详解:** 1. **Mina 2.0框架**:Mina...

    Mina2.0框架源码剖析

    《Mina2.0框架源码剖析》 Apache Mina是一个高性能、轻量级的网络通信框架,常用于构建基于TCP/IP和UDP/IP协议的应用,如服务器端的开发。Mina2.0作为其一个重要版本,引入了许多优化和改进,为开发者提供了更强大...

    Mina 2.0快速入门与源码解析

    在 Mina 2.0 中,FilterChain 是一种非常重要的机制,它允许开发者插入一系列过滤器来处理入站和出站的数据流。这些过滤器可以用来执行日志记录、编解码、压缩等多种功能。 **2.3 ProtocolCodecFilter** `...

    mina2.0源码svn地址

    在官方提供的信息中,“mina2.0源码svn地址”指的是获取Mina2.0源代码的Subversion(SVN)仓库地址。具体来说,该地址为:`https://svn.apache.org/repos/asf/mina`。通过这个地址,开发者可以直接访问到Mina项目的SVN...

    Mina2.0框架源码剖析.pdf

    Mina2.0框架源码剖析 Mina2.0是一个基于Java的网络应用框架,提供了一个简洁、灵活的API,帮助开发者快速构建高性能的网络应用程序。下面是Mina2.0框架源码剖析的相关知识点: 一、Mina2.0框架概述 Mina2.0是一个...

    MIna2.0学习笔记

    Mina2.0作为其更新版本,提供了更丰富的功能和优化的性能,使得开发者能更高效地构建网络服务。 **入门** 1. **下载使用的Jar包**:在开始Mina2.0的学习之前,你需要从Apache官网或其他可靠的源下载Mina的最新版本...

    Mina2.0快速入门与源码剖析.docx

    Mina2.0 快速入门与源码剖析 Mina2.0 是一个基于 Java 的网络应用框架,提供了一个高效、可扩展的网络通信解决方案。下面是 Mina2.0 快速入门与源码剖析的知识点总结: 一、Mina2.0 快速入门 Mina2.0 的快速入门...

    Mina2.0自学手册

    《Mina2.0自学手册》是一本针对初学者设计的教程,旨在详细指导读者如何使用mina框架来实现网络通信。mina是一个基于Java的网络应用框架,它使用Java NIO(New Input/Output)技术来实现高吞吐量和低延迟的网络通信...

    Mina2.0学习笔记(修订版)

    ### Mina2.0学习笔记核心知识点概览 #### 一、Mina入门与环境搭建 **Mina简介** Mina是Apache旗下的一款强大的网络应用框架,专为高性能和高可扩展性网络应用设计,其核心是利用Java NIO技术提供事件驱动的异步API...

    mina2.0教程

    自己整理的一些mina学习资料,内含MINA官方教程(中文版).docx,MINA-2.0.0-M4.chm(英文版),Apache_Mina_Server_2.0中文参考手册V1.0.pdf, 还有mina的包

    Mina2.0学习笔记(完整版).doc

    过滤器可以用来实现数据编码解码、安全验证、流量控制等功能。通过IoFilterChain,多个过滤器可以串联起来,形成一个处理链路,每个过滤器都可以对数据进行预处理或后处理。 **4. IoHandler接口** IoHandler接口...

    MINA2.0用户手册中文随笔翻译

    MINA2.0 用户手册中文随笔翻译 MINA 是一个基于 NIO(Non-Blocking I/O)的网络框架,提供了统一的接口来处理 TCP、UDP 和其他机制的通信。MINA 的主要特点是能够处理大量的 socket 连接,并提供了一个高层接口来...

    Apache MINA 2.0 用户指南中英文对照阅读版[带书签]

    本资源包含两个 pdf 文档,一本根据官方最新文档 (http://mina.apache.org/mina-project/userguide/user-guide-toc.html) 整理的 mina_2.0_user_guide_en.pdf,一个中文翻译的 mina_2.0_user_guide_cn.pdf。...

    Mina2.0完全剖析,完全自学手册

    ### Mina2.0完全剖析,完全自学手册 #### Apache Mina 概述 Apache Mina(Multipurpose Infrastructure Networked Applications)是一个强大的网络应用框架,主要用于帮助开发者构建高性能且易于扩展的网络应用程序...

Global site tag (gtag.js) - Google Analytics