Mina之session
http://www.cnblogs.com/ggzwtj/archive/2011/10/14/2212095.html
1、IoSession与底层的传输层类型无关,表示通信双端的连接。提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关信息。每个会话都由一个Service来提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法就是read和write,这两个方法都是异步执行,如要真正完成必须在其结果上进行等待。关闭会话的方法close也是异步执行的,也就是应等待返回的CloseFuture,此外,还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,但同样是异步的。会话的读写类型是可配置的,在运行中可设置此端是否可读写。
一个会话主要包括两方面的数据:属性映射图和写请求队列,这里使用工厂模式来为新创建的会话提供这些数据结构,定义如下:
public interface IoSessionDataStructureFactory { //返回属性 IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception; //返回写请求队列 WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception; }
2、IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等。这个里面有两个方法需要注意,如下:
/* * 只有在IoSession的read方法可用的时候返回true。 如果可用,受到的消息 * 保存在BlockingQueue这样对那个客户端应用程序来说更方便取到消息。开 * 启这个对服务器没什么好处,并且可能造成内存漏洞,默认是不开启的。 */ boolean isUseReadOperation(); /* * 打开或关闭IoSession的read方法。 */ void setUseReadOperation(boolean useReadOperation);
3、IoSessionInitializer定义了一个回调函数,用于把用户自定义的会话初始化行为剥离出来:
public interface IoSessionInitializer<T extends IoFuture> { void initializeSession(IoSession session, T future); }
4、IoSessionRecycler为一个无连接的传输服务提供回收现有会话的服务:
public interface IoSessionRecycler { /** * 一个虚假的recycler(并不回收任何session)。但是用这个可以使得所有session * 的生命周期事件被fired */ static IoSessionRecycler NOOP = new IoSessionRecycler() { public void put(IoSession session) {} public IoSession recycle(SocketAddress localAddress,SocketAddress remoteAddress) { return null; } public void remove(IoSession session) {} }; /* * 创建或写Iossion的时候被调用。 */ void put(IoSession session); /* * 尝试获取一个被回收了的IoSession。 */ IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress); /* * 会话被关闭的时候调用。 */ void remove(IoSession session); }
ExpiringSessionRecycler是IoSessionRecycler的一个实现,用来回收超时失效的会话:
private ExpiringMap<Object, IoSession> sessionMap;//待处理的会话 private ExpiringMap<Object, IoSession>.Expirer mapExpirer;//负责具体的回收工作
下面来看Key是什么样的:
private Object generateKey(SocketAddress localAddress,SocketAddress remoteAddress) { List<SocketAddress> key = new ArrayList<SocketAddress>(2); key.add(remoteAddress); key.add(localAddress); return key; }
ExpiringMap中保存了超过限制的对象和该对象的监听器,如下:
public class ExpiringMap<K, V> implements Map<K, V> { public static final int DEFAULT_TIME_TO_LIVE = 60; public static final int DEFAULT_EXPIRATION_INTERVAL = 1; private static volatile int expirerCount = 1; private final ConcurrentHashMap<K, ExpiringObject> delegate; private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners; private final Expirer expirer; }
其中的ExpiringObject表示一个超过限制的对象,是ExpiringMap的一个内部类,如下:
private class ExpiringObject { private K key; private V value; private long lastAccessTime;//上次访问时间 private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock(); }
mapExpirer用来移除sessionMap上超过临界值的项,关键代码如下:
private void processExpires() { long timeNow = System.currentTimeMillis();//当前时间 for (ExpiringObject o : delegate.values()) { if (timeToLiveMillis <= 0) { continue; } long timeIdle = timeNow - o.getLastAccessTime(); if (timeIdle >= timeToLiveMillis) { delegate.remove(o.getKey());//移除 for (ExpirationListener<V> listener : expirationListeners) { listener.expired(o.getValue());//终止监听? } } } }
启动关闭该县城都需要进行封锁机制。
5、Mina中的I/O事件类型如下:
public enum IoEventType { SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE, }
IoEvent表示一个I/O事件或者一个I/O请求,包括时间类型、所属会话、时间参数:
public class IoEvent implements Runnable { private final IoEventType type; private final IoSession session; private final Object parameter; public IoEvent(IoEventType type, IoSession session, Object parameter) { //... } //根据事件类型向会话的过滤链上的众多监听者发出事件到来的信号。 public void fire() { switch (getType()) { case MESSAGE_RECEIVED: getSession().getFilterChain().fireMessageReceived(getParameter());break; case MESSAGE_SENT: getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());break; case WRITE: getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());break; case CLOSE: getSession().getFilterChain().fireFilterClose();break; case EXCEPTION_CAUGHT: getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());break; case SESSION_IDLE: getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());break; case SESSION_OPENED: getSession().getFilterChain().fireSessionOpened(); break; case SESSION_CREATED: getSession().getFilterChain().fireSessionCreated(); break; case SESSION_CLOSED: getSession().getFilterChain().fireSessionClosed(); break; default: throw new IllegalArgumentException("Unknown event type: " + getType()); } } public String toString() { if (getParameter() == null) { return "[" + getSession() + "] " + getType().name(); } return "[" + getSession() + "] " + getType().name() + ": "+ getParameter(); } }
Mina的会话中,有三种类型的闲置状态:READER_IDLE读端空闲、WRITER_IDLE写端空闲、BOTH_IDLE读写都空闲。为了节省会话资源可以让用户设置当空闲超过一定时间后关闭会话,因为此会话可能在一段出现问题,从而导致另一端空闲超过太长时间。
6、DefaultIoSessionDataStructureFactory是IoSessionDataStructureFactory的一个默认的实现:
public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory { public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception { return new DefaultIoSessionAttributeMap(); } public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception { return new DefaultWriteRequestQueue(); } private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap { private final Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<Object, Object>(4)); public DefaultIoSessionAttributeMap() { super(); } public Object getAttribute(IoSession session, Object key, Object defaultValue) { if (key == null) { throw new NullPointerException("key"); } Object answer = attributes.get(key); if (answer == null) { return defaultValue; } return answer; } public Object setAttribute(IoSession session, Object key, Object value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { return attributes.remove(key); } return attributes.put(key, value); } public Object setAttributeIfAbsent(IoSession session, Object key, Object value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { return null; } Object oldValue; synchronized (attributes) { oldValue = attributes.get(key); if (oldValue == null) { attributes.put(key, value); } } return oldValue; } public Object removeAttribute(IoSession session, Object key) { if (key == null) { throw new NullPointerException("key"); } return attributes.remove(key); } public boolean removeAttribute(IoSession session, Object key, Object value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { return false; } synchronized (attributes) { if (value.equals(attributes.get(key))) { attributes.remove(key); return true; } } return false; } public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) { synchronized (attributes) { Object actualOldValue = attributes.get(key); if (actualOldValue == null) { return false; } if (actualOldValue.equals(oldValue)) { attributes.put(key, newValue); return true; } return false; } } public boolean containsAttribute(IoSession session, Object key) { return attributes.containsKey(key); } public Set<Object> getAttributeKeys(IoSession session) { synchronized (attributes) { return new HashSet<Object>(attributes.keySet()); } } public void dispose(IoSession session) throws Exception {} } private static class DefaultWriteRequestQueue implements WriteRequestQueue { //一个队列存储传入的写请求 private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16); public DefaultWriteRequestQueue() { super(); } //... } }
7、AbstractIoSession是IoSession的一个抽象实现类,如下:
private IoSessionAttributeMap attributes;//会话属性 private WriteRequestQueue writeRequestQueue;//写请求队列 private WriteRequest currentWriteRequest;//当前写请求
下面是关闭的时候涉及的成员:
//要结束当前会话时发送写请求CLOSE_REQUEST private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object()); //在连接关闭时状态被设置为closed private final CloseFuture closeFuture = new DefaultCloseFuture(this); //关闭的监听器 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() { public void operationComplete(CloseFuture future) { AbstractIoSession session = (AbstractIoSession) future.getSession(); session.scheduledWriteBytes.set(0); session.scheduledWriteMessages.set(0); session.readBytesThroughput = 0; session.readMessagesThroughput = 0; session.writtenBytesThroughput = 0; session.writtenMessagesThroughput = 0; } };
close和closeOnFlush都是异步操作的,区别是前者立即关闭连接,后者是在写请求队列中放入一个CLOSE_REQUEST并将其即时刷新储蓄,若要真正等到关闭完成,需要调用方法在返回的CloseFuture等待。下面是close的代码:
public final CloseFuture close() { synchronized (lock) { if (isClosing()) { return closeFuture; } closing = true; } getFilterChain().fireFilterClose();//fire出关闭事件 return closeFuture; }
下面是closeOnFlush的代码:
private final CloseFuture closeOnFlush() { getWriteRequestQueue().offer(this, CLOSE_REQUEST); getProcessor().flush(this); return closeFuture; }
对于读的情况,下面是取得读的数据队列:
//返回可被读取数据队列 private Queue<ReadFuture> getReadyReadFutures() { Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY); //如果是第一次读取数据 if (readyReadFutures == null) { //构造一个新的读数据队列 readyReadFutures = new CircularQueue<ReadFuture>(); Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures); if (oldReadyReadFutures != null) { readyReadFutures = oldReadyReadFutures; } } return readyReadFutures; }
读数据的过程:
public final ReadFuture read() { //配置不允许读数据 if (!getConfig().isUseReadOperation()) { throw new IllegalStateException("useReadOperation is not enabled."); } //获得已经被许可的可被读数据队列 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); ReadFuture future; synchronized (readyReadFutures) { future = readyReadFutures.poll(); if (future != null) { //如果关联的会话已关闭,通知读者 if (future.isClosed()) { readyReadFutures.offer(future); } } else { future = new DefaultReadFuture(this); //将数据出入等待读的队列 getWaitingReadFutures().offer(future); } } return future; }
写数据的过程:
//IoBuffer、文件、文件部分区域 public WriteFuture write(Object message, SocketAddress remoteAddress) { if (message == null) { throw new NullPointerException("message"); } //如果没有远端地址 if (!getTransportMetadata().isConnectionless() && remoteAddress != null) { throw new UnsupportedOperationException(); } //如果会话已被关闭 if (isClosing() || !isConnected()) { WriteFuture future = new DefaultWriteFuture(this); WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress); WriteException writeException = new WriteToClosedSessionException(request); future.setException(writeException); return future; } FileChannel openedFileChannel = null; try { if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) {//如果是空消息 throw new IllegalArgumentException("message is empty. Forgot to call flip()?"); } else if (message instanceof FileChannel) {//文件的某一区域 FileChannel fileChannel = (FileChannel) message; message = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); } else if (message instanceof File) {//要发送的是文件 File file = (File) message; openedFileChannel = new FileInputStream(file).getChannel();//打开文件通道 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size()); } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); return DefaultWriteFuture.newNotWrittenFuture(this, e); } //构造写请求,通过过滤器链发送出去 WriteFuture writeFuture = new DefaultWriteFuture(this); WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); IoFilterChain filterChain = getFilterChain(); filterChain.fireFilterWrite(writeRequest); //如果打开文件通道应该在完成时关闭通道 if (openedFileChannel != null) { final FileChannel finalChannel = openedFileChannel; writeFuture.addListener(new IoFutureListener<WriteFuture>() { public void operationComplete(WriteFuture future) { try { finalChannel.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } }); } return writeFuture; }
相关推荐
Apache Mina是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"apache-mina-2.0.4.rar"压缩包包含的是Apache Mina 2.0.4版本的源代码,是深入理解和定制Mina的...
Apache Mina是一个开源的网络通信应用框架,主要应用于Java平台,它为高性能、高可用性的网络应用程序提供了基础架构。在本文中,我们将深入探讨Mina的高级使用,特别是在文件图片传送、文件发送、XML和JSON报文处理...
Mina和Socket是两种常见的网络通信框架和技术,它们在Java编程环境中被广泛使用。本篇文章将深入探讨如何使用Mina与Socket实现通信,并提供客户端和服务端的实现代码概述。 Mina(全称“MINA: Minimalistic ...
Apache Mina是一个开源的网络通信框架,常用于构建高性能、高效率的服务端应用程序,尤其在Java平台上。在本文中,我们将深入探讨Mina的核心概念,包括连接管理、心跳机制以及断线重连策略。 首先,让我们理解"Mina...
Java SpringBoot 整合Mina框架,涉及到的核心技术主要包括Java NIO(非阻塞I/O)、Mina框架以及SpringBoot的集成应用。本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java ...
**mina自定义编解码器详解** mina是一个Java开发的网络通信框架,广泛应用于TCP和UDP协议的服务器和客户端开发。在mina框架中,编解码器(Codec)扮演着至关重要的角色,它负责将应用层的数据转换为网络传输的字节...
标题中的“给予mina协议进行大数据传输”指的是一种基于Java的网络通信框架——Apache MINA(Model-View-Controller for Network Applications)。MINA是Apache软件基金会的一个项目,它提供了一个高度可扩展和高...
Apache Mina是一个高度可扩展的网络通信框架,它允许开发者创建高性能、高效率的服务端和客户端应用程序。在Java世界中,Mina以其简洁的API和灵活性而受到青睐,尤其适用于处理大量的并发连接,如TCP/IP和UDP协议。...
在本文中,我们将深入探讨如何将Spring Boot与Mina进行深度整合,以便为新手开发者提供一个开箱即用的解决方案。Spring Boot以其简洁的配置和快速的开发体验,已经成为Java领域中的主流微服务框架,而Mina则是一个...
MINA (Java IO Network Application Framework) 是一个由Apache软件基金会开发的开源网络通信框架,主要应用于构建高性能、高可用性的网络服务器。这个压缩包包含了MINA API文档、自学手册以及开发指南,对于学习和...
mina心跳包机制是Apache Mina框架中的一个关键特性,它用于维持网络连接的活跃状态,确保数据能够在客户端和服务端之间顺畅地传输。Mina是一个高度可扩展的Java网络应用框架,广泛应用于各种分布式系统和网络服务,...
Apache Mina是一个强大的网络通信框架,专为基于TCP/IP和UDP/IP协议栈的应用设计。它提供了JAVA对象的序列化和虚拟机内部通信的功能,使得开发者能够迅速构建高性能、高可扩展性的网络应用。Mina的核心特性是其事件...
mina-core-2.0.0-M6.jar mina-example-2.0.0-M6.jar mina-filter-codec-netty-2.0.0-M6.jar mina-filter-compression-2.0.0-M6.jar mina-integration-beans-2.0.0-M6.jar mina-integration-jmx-2.0.0-M6.jar mina-...
**Spring Boot 整合Mina实现串口通信详解** 在Java开发中,有时我们需要与硬件设备进行串口通信,例如读取传感器数据或控制工业设备。Spring Boot作为一款轻量级的框架,使得快速构建应用变得简单。而Mina则是一款...
Apache Mina是一个开源的网络通信框架,主要用于简化Java应用程序与远程服务器之间的通信。它提供了高度可扩展和高性能的网络协议处理能力,支持多种传输层协议,如TCP/IP、UDP/IP和SSL/TLS等。在本示例中,我们关注...
在IT行业中,网络通信是不可或缺的一部分,而Apache MINA(Model-Independent Network Application Framework)是一个高性能、异步的网络应用程序框架,广泛应用于TCP/IP和UDP协议的开发。当我们遇到"MINA断线重连...
Mina开源框架是一款广泛应用于Java环境的网络通信应用框架,其设计目标是提供一个高度可扩展、高性能且稳定的网络通信接口。在Mina框架中,心跳机制扮演着至关重要的角色,它确保了网络连接的健康性和可靠性。心跳...
在IT行业中,网络通信是核心领域之一,而Apache Mina作为一个高效的、可扩展的网络应用框架,被广泛用于创建高性能的TCP和UDP服务。本文将深入探讨“mina断包”和“粘包”问题,以及如何通过提供的mina_optimize代码...
MINA2.0 用户手册中文随笔翻译 MINA 是一个基于 NIO(Non-Blocking I/O)的网络框架,提供了统一的接口来处理 TCP、UDP 和其他机制的通信。MINA 的主要特点是能够处理大量的 socket 连接,并提供了一个高层接口来...
**Android-MinaSocket:基于Mina的高效Socket长连接库** 在移动应用开发中,尤其是Android平台,实时性与稳定性是许多应用场景的核心需求,比如在线游戏、即时通讯、物联网设备等。在这种背景下,使用Socket进行长...