- 浏览: 139637 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
elephant_xiang:
condition例子,搞错了吧public void pro ...
jdk1.5的多线程总结一 -
yinlei126:
这么好的文章没有评论,谢谢
Mina框架剖析--动态篇 -
xds2008:
写的不错,值得表扬,测了一下,还不错,就是有点慢,但是最起码远 ...
java实现远程桌面监控 -
exp111:
这个确实很强 但是能不能连接一次不停的传呢 这个好像是必须连一 ...
java实现远程桌面监控 -
seeallsea:
不错!可以相互转换了。
keystore证书转换
一、基础框架
IoService:IoService相当于是Mina的Socket层,负责所有SocketIO事件的注册,select,分发等。它位于org.apache.mina.core.service包内,它有两个子接口,表示Server端接收方的IoAcceptor和Client发起方的IoConnector,以及所有的实现类:
NioDatagramAcceptor/NioDatagramConnector:基于UDP的实现
NioSocketAcceptor/NioSocketConnector:基于TCP的实现
VmPipeAcceptor/VmPipeConnector:基于Pipe的实现
服务端初始化的方式一般是:
初始化NioSocketAcceptor的时候会做主要事情:
1.需要通过具体的实现提供的transportMetaData,来判断其中规定的session配置类(eg:DefaultSocketSessionConfig)是否来自接口SessionConfig.
2.默认启动了一个SimpleIoProcessorPool来包装NioProcessor.
而SimpleIoProcessorPool默认是启动CPU个数 +1个 NioProcess,并以数组形式管理。
在这里并没有指定Executor,因此用默认的Executors.newCachedThreadPool().这个ThreadPool管理着NioSocketAcceptor和所有的IoProcessor处理线程.
在这里,NioSocketAcceptor可以理解为一个Server,而NioProcessor就是其中的并行的处理程序。NioProcessor默认个数是CPU个数+1,这正好是属于Processor的selector的个数,它们专门处理OP_READ/OP_WRITE事件。在NioSocketAcceptor里面也有个Selector,它是专门用作处理OP_ACCEPT事件.这个分离设计使OP_ACCEPT不被读写事件所影响。因此,采用默认设置,MINA会启动 CPU个数+2 Selector.
每个NioSocketAcceptor内部有个Acceptor线程对象,NioSocketAcceptor会使用传入的或者生成的Executor来执行这个Acceptor。
每个NioProcessor内部也有个Processor线程对象,NioProcessor会使用传入的或者生成的Executor来执行这个Processor。
3.IoService初始化时,建立与监听器容器IoServiceListenerSupport的双向关联,注册匿名内部实现serviceActivationListener到监听器容器.
客户端的启动方式:
其基本过程跟NioSocketAcceptor差不多.
IoFilter
IoFilter也是一个接口,有点像ServletFilter,在事件被 IoHandler处理之前或之后进行一些特定的操作,比如记录日志(LoggingFilter)、压缩数据(CompressionFilter),SSL加密(SSLFilter)黑名单过滤(BlackListFilter),心跳检测(KeepAliveFilter)等等。这些都是MINA自带的。
实际项目中,有个东西肯定是存在的,就是ProtocolCodecFilter或者跟它类似的filter.我们不可能让IoHandler直接去操作字节流,所以在这一层,一定要把二进制流转成java对象或者文本,这样才能不枉费MINA的良好分层设计。
ProtocolCodecFilter
使用ProtocolCodecFilter很简单,我们只要把ProtocolCodecFilter加入到FilterChain就可以了,但是我们需要提供一个ProtocolCodecFactory。其实ProtocolCodecFilter仅仅是实现了过滤器部分的功能,它会将最终的转换工作,交给从ProtocolCodecFactory获得的Encode和Decode。如果我们需要编写自己的ProtocolCodec,就应该从 ProtocolCodecFactory入手。MINA内置了几个ProtocolCodecFactory,比较常用的就是 ObjectSerializationCodecFactory和TextLineCodecFactory。
ObjectSerializationCodecFactory是Java Object序列化之后的内容直接跟ByteBuffer互相转化,比较适合两端都是Java的情况使用。在笔者所做的自定义协议环境下,在这里就需要重写ObjectSerializationCodecFactory。
TextLineCodecFactory就是 String跟ByteBuffer的转化,比如HTTP,RTSP这类纯文本协议。
IoFilter的顺序问题:IoFilter是有加入顺序的,例如,先加入LoggingFilter再加入ProtocolCodecFilter,和先加入ProtocolCodecFilter再加入LoggingFilter的效果是不一样的,前者 LoggingFilter写入日志的内容是ByteBuffer,而后者写入日志的是转换后具体的类,例如String。
ExecutorFilter
有一个比较重要的过滤器就是ExecutorFilter。前面已经知道,每个NioProcess对应的是一个Process内部线程对象,在Process的run方法里面会循环的调用process方法。也就是第一个事件没有执行完,第二个事件不会执行,如果某次消息处理太耗时,就会导致其他消息等待,整体的吞吐量下降。ExecutorFilter的的作用就是将同一个类型的消息合并起来按顺序调用
从上面的代码可以看到,如果满足事件集合条件,就组装成IoFilterEvent交给Executor异步执行。这样在filter上就不会因为某个事件filter执行时间过长而block后面的事件。
实际上MINA是用filterChain的方式顺序调用所有注册的filter.默认的DefaultIoFilterChain
然后就会一个filter接一个的传下去,直到最后的尾调用IoHandler.
IoHandler
IoHanlder则是所有事件最终产生响应的位置,一般用来处理业务比如分析数据,数据库操作等。
messageSent是服务器发送消息的事件,一般情况下不会使用它。
sessionClosed是客户端断开连接的事件,可以在这里进行一些资源回收等操作。sessionCreated和sessionOpened,两者稍有不同,sessionCreated是由I/O processor线程触发的,而sessionOpened在其后,由业务线程触发的,由于MINA的I/O processor线程非常少,因此如果我们真的需要使用sessionCreated,也必须是耗时短的操作,一般情况下,我们应该把业务初始化的功能放在sessionOpened事件中。
IoSession
IoSession是一个接口,它提供了对当前连接的操作功能,还有用户定义属性的存储功能,就像HttpSession。一个IoSession就代表一个Client与Server的连接。IoSession是线程安全的,第一层子类AbstractIoSession内部用到了大量的lock机制,因此可以放心的使用而不用担心并发问题。常用的方法:
主要方法可以分为三类:
连接操作
最主要的方法有两个,向客户端发送消息和断开连接。可以看的出,write接受的变量是一个Object,实际传入的类型应该是最后一个filter处理后的结果。最初始的状态下,message是一个IoBuffer.
另外注意的是,write返回的WriteFuture类,这样调用IoSession.write方法是不会阻塞的。调用了write方法之后,消息内容会发到底层等候发送,至于什么时候发出,就看线程的调度处理了。非常典型的Future模式的应用。
如果需要确认消息是否成功的发送出去了,只需要wait一下,然后检查下future的状态。
当调用write的时候,会通过跟read相反的次序依次传递,直至由IoService负责把数据发送给客户端.
如果在很短的时间里,对同一个IoSession进行了两次write操作,客户端有可能只收到一条消息,而这条消息就是服务器发出的两条消息前后接起来。这样的设计可以在高并发的时候节省网络开销。
属性存储操作
一般用于状态维护。参考HttpSession的作用。跟Attribute相关的4个方法都是跟这个相关的。
连接状态
最后面的4个方法全是连接属性的查询。没什么好说的。
IoService:IoService相当于是Mina的Socket层,负责所有SocketIO事件的注册,select,分发等。它位于org.apache.mina.core.service包内,它有两个子接口,表示Server端接收方的IoAcceptor和Client发起方的IoConnector,以及所有的实现类:
NioDatagramAcceptor/NioDatagramConnector:基于UDP的实现
NioSocketAcceptor/NioSocketConnector:基于TCP的实现
VmPipeAcceptor/VmPipeConnector:基于Pipe的实现
服务端初始化的方式一般是:
SocketAcceptor acceptor = new NioSocketAcceptor(); acceptor.setReuseAddress( true ); acceptor.setHandler(new EchoProtocolHandler()); acceptor.bind(new InetSocketAddress(PORT));
初始化NioSocketAcceptor的时候会做主要事情:
1.需要通过具体的实现提供的transportMetaData,来判断其中规定的session配置类(eg:DefaultSocketSessionConfig)是否来自接口SessionConfig.
public NioSocketAcceptor() { super(new DefaultSocketSessionConfig(), NioProcessor.class); ((DefaultSocketSessionConfig) getSessionConfig()).init(this); }
2.默认启动了一个SimpleIoProcessorPool来包装NioProcessor.
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) { this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true); }
而SimpleIoProcessorPool默认是启动CPU个数 +1个 NioProcess,并以数组形式管理。
private static final int DEFAULT_SIZE = Runtime.getRuntime() .availableProcessors() + 1; private final IoProcessor<T>[] pool; public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) { this(processorType, null, DEFAULT_SIZE); } public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor, int size) { if (processorType == null) { throw new NullPointerException("processorType"); } if (size <= 0) { throw new IllegalArgumentException("size: " + size + " (expected: positive integer)"); } if (executor == null) { this.executor = executor = Executors.newCachedThreadPool(); this.createdExecutor = true; } else { this.executor = executor; this.createdExecutor = false; } pool = new IoProcessor[size]; boolean success = false; Constructor<? extends IoProcessor<T>> processorConstructor = null; boolean usesExecutorArg = true; .... }
在这里并没有指定Executor,因此用默认的Executors.newCachedThreadPool().这个ThreadPool管理着NioSocketAcceptor和所有的IoProcessor处理线程.
在这里,NioSocketAcceptor可以理解为一个Server,而NioProcessor就是其中的并行的处理程序。NioProcessor默认个数是CPU个数+1,这正好是属于Processor的selector的个数,它们专门处理OP_READ/OP_WRITE事件。在NioSocketAcceptor里面也有个Selector,它是专门用作处理OP_ACCEPT事件.这个分离设计使OP_ACCEPT不被读写事件所影响。因此,采用默认设置,MINA会启动 CPU个数+2 Selector.
每个NioSocketAcceptor内部有个Acceptor线程对象,NioSocketAcceptor会使用传入的或者生成的Executor来执行这个Acceptor。
每个NioProcessor内部也有个Processor线程对象,NioProcessor会使用传入的或者生成的Executor来执行这个Processor。
3.IoService初始化时,建立与监听器容器IoServiceListenerSupport的双向关联,注册匿名内部实现serviceActivationListener到监听器容器.
客户端的启动方式:
NioSocketConnector connector = new NioSocketConnector(); connector.getFilterChain().addLast( "logger", new LoggingFilter() ); connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" )))); connector.setConnectTimeout(30); connector.setHandler(new TimeClientHandler()); ConnectFuture cf = connector.connect( new InetSocketAddress("127.0.0.1", 8833));
其基本过程跟NioSocketAcceptor差不多.
IoFilter
IoFilter也是一个接口,有点像ServletFilter,在事件被 IoHandler处理之前或之后进行一些特定的操作,比如记录日志(LoggingFilter)、压缩数据(CompressionFilter),SSL加密(SSLFilter)黑名单过滤(BlackListFilter),心跳检测(KeepAliveFilter)等等。这些都是MINA自带的。
实际项目中,有个东西肯定是存在的,就是ProtocolCodecFilter或者跟它类似的filter.我们不可能让IoHandler直接去操作字节流,所以在这一层,一定要把二进制流转成java对象或者文本,这样才能不枉费MINA的良好分层设计。
ProtocolCodecFilter
使用ProtocolCodecFilter很简单,我们只要把ProtocolCodecFilter加入到FilterChain就可以了,但是我们需要提供一个ProtocolCodecFactory。其实ProtocolCodecFilter仅仅是实现了过滤器部分的功能,它会将最终的转换工作,交给从ProtocolCodecFactory获得的Encode和Decode。如果我们需要编写自己的ProtocolCodec,就应该从 ProtocolCodecFactory入手。MINA内置了几个ProtocolCodecFactory,比较常用的就是 ObjectSerializationCodecFactory和TextLineCodecFactory。
ObjectSerializationCodecFactory是Java Object序列化之后的内容直接跟ByteBuffer互相转化,比较适合两端都是Java的情况使用。在笔者所做的自定义协议环境下,在这里就需要重写ObjectSerializationCodecFactory。
TextLineCodecFactory就是 String跟ByteBuffer的转化,比如HTTP,RTSP这类纯文本协议。
IoFilter的顺序问题:IoFilter是有加入顺序的,例如,先加入LoggingFilter再加入ProtocolCodecFilter,和先加入ProtocolCodecFilter再加入LoggingFilter的效果是不一样的,前者 LoggingFilter写入日志的内容是ByteBuffer,而后者写入日志的是转换后具体的类,例如String。
ExecutorFilter
有一个比较重要的过滤器就是ExecutorFilter。前面已经知道,每个NioProcess对应的是一个Process内部线程对象,在Process的run方法里面会循环的调用process方法。也就是第一个事件没有执行完,第二个事件不会执行,如果某次消息处理太耗时,就会导致其他消息等待,整体的吞吐量下降。ExecutorFilter的的作用就是将同一个类型的消息合并起来按顺序调用
public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) { if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) { IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message); fireEvent(event); } else { nextFilter.messageReceived(session, message); } }
从上面的代码可以看到,如果满足事件集合条件,就组装成IoFilterEvent交给Executor异步执行。这样在filter上就不会因为某个事件filter执行时间过长而block后面的事件。
protected void fireEvent(IoFilterEvent event) { executor.execute(event); }
public void fire() { IoSession session = getSession(); NextFilter nextFilter = getNextFilter(); IoEventType type = getType(); if ( LOGGER.isDebugEnabled()) { LOGGER.debug( "Firing a {} event for session {}",type, session.getId() ); } switch (type) { case MESSAGE_RECEIVED: Object parameter = getParameter(); nextFilter.messageReceived(session, parameter); break; case MESSAGE_SENT: WriteRequest writeRequest = (WriteRequest)getParameter(); nextFilter.messageSent(session, writeRequest); break; case WRITE: writeRequest = (WriteRequest)getParameter(); nextFilter.filterWrite(session, writeRequest); break; case CLOSE: nextFilter.filterClose(session); break; case EXCEPTION_CAUGHT: Throwable throwable = (Throwable)getParameter(); nextFilter.exceptionCaught(session, throwable); break; case SESSION_IDLE: nextFilter.sessionIdle(session, (IdleStatus) getParameter()); break; case SESSION_OPENED: nextFilter.sessionOpened(session); break; case SESSION_CREATED: nextFilter.sessionCreated(session); break; case SESSION_CLOSED: nextFilter.sessionClosed(session); break; default: throw new IllegalArgumentException("Unknown event type: " + type); } if ( LOGGER.isDebugEnabled()) { LOGGER.debug( "Event {} has been fired for session {}", type, session.getId() ); } }
实际上MINA是用filterChain的方式顺序调用所有注册的filter.默认的DefaultIoFilterChain
if (readBytes > 0) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageReceived(buf);
然后就会一个filter接一个的传下去,直到最后的尾调用IoHandler.
IoHandler
IoHanlder则是所有事件最终产生响应的位置,一般用来处理业务比如分析数据,数据库操作等。
void sessionCreated(IoSession session) throws Exception; void sessionOpened(IoSession session) throws Exception; void sessionClosed(IoSession session) throws Exception; void sessionIdle(IoSession session, IdleStatus status) throws Exception; void exceptionCaught(IoSession session, Throwable cause) throws Exception; void messageReceived(IoSession session, Object message) throws Exception; void messageSent(IoSession session, Object message) throws Exception;messageReceived是接收客户端消息的事件,我们应该在这里实现业务逻辑。
messageSent是服务器发送消息的事件,一般情况下不会使用它。
sessionClosed是客户端断开连接的事件,可以在这里进行一些资源回收等操作。sessionCreated和sessionOpened,两者稍有不同,sessionCreated是由I/O processor线程触发的,而sessionOpened在其后,由业务线程触发的,由于MINA的I/O processor线程非常少,因此如果我们真的需要使用sessionCreated,也必须是耗时短的操作,一般情况下,我们应该把业务初始化的功能放在sessionOpened事件中。
IoSession
IoSession是一个接口,它提供了对当前连接的操作功能,还有用户定义属性的存储功能,就像HttpSession。一个IoSession就代表一个Client与Server的连接。IoSession是线程安全的,第一层子类AbstractIoSession内部用到了大量的lock机制,因此可以放心的使用而不用担心并发问题。常用的方法:
// WriteFuture write(Object message) CloseFuture close(); //属性相关操作 Object getAttribute(String key); Object setAttribute(String key, Object value); Object removeAttribute(String key); Set getAttributeKeys(); //连接状态操作 boolean isConnected(); boolean isClosing(); SocketAddress getRemoteAddress(); boolean isIdle(IdleStatus status);
主要方法可以分为三类:
连接操作
最主要的方法有两个,向客户端发送消息和断开连接。可以看的出,write接受的变量是一个Object,实际传入的类型应该是最后一个filter处理后的结果。最初始的状态下,message是一个IoBuffer.
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageReceived(buf);
另外注意的是,write返回的WriteFuture类,这样调用IoSession.write方法是不会阻塞的。调用了write方法之后,消息内容会发到底层等候发送,至于什么时候发出,就看线程的调度处理了。非常典型的Future模式的应用。
如果需要确认消息是否成功的发送出去了,只需要wait一下,然后检查下future的状态。
WriteFuture future = session.write(xx); future.awaitUninterruptibly(); boolean isCompleted = future.isWritten()
当调用write的时候,会通过跟read相反的次序依次传递,直至由IoService负责把数据发送给客户端.
如果在很短的时间里,对同一个IoSession进行了两次write操作,客户端有可能只收到一条消息,而这条消息就是服务器发出的两条消息前后接起来。这样的设计可以在高并发的时候节省网络开销。
属性存储操作
一般用于状态维护。参考HttpSession的作用。跟Attribute相关的4个方法都是跟这个相关的。
连接状态
最后面的4个方法全是连接属性的查询。没什么好说的。
发表评论
-
Java监控文件夹变化
2011-01-13 15:44 79651. 线程轮询扫描 优点:纯java实现,完美跨平台。 缺点: ... -
RemoteTea概要(未完成)
2010-04-21 00:29 1583一,先整理下RPC调用 RPC(Remote Procedu ... -
Hibernate缓存
2010-04-16 11:30 794转载文章 一级缓存 1.Session 级别的缓存,它同s ... -
Mina框架剖析--动态篇
2010-04-06 19:35 3993一切从启动开始,MINA服务端启动代码: privat ... -
SpringAOP的一个问题
2009-07-13 15:05 1269问题:项目中用到了AOP方式记录日志,对于所有create开头 ... -
Lucene使用心得
2009-05-29 17:37 1150Lucene中两个最重要的概念,索引和搜索 索引:一个比 ... -
db4o使用心得之二
2009-05-26 01:02 19323)delete,update对象 把这两个操作放一起,是 ... -
db4o使用心得之一
2009-05-26 00:00 4157db4o主要的包 com.db4o: 是db4o最 ... -
Xstream实现对象和xml互转换
2009-05-20 23:43 2941产品里需要用到xml ...
相关推荐
- 组件结构:小程序的组件结构在MINA框架下进行,该框架支持网络通信,组件结构灵活多样,适用于不同的应用场景。 4. 关键技术: - API:微信小程序底层框架构建基于node.js模块,API技术允许使用react、redux等...
- **技术栈**:使用Hessian + Mina构建RPC框架,数据库采用MongoDB。 - **职责**:担任技术经理,负责系统的整体架构设计和业务开发工作,成功处理千万级数据和高并发访问需求。 #### 3. **金融商城项目** - **技术...
...然而,对于某些特定场景,如轻量级服务、嵌入式系统或者对性能有极致追求的应用,...通过理解和掌握`myhttp`的原理和实现,开发者可以更好地利用Mina框架进行网络服务开发,同时也为自定义网络协议处理提供了思路。
- 常见的Java Socket框架有Apache Mina和Netty,这两个框架提供了高效、灵活的网络通信能力。 5. **Java反射机制**: - 反射允许在运行时动态创建和操作类、接口、字段和方法。通过Class对象,可以创建任何类的...
7. 利用WebStorm的代码审查和静态分析工具,检查代码质量并发现潜在问题。 【MINA框架与编译过程】 微信小程序基于MINA框架,它是一个专为微信小程序设计的轻量级框架。MINA处理了小程序的渲染、生命周期管理和数据...
3. **9_3_数值处理组件**:这个组件专注于数值计算,可能包含统计分析、矩阵运算、复杂数运算等功能,例如Apache Commons Math库,能够帮助开发者进行科学计算和工程应用。 4. **7_文件操作组件**:文件操作组件...
Java作为跨平台的后端开发语言,提供了丰富的库和框架,如Spring Boot用于快速开发,Netty作为高性能的网络应用框架,以及Apache Mina或Jetty等,这些都能帮助构建高效的聊天服务器。 在解压的文件"chatserver-...