`

Mina之session

    博客分类:
  • mina
 
阅读更多

1、IoSession与底层的传输层类型无关,表示通信双端的连接。提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关信 息。每个会话都由一个Service来提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法就是read和write,这两 个方法都是异步执行,如要真正完成必须在其结果上进行等待。关闭会话的方法close也是异步执行的,也就是应等待返回的CloseFuture,此外, 还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,但同样是异步的。会话的读写类型是可配置 的,在运行中可设置此端是否可读写。

  一个会话主要包括两方面的数据:属性映射图和写请求队列,这里使用工厂模式来为新创建的会话提供这些数据结构,定义如下:

public interface IoSessionDataStructureFactory {
    //返回属性
    IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
    //返回写请求队列
    WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}

2、IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等。这个里面有两个方法需要注意,如下:

复制代码
/*
 * 只有在IoSession的read方法可用的时候返回true。 如果可用,受到的消息
 * 保存在BlockingQueue这样对那个客户端应用程序来说更方便取到消息。开
 * 启这个对服务器没什么好处,并且可能造成内存漏洞,默认是不开启的。
 */
boolean isUseReadOperation();
/* 
 * 打开或关闭IoSession的read方法。
 */
void setUseReadOperation(boolean useReadOperation);
复制代码

3、IoSessionInitializer定义了一个回调函数,用于把用户自定义的会话初始化行为剥离出来:

public interface IoSessionInitializer<T extends IoFuture> {
    void initializeSession(IoSession session, T future);
}

4、IoSessionRecycler为一个无连接的传输服务提供回收现有会话的服务:

复制代码
public interface IoSessionRecycler {
    /**
     * 一个虚假的recycler(并不回收任何session)。但是用这个可以使得所有session
     * 的生命周期事件被fired
     */
    static IoSessionRecycler NOOP = new IoSessionRecycler() {
        public void put(IoSession session) {}
        public IoSession recycle(SocketAddress localAddress,SocketAddress remoteAddress) {
            return null;
        }
        public void remove(IoSession session) {}
    };
    /* 
     * 创建或写Iossion的时候被调用。
     */
    void put(IoSession session);
    /*
     * 尝试获取一个被回收了的IoSession。
     */
    IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
    /*
     * 会话被关闭的时候调用。
     */
    void remove(IoSession session);
}
复制代码

ExpiringSessionRecycler是IoSessionRecycler的一个实现,用来回收超时失效的会话:

private ExpiringMap<Object, IoSession> sessionMap;//待处理的会话
private ExpiringMap<Object, IoSession>.Expirer mapExpirer;//负责具体的回收工作

下面来看Key是什么样的:

    private Object generateKey(SocketAddress localAddress,SocketAddress remoteAddress) {
        List<SocketAddress> key = new ArrayList<SocketAddress>(2);
        key.add(remoteAddress);
        key.add(localAddress);
        return key;
    }

ExpiringMap中保存了超过限制的对象和该对象的监听器,如下:

复制代码
public class ExpiringMap<K, V> implements Map<K, V> {
    public static final int DEFAULT_TIME_TO_LIVE = 60;
    public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
    private static volatile int expirerCount = 1;
    private final ConcurrentHashMap<K, ExpiringObject> delegate;
    private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
    private final Expirer expirer;
}
复制代码

其中的ExpiringObject表示一个超过限制的对象,是ExpiringMap的一个内部类,如下:

private class ExpiringObject {
    private K key;
    private V value;
    private long lastAccessTime;//上次访问时间
    private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
}

mapExpirer用来移除sessionMap上超过临界值的项,关键代码如下:

复制代码
private void processExpires() {
    long timeNow = System.currentTimeMillis();//当前时间
    for (ExpiringObject o : delegate.values()) {
        if (timeToLiveMillis <= 0) {
            continue;
        }
        long timeIdle = timeNow - o.getLastAccessTime();
        if (timeIdle >= timeToLiveMillis) {
            delegate.remove(o.getKey());//移除
            for (ExpirationListener<V> listener : expirationListeners) {
                listener.expired(o.getValue());//终止监听?
            }
        }
    }
}
复制代码

启动关闭该县城都需要进行封锁机制。

5、Mina中的I/O事件类型如下:

复制代码
public enum IoEventType {
    SESSION_CREATED,
    SESSION_OPENED,
    SESSION_CLOSED,
    MESSAGE_RECEIVED,
    MESSAGE_SENT,
    SESSION_IDLE,
    EXCEPTION_CAUGHT,
    WRITE,
    CLOSE,
}
复制代码

IoEvent表示一个I/O事件或者一个I/O请求,包括时间类型、所属会话、时间参数:

复制代码
public class IoEvent implements Runnable {
    private final IoEventType type;
    private final IoSession session;
    private final Object parameter;
    public IoEvent(IoEventType type, IoSession session, Object parameter) {    
        //...
    }
    //根据事件类型向会话的过滤链上的众多监听者发出事件到来的信号。
    public void fire() {    
        switch (getType()) {    
        case MESSAGE_RECEIVED:    
            getSession().getFilterChain().fireMessageReceived(getParameter());break;
        case MESSAGE_SENT:
            getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());break;
        case WRITE:
            getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());break;
        case CLOSE:
            getSession().getFilterChain().fireFilterClose();break;
        case EXCEPTION_CAUGHT:
            getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());break;
        case SESSION_IDLE:
            getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());break;
        case SESSION_OPENED:
            getSession().getFilterChain().fireSessionOpened();
            break;
        case SESSION_CREATED:
            getSession().getFilterChain().fireSessionCreated();
            break;
        case SESSION_CLOSED:
            getSession().getFilterChain().fireSessionClosed();
            break;
        default:
            throw new IllegalArgumentException("Unknown event type: " + getType());
        }
    }    
    public String toString() {
        if (getParameter() == null) {
            return "[" + getSession() + "] " + getType().name();
        }
        return "[" + getSession() + "] " + getType().name() + ": "+ getParameter();    
    }
}
复制代码

Mina的会话中,有三种类型的闲置状态:READER_IDLE读端空闲、WRITER_IDLE写端空闲、BOTH_IDLE读写都空闲。为了节省会话资源可以让用户设置当空闲超过一定时间后关闭会话,因为此会话可能在一段出现问题,从而导致另一端空闲超过太长时间。
6、DefaultIoSessionDataStructureFactory是IoSessionDataStructureFactory的一个默认的实现:

复制代码
public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory {
    public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception {
        return new DefaultIoSessionAttributeMap();
    }    
    public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception {
        return new DefaultWriteRequestQueue();
    }
    private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
        private final Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<Object, Object>(4));
        public DefaultIoSessionAttributeMap() {
            super();
        }        
        public Object getAttribute(IoSession session, Object key, Object defaultValue) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            Object answer = attributes.get(key);
            if (answer == null) {
                return defaultValue;
            }            
            return answer;
        }

        public Object setAttribute(IoSession session, Object key, Object value) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            if (value == null) {
                return attributes.remove(key);
            }            
            return attributes.put(key, value);
        }
        public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            if (value == null) {
                return null;
            }

            Object oldValue;
            synchronized (attributes) {
                oldValue = attributes.get(key);
                if (oldValue == null) {
                    attributes.put(key, value);
                }
            }
            return oldValue;
        }
        public Object removeAttribute(IoSession session, Object key) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            return attributes.remove(key);
        }
        public boolean removeAttribute(IoSession session, Object key, Object value) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            if (value == null) {
                return false;
            }
            synchronized (attributes) {
                if (value.equals(attributes.get(key))) {
                    attributes.remove(key);
                    return true;
                }
            }
            return false;
        }
        public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
            synchronized (attributes) {
                Object actualOldValue = attributes.get(key);
                if (actualOldValue == null) {
                    return false;
                }
                if (actualOldValue.equals(oldValue)) {
                    attributes.put(key, newValue);
                    return true;
                }                
                return false;
            }
        }
        public boolean containsAttribute(IoSession session, Object key) {
            return attributes.containsKey(key);
        }
        public Set<Object> getAttributeKeys(IoSession session) {
            synchronized (attributes) {
                return new HashSet<Object>(attributes.keySet());
            }
        }
        public void dispose(IoSession session) throws Exception {}
    }    
    private static class DefaultWriteRequestQueue implements WriteRequestQueue {
        //一个队列存储传入的写请求
        private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
        public DefaultWriteRequestQueue() {
            super();
        }
        //...
    }
}
复制代码

7、AbstractIoSession是IoSession的一个抽象实现类,如下:

private IoSessionAttributeMap attributes;//会话属性
private WriteRequestQueue writeRequestQueue;//写请求队列
private WriteRequest currentWriteRequest;//当前写请求

下面是关闭的时候涉及的成员:

复制代码
//要结束当前会话时发送写请求CLOSE_REQUEST
private static final WriteRequest CLOSE_REQUEST =
 new DefaultWriteRequest(new Object());
//在连接关闭时状态被设置为closed
private final CloseFuture closeFuture = new DefaultCloseFuture(this);
//关闭的监听器
private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
    new IoFutureListener<CloseFuture>() {
        public void operationComplete(CloseFuture future) {
            AbstractIoSession session = (AbstractIoSession) future.getSession();
            session.scheduledWriteBytes.set(0);
            session.scheduledWriteMessages.set(0);
            session.readBytesThroughput = 0;
            session.readMessagesThroughput = 0;
            session.writtenBytesThroughput = 0;
            session.writtenMessagesThroughput = 0;
        }
    };
复制代码

close和closeOnFlush都是异步操作的,区别是前者立即关闭连接,后者是在写请求队列中放入一个CLOSE_REQUEST并将其即时刷新储蓄,若要真正等到关闭完成,需要调用方法在返回的CloseFuture等待。下面是close的代码:

复制代码
    public final CloseFuture close() {
        synchronized (lock) {
            if (isClosing()) {
                return closeFuture;
            }
            
            closing = true;
        }
        getFilterChain().fireFilterClose();//fire出关闭事件
        return closeFuture;
    }
复制代码

下面是closeOnFlush的代码:

    private final CloseFuture closeOnFlush() {
        getWriteRequestQueue().offer(this, CLOSE_REQUEST);
        getProcessor().flush(this);
        return closeFuture;
    }

对于读的情况,下面是取得读的数据队列:

复制代码
    //返回可被读取数据队列
    private Queue<ReadFuture> getReadyReadFutures() {
        Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
        //如果是第一次读取数据
        if (readyReadFutures == null) {
            //构造一个新的读数据队列
            readyReadFutures = new CircularQueue<ReadFuture>();
            Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures);
            if (oldReadyReadFutures != null) {
                readyReadFutures = oldReadyReadFutures;
            }
        }
        return readyReadFutures;
    }
复制代码

读数据的过程:

复制代码
    public final ReadFuture read() {
        //配置不允许读数据
        if (!getConfig().isUseReadOperation()) {
            throw new IllegalStateException("useReadOperation is not enabled.");
        }
        //获得已经被许可的可被读数据队列
        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
        ReadFuture future;
        synchronized (readyReadFutures) {
            future = readyReadFutures.poll();
            if (future != null) {
                //如果关联的会话已关闭,通知读者
                if (future.isClosed()) {
                    readyReadFutures.offer(future);
                }
            } else {
                future = new DefaultReadFuture(this);
                //将数据出入等待读的队列
                getWaitingReadFutures().offer(future);
            }
        }
        return future;
    }
复制代码

写数据的过程:

复制代码
    //IoBuffer、文件、文件部分区域
    public WriteFuture write(Object message, SocketAddress remoteAddress) {
        if (message == null) {
            throw new NullPointerException("message");
        }
        //如果没有远端地址
        if (!getTransportMetadata().isConnectionless() && remoteAddress != null) {
            throw new UnsupportedOperationException();
        }
        //如果会话已被关闭
        if (isClosing() || !isConnected()) {
            WriteFuture future = new DefaultWriteFuture(this);
            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
            WriteException writeException = new WriteToClosedSessionException(request);
            future.setException(writeException);
            return future;
        }
        FileChannel openedFileChannel = null;
        try {            
            if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) {//如果是空消息
                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
            } else if (message instanceof FileChannel) {//文件的某一区域                
                FileChannel fileChannel = (FileChannel) message;
                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
            } else if (message instanceof File) {//要发送的是文件
                File file = (File) message;
                openedFileChannel = new FileInputStream(file).getChannel();//打开文件通道 
                message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
            }
        } catch (IOException e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            return DefaultWriteFuture.newNotWrittenFuture(this, e);
        }
        //构造写请求,通过过滤器链发送出去
        WriteFuture writeFuture = new DefaultWriteFuture(this);
        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
        IoFilterChain filterChain = getFilterChain();
        filterChain.fireFilterWrite(writeRequest);

        //如果打开文件通道应该在完成时关闭通道
        if (openedFileChannel != null) {
            final FileChannel finalChannel = openedFileChannel;
            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
                public void operationComplete(WriteFuture future) {
                    try {
                        finalChannel.close();
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            });
        }
        return writeFuture;
    }
分享到:
评论

相关推荐

    mina连接 mina心跳连接 mina断线重连

    Apache Mina是一个开源的网络通信框架,常用于构建高性能、高效率的服务端应用程序,尤其在Java平台上。在本文中,我们将深入探讨Mina的核心概念,包括连接管理、心跳机制以及断线重连策略。 首先,让我们理解"Mina...

    apache-mina-2.0.4.rar_apache mina_mina

    1. **Filter Chain**:Mina的核心设计模式之一是过滤器链。每个连接都有一系列过滤器,它们按照顺序处理入站和出站事件。过滤器可以实现特定功能,如数据编码解码、安全验证、性能监控等。 2. **Session**:Session...

    Mina入门:mina版之HelloWorld

    **Mina入门:Mina版之HelloWorld** Apache Mina是一个开源项目,它提供了一个高度模块化、高性能的网络通信框架。Mina旨在简化网络应用的开发,支持多种传输协议,如TCP、UDP、HTTP、FTP等。在这个“Mina入门:Mina...

    Mina开发之客户端

    《Mina开发之客户端详解》 Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的、高性能的网络应用框架,主要用于构建服务器端的网络应用程序。它简化了网络编程的复杂性,提供了基于事件驱动和异步I/O...

    springboot集成mina分布式

    springboot集成mina做分布式的soket服务,思路:用java线性安全的集合存储mina session到本地服务器;同时用redis等nosql缓存mina session存储服务器的ip,session的id等信息到redis上;应用服务器直接用restful等...

    Mina+Socket通信

    import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.DefaultTransportMetadata; import org.apache.mina.core.service....

    Mina客户端服务器Demo

    1. **Session**: 在Mina中,Session代表一个连接。它封装了网络连接的细节,如网络套接字,并提供了读写数据的方法。 2. **IoHandler**: IoHandler是处理网络事件的主要接口,它定义了当有数据读取、写入、连接建立...

    spring boot 整合mina 串口

    **Spring Boot 整合Mina实现串口通信详解** 在Java开发中,有时我们需要与硬件设备进行串口通信,例如读取传感器数据或控制工业设备。Spring Boot作为一款轻量级的框架,使得快速构建应用变得简单。而Mina则是一款...

    mina开发手册与mina完全自学手册.rar

    3. **Filter Chain**:Mina的过滤器链是其核心特性之一,它允许开发者插入自定义的过滤器来处理输入和输出数据,执行如认证、加密、压缩等操作。 4. **Protocol Codec**:编码和解码是网络通信中的重要环节,Mina...

    mina demo mina jar包

    当客户端连接建立后,Mina会为每个连接创建一个Session,用于管理与该客户端的交互。在这个"mina demo"中,我们可能看到服务端定义了一个Handler类,它处理接收到的数据,并生成相应的响应。Handler类是Mina的核心...

    mina框架实例

    Session是事件处理的核心,当有网络活动时,MINA会调用Session的相关回调方法。 4. **Protocol Buffers**:MINA提供了多种协议缓冲区,如ByteBuffer和MinaBuffer,用于高效地读写网络数据。这些缓冲区支持动态扩容...

    mina自定义编解码器详解

    **mina自定义编解码器详解** mina是一个Java开发的网络通信框架,广泛应用于TCP和UDP协议的服务器和客户端开发。在mina框架中,编解码器(Codec)扮演着至关重要的角色,它负责将应用层的数据转换为网络传输的字节...

    Mina开发实例(服务端、客户端)DEMO

    当连接断开时,`sessionClosed()`方法会被触发。 在"Mina开发实例"中,项目使用了Maven构建系统。Maven帮助开发者管理依赖,使得引入和更新Mina库以及其他相关库变得简单。在`pom.xml`文件中,可以看到对Mina库的...

    Mina通信框架应用示例

    例如,创建Acceptor来监听特定端口,当有新的连接请求时,MINA会自动创建一个Session对象,并调用IoHandler的事件处理方法。 ```java public class SimpleServerHandler implements IoHandler { @Override ...

    Mina Socket 源代码

    当客户端连接建立后,Mina 将自动创建一个新的 Session 对象。 6. **Client 端实现** 在 `socket_client` 文件夹中,源代码将展示如何建立到 Mina 服务器的连接。客户端同样需要配置过滤器链和协议处理器,然后...

    Mina实现长连接和短连接实例

    在Mina中,实现长连接的关键在于编写正确的`SessionHandler`,这个处理器会监听并处理连接的各种事件,如`sessionCreated`、`messageReceived`、`sessionClosed`等。当客户端发起连接请求时,服务器端的`Acceptor`会...

    mina简单示例

    1. **Session**:Mina中的Session代表了客户端和服务端之间的连接。它封装了与网络连接相关的所有信息,如网络套接字、缓冲区和事件处理器。 2. **Filter**:过滤器是Mina的核心概念之一,它们允许你在数据传输过程...

    springboot整合mina

    SpringBoot整合Mina是一个常见的Java开发任务,它涉及到SpringBoot框架与Apache Mina网络通信库的集成。Mina是一个轻量级、高性能的网络应用框架,主要用于构建服务器端的网络应用,如TCP/IP和UDP协议的服务。而...

    Mina2.0框架源码剖析.pdf

    Mina2.0框架源码剖析 Mina2.0是一个基于Java的网络应用框架,提供了一个简洁、灵活的API,帮助开发者快速构建高性能的网络应用程序。下面是Mina2.0框架源码剖析的相关知识点: 一、Mina2.0框架概述 Mina2.0是一个...

    Mina官网例子之时间服务器

    **Mina官网例子之时间服务器** Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的网络通信框架,它为各种协议提供了低级别的基础结构,包括TCP/IP和UDP/IP。Mina的目标是简化网络编程,使其变得高效...

Global site tag (gtag.js) - Google Analytics