1、IoSession与底层的传输层类型无关,表示通信双端的连接。提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关信 息。每个会话都由一个Service来提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法就是read和write,这两 个方法都是异步执行,如要真正完成必须在其结果上进行等待。关闭会话的方法close也是异步执行的,也就是应等待返回的CloseFuture,此外, 还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,但同样是异步的。会话的读写类型是可配置 的,在运行中可设置此端是否可读写。
一个会话主要包括两方面的数据:属性映射图和写请求队列,这里使用工厂模式来为新创建的会话提供这些数据结构,定义如下:
public interface IoSessionDataStructureFactory { //返回属性 IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception; //返回写请求队列 WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception; }
2、IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等。这个里面有两个方法需要注意,如下:
/* * 只有在IoSession的read方法可用的时候返回true。 如果可用,受到的消息 * 保存在BlockingQueue这样对那个客户端应用程序来说更方便取到消息。开 * 启这个对服务器没什么好处,并且可能造成内存漏洞,默认是不开启的。 */ boolean isUseReadOperation(); /* * 打开或关闭IoSession的read方法。 */ void setUseReadOperation(boolean useReadOperation);
3、IoSessionInitializer定义了一个回调函数,用于把用户自定义的会话初始化行为剥离出来:
public interface IoSessionInitializer<T extends IoFuture> { void initializeSession(IoSession session, T future); }
4、IoSessionRecycler为一个无连接的传输服务提供回收现有会话的服务:
public interface IoSessionRecycler { /** * 一个虚假的recycler(并不回收任何session)。但是用这个可以使得所有session * 的生命周期事件被fired */ static IoSessionRecycler NOOP = new IoSessionRecycler() { public void put(IoSession session) {} public IoSession recycle(SocketAddress localAddress,SocketAddress remoteAddress) { return null; } public void remove(IoSession session) {} }; /* * 创建或写Iossion的时候被调用。 */ void put(IoSession session); /* * 尝试获取一个被回收了的IoSession。 */ IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress); /* * 会话被关闭的时候调用。 */ void remove(IoSession session); }
ExpiringSessionRecycler是IoSessionRecycler的一个实现,用来回收超时失效的会话:
private ExpiringMap<Object, IoSession> sessionMap;//待处理的会话 private ExpiringMap<Object, IoSession>.Expirer mapExpirer;//负责具体的回收工作
下面来看Key是什么样的:
private Object generateKey(SocketAddress localAddress,SocketAddress remoteAddress) { List<SocketAddress> key = new ArrayList<SocketAddress>(2); key.add(remoteAddress); key.add(localAddress); return key; }
ExpiringMap中保存了超过限制的对象和该对象的监听器,如下:
public class ExpiringMap<K, V> implements Map<K, V> { public static final int DEFAULT_TIME_TO_LIVE = 60; public static final int DEFAULT_EXPIRATION_INTERVAL = 1; private static volatile int expirerCount = 1; private final ConcurrentHashMap<K, ExpiringObject> delegate; private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners; private final Expirer expirer; }
其中的ExpiringObject表示一个超过限制的对象,是ExpiringMap的一个内部类,如下:
private class ExpiringObject { private K key; private V value; private long lastAccessTime;//上次访问时间 private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock(); }
mapExpirer用来移除sessionMap上超过临界值的项,关键代码如下:
private void processExpires() { long timeNow = System.currentTimeMillis();//当前时间 for (ExpiringObject o : delegate.values()) { if (timeToLiveMillis <= 0) { continue; } long timeIdle = timeNow - o.getLastAccessTime(); if (timeIdle >= timeToLiveMillis) { delegate.remove(o.getKey());//移除 for (ExpirationListener<V> listener : expirationListeners) { listener.expired(o.getValue());//终止监听? } } } }
启动关闭该县城都需要进行封锁机制。
5、Mina中的I/O事件类型如下:
public enum IoEventType { SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE, }
IoEvent表示一个I/O事件或者一个I/O请求,包括时间类型、所属会话、时间参数:
public class IoEvent implements Runnable { private final IoEventType type; private final IoSession session; private final Object parameter; public IoEvent(IoEventType type, IoSession session, Object parameter) { //... } //根据事件类型向会话的过滤链上的众多监听者发出事件到来的信号。 public void fire() { switch (getType()) { case MESSAGE_RECEIVED: getSession().getFilterChain().fireMessageReceived(getParameter());break; case MESSAGE_SENT: getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());break; case WRITE: getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());break; case CLOSE: getSession().getFilterChain().fireFilterClose();break; case EXCEPTION_CAUGHT: getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());break; case SESSION_IDLE: getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());break; case SESSION_OPENED: getSession().getFilterChain().fireSessionOpened(); break; case SESSION_CREATED: getSession().getFilterChain().fireSessionCreated(); break; case SESSION_CLOSED: getSession().getFilterChain().fireSessionClosed(); break; default: throw new IllegalArgumentException("Unknown event type: " + getType()); } } public String toString() { if (getParameter() == null) { return "[" + getSession() + "] " + getType().name(); } return "[" + getSession() + "] " + getType().name() + ": "+ getParameter(); } }
Mina的会话中,有三种类型的闲置状态:READER_IDLE读端空闲、WRITER_IDLE写端空闲、BOTH_IDLE读写都空闲。为了节省会话资源可以让用户设置当空闲超过一定时间后关闭会话,因为此会话可能在一段出现问题,从而导致另一端空闲超过太长时间。
6、DefaultIoSessionDataStructureFactory是IoSessionDataStructureFactory的一个默认的实现:
public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory { public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception { return new DefaultIoSessionAttributeMap(); } public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception { return new DefaultWriteRequestQueue(); } private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap { private final Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<Object, Object>(4)); public DefaultIoSessionAttributeMap() { super(); } public Object getAttribute(IoSession session, Object key, Object defaultValue) { if (key == null) { throw new NullPointerException("key"); } Object answer = attributes.get(key); if (answer == null) { return defaultValue; } return answer; } public Object setAttribute(IoSession session, Object key, Object value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { return attributes.remove(key); } return attributes.put(key, value); } public Object setAttributeIfAbsent(IoSession session, Object key, Object value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { return null; } Object oldValue; synchronized (attributes) { oldValue = attributes.get(key); if (oldValue == null) { attributes.put(key, value); } } return oldValue; } public Object removeAttribute(IoSession session, Object key) { if (key == null) { throw new NullPointerException("key"); } return attributes.remove(key); } public boolean removeAttribute(IoSession session, Object key, Object value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { return false; } synchronized (attributes) { if (value.equals(attributes.get(key))) { attributes.remove(key); return true; } } return false; } public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) { synchronized (attributes) { Object actualOldValue = attributes.get(key); if (actualOldValue == null) { return false; } if (actualOldValue.equals(oldValue)) { attributes.put(key, newValue); return true; } return false; } } public boolean containsAttribute(IoSession session, Object key) { return attributes.containsKey(key); } public Set<Object> getAttributeKeys(IoSession session) { synchronized (attributes) { return new HashSet<Object>(attributes.keySet()); } } public void dispose(IoSession session) throws Exception {} } private static class DefaultWriteRequestQueue implements WriteRequestQueue { //一个队列存储传入的写请求 private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16); public DefaultWriteRequestQueue() { super(); } //... } }
7、AbstractIoSession是IoSession的一个抽象实现类,如下:
private IoSessionAttributeMap attributes;//会话属性 private WriteRequestQueue writeRequestQueue;//写请求队列 private WriteRequest currentWriteRequest;//当前写请求
下面是关闭的时候涉及的成员:
//要结束当前会话时发送写请求CLOSE_REQUEST private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object()); //在连接关闭时状态被设置为closed private final CloseFuture closeFuture = new DefaultCloseFuture(this); //关闭的监听器 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() { public void operationComplete(CloseFuture future) { AbstractIoSession session = (AbstractIoSession) future.getSession(); session.scheduledWriteBytes.set(0); session.scheduledWriteMessages.set(0); session.readBytesThroughput = 0; session.readMessagesThroughput = 0; session.writtenBytesThroughput = 0; session.writtenMessagesThroughput = 0; } };
close和closeOnFlush都是异步操作的,区别是前者立即关闭连接,后者是在写请求队列中放入一个CLOSE_REQUEST并将其即时刷新储蓄,若要真正等到关闭完成,需要调用方法在返回的CloseFuture等待。下面是close的代码:
public final CloseFuture close() { synchronized (lock) { if (isClosing()) { return closeFuture; } closing = true; } getFilterChain().fireFilterClose();//fire出关闭事件 return closeFuture; }
下面是closeOnFlush的代码:
private final CloseFuture closeOnFlush() { getWriteRequestQueue().offer(this, CLOSE_REQUEST); getProcessor().flush(this); return closeFuture; }
对于读的情况,下面是取得读的数据队列:
//返回可被读取数据队列 private Queue<ReadFuture> getReadyReadFutures() { Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY); //如果是第一次读取数据 if (readyReadFutures == null) { //构造一个新的读数据队列 readyReadFutures = new CircularQueue<ReadFuture>(); Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures); if (oldReadyReadFutures != null) { readyReadFutures = oldReadyReadFutures; } } return readyReadFutures; }
读数据的过程:
public final ReadFuture read() { //配置不允许读数据 if (!getConfig().isUseReadOperation()) { throw new IllegalStateException("useReadOperation is not enabled."); } //获得已经被许可的可被读数据队列 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); ReadFuture future; synchronized (readyReadFutures) { future = readyReadFutures.poll(); if (future != null) { //如果关联的会话已关闭,通知读者 if (future.isClosed()) { readyReadFutures.offer(future); } } else { future = new DefaultReadFuture(this); //将数据出入等待读的队列 getWaitingReadFutures().offer(future); } } return future; }
写数据的过程:
//IoBuffer、文件、文件部分区域 public WriteFuture write(Object message, SocketAddress remoteAddress) { if (message == null) { throw new NullPointerException("message"); } //如果没有远端地址 if (!getTransportMetadata().isConnectionless() && remoteAddress != null) { throw new UnsupportedOperationException(); } //如果会话已被关闭 if (isClosing() || !isConnected()) { WriteFuture future = new DefaultWriteFuture(this); WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress); WriteException writeException = new WriteToClosedSessionException(request); future.setException(writeException); return future; } FileChannel openedFileChannel = null; try { if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) {//如果是空消息 throw new IllegalArgumentException("message is empty. Forgot to call flip()?"); } else if (message instanceof FileChannel) {//文件的某一区域 FileChannel fileChannel = (FileChannel) message; message = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); } else if (message instanceof File) {//要发送的是文件 File file = (File) message; openedFileChannel = new FileInputStream(file).getChannel();//打开文件通道 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size()); } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); return DefaultWriteFuture.newNotWrittenFuture(this, e); } //构造写请求,通过过滤器链发送出去 WriteFuture writeFuture = new DefaultWriteFuture(this); WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); IoFilterChain filterChain = getFilterChain(); filterChain.fireFilterWrite(writeRequest); //如果打开文件通道应该在完成时关闭通道 if (openedFileChannel != null) { final FileChannel finalChannel = openedFileChannel; writeFuture.addListener(new IoFutureListener<WriteFuture>() { public void operationComplete(WriteFuture future) { try { finalChannel.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } }); } return writeFuture; }
相关推荐
Apache Mina是一个开源的网络通信框架,常用于构建高性能、高效率的服务端应用程序,尤其在Java平台上。在本文中,我们将深入探讨Mina的核心概念,包括连接管理、心跳机制以及断线重连策略。 首先,让我们理解"Mina...
1. **Filter Chain**:Mina的核心设计模式之一是过滤器链。每个连接都有一系列过滤器,它们按照顺序处理入站和出站事件。过滤器可以实现特定功能,如数据编码解码、安全验证、性能监控等。 2. **Session**:Session...
**Mina入门:Mina版之HelloWorld** Apache Mina是一个开源项目,它提供了一个高度模块化、高性能的网络通信框架。Mina旨在简化网络应用的开发,支持多种传输协议,如TCP、UDP、HTTP、FTP等。在这个“Mina入门:Mina...
《Mina开发之客户端详解》 Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的、高性能的网络应用框架,主要用于构建服务器端的网络应用程序。它简化了网络编程的复杂性,提供了基于事件驱动和异步I/O...
springboot集成mina做分布式的soket服务,思路:用java线性安全的集合存储mina session到本地服务器;同时用redis等nosql缓存mina session存储服务器的ip,session的id等信息到redis上;应用服务器直接用restful等...
import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.DefaultTransportMetadata; import org.apache.mina.core.service....
1. **Session**: 在Mina中,Session代表一个连接。它封装了网络连接的细节,如网络套接字,并提供了读写数据的方法。 2. **IoHandler**: IoHandler是处理网络事件的主要接口,它定义了当有数据读取、写入、连接建立...
**Spring Boot 整合Mina实现串口通信详解** 在Java开发中,有时我们需要与硬件设备进行串口通信,例如读取传感器数据或控制工业设备。Spring Boot作为一款轻量级的框架,使得快速构建应用变得简单。而Mina则是一款...
3. **Filter Chain**:Mina的过滤器链是其核心特性之一,它允许开发者插入自定义的过滤器来处理输入和输出数据,执行如认证、加密、压缩等操作。 4. **Protocol Codec**:编码和解码是网络通信中的重要环节,Mina...
当客户端连接建立后,Mina会为每个连接创建一个Session,用于管理与该客户端的交互。在这个"mina demo"中,我们可能看到服务端定义了一个Handler类,它处理接收到的数据,并生成相应的响应。Handler类是Mina的核心...
Session是事件处理的核心,当有网络活动时,MINA会调用Session的相关回调方法。 4. **Protocol Buffers**:MINA提供了多种协议缓冲区,如ByteBuffer和MinaBuffer,用于高效地读写网络数据。这些缓冲区支持动态扩容...
**mina自定义编解码器详解** mina是一个Java开发的网络通信框架,广泛应用于TCP和UDP协议的服务器和客户端开发。在mina框架中,编解码器(Codec)扮演着至关重要的角色,它负责将应用层的数据转换为网络传输的字节...
当连接断开时,`sessionClosed()`方法会被触发。 在"Mina开发实例"中,项目使用了Maven构建系统。Maven帮助开发者管理依赖,使得引入和更新Mina库以及其他相关库变得简单。在`pom.xml`文件中,可以看到对Mina库的...
例如,创建Acceptor来监听特定端口,当有新的连接请求时,MINA会自动创建一个Session对象,并调用IoHandler的事件处理方法。 ```java public class SimpleServerHandler implements IoHandler { @Override ...
当客户端连接建立后,Mina 将自动创建一个新的 Session 对象。 6. **Client 端实现** 在 `socket_client` 文件夹中,源代码将展示如何建立到 Mina 服务器的连接。客户端同样需要配置过滤器链和协议处理器,然后...
在Mina中,实现长连接的关键在于编写正确的`SessionHandler`,这个处理器会监听并处理连接的各种事件,如`sessionCreated`、`messageReceived`、`sessionClosed`等。当客户端发起连接请求时,服务器端的`Acceptor`会...
1. **Session**:Mina中的Session代表了客户端和服务端之间的连接。它封装了与网络连接相关的所有信息,如网络套接字、缓冲区和事件处理器。 2. **Filter**:过滤器是Mina的核心概念之一,它们允许你在数据传输过程...
SpringBoot整合Mina是一个常见的Java开发任务,它涉及到SpringBoot框架与Apache Mina网络通信库的集成。Mina是一个轻量级、高性能的网络应用框架,主要用于构建服务器端的网络应用,如TCP/IP和UDP协议的服务。而...
Mina2.0框架源码剖析 Mina2.0是一个基于Java的网络应用框架,提供了一个简洁、灵活的API,帮助开发者快速构建高性能的网络应用程序。下面是Mina2.0框架源码剖析的相关知识点: 一、Mina2.0框架概述 Mina2.0是一个...
**Mina官网例子之时间服务器** Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的网络通信框架,它为各种协议提供了低级别的基础结构,包括TCP/IP和UDP/IP。Mina的目标是简化网络编程,使其变得高效...