`
Donald_Draper
  • 浏览: 982302 次
社区版块
存档分类
最新评论

Mina 抽象Io会话

    博客分类:
  • Mina
阅读更多
Mina Io会话接口定义:http://donald-draper.iteye.com/blog/2377737
上一篇我们看到Io会话接口的定义,今天来看一下会话接口的简单实现AbstractIoSession。
/**
 * Base implementation of {@link IoSession}.
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public abstract class AbstractIoSession implements IoSession {
    /** The associated handler  会话关联的IoHandler*/
    private final IoHandler handler;
    /** The session config 会话配置*/
    protected IoSessionConfig config;
    /** The service which will manage this session 管理会话的Service*/
    private final IoService service;
    private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
            "readyReadFutures");
    private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
            "waitingReadFutures");
   //Io结果监听器,主要是重置会话读写字节和消息的计数器
    private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
        public void operationComplete(CloseFuture future) {
            AbstractIoSession session = (AbstractIoSession) future.getSession();
            session.scheduledWriteBytes.set(0);
            session.scheduledWriteMessages.set(0);
            session.readBytesThroughput = 0;
            session.readMessagesThroughput = 0;
            session.writtenBytesThroughput = 0;
            session.writtenMessagesThroughput = 0;
        }
    };
    /**
     * An internal write request object that triggers session close.
     内部写请求,会话关闭时触发,即发送关闭写请求
     */
    public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
    /**
     * An internal write request object that triggers message sent events.
      内部写请求,消息发送时触发
     */
    public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);
    private final Object lock = new Object();
    private IoSessionAttributeMap attributes;//会话属性Map
    private WriteRequestQueue writeRequestQueue;//写请求队列
    private WriteRequest currentWriteRequest;//当前写请求
    /** The Session creation's time 会话创建时间*/
    private final long creationTime;
    /** An id generator guaranteed to generate unique IDs for the session 会话id产生器*/
    private static AtomicLong idGenerator = new AtomicLong(0);
    /** The session ID 会话id*/
    private long sessionId;
    /**
     * A future that will be set 'closed' when the connection is closed.连接关闭结果
     */
    private final CloseFuture closeFuture = new DefaultCloseFuture(this);
    private volatile boolean closing;
    // traffic control
    private boolean readSuspended = false;//是否读暂停
    private boolean writeSuspended = false;//是否写暂停
    // Status variables
    private final AtomicBoolean scheduledForFlush = new AtomicBoolean();//是否正在刷新写请求
    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();//已发送字节数
    private final AtomicInteger scheduledWriteMessages = new AtomicInteger();//已发送消息数
    private long readBytes;//读取字节数
    private long writtenBytes;//写字节数
    private long readMessages;//读取消息数
    private long writtenMessages;//写消息数
    //上次读写操作发生时间
    private long lastReadTime;
    private long lastWriteTime;
    //上次吞吐量计算时间
    private long lastThroughputCalculationTime;
    //上次读写字节数及消息数
    private long lastReadBytes;
    private long lastWrittenBytes;
    private long lastReadMessages;
    private long lastWrittenMessages;
    //读写字节及消息吞吐量
    private double readBytesThroughput;
    private double writtenBytesThroughput;
    private double readMessagesThroughput;
    private double writtenMessagesThroughput;
    //空闲状态计数器
    private AtomicInteger idleCountForBoth = new AtomicInteger();
    private AtomicInteger idleCountForRead = new AtomicInteger();
    private AtomicInteger idleCountForWrite = new AtomicInteger();
    //上次空闲状态时间
    private long lastIdleTimeForBoth;
    private long lastIdleTimeForRead;
    private long lastIdleTimeForWrite;
    private boolean deferDecreaseReadBuffer = true;
}

来看构造:
 
  /**
     * Create a Session for a service
     * @param service the Service for this session
     */
    protected AbstractIoSession(IoService service) {
        this.service = service;
        this.handler = service.getHandler();
        // Initialize all the Session counters to the current time
        long currentTime = System.currentTimeMillis();
        creationTime = currentTime;
        lastThroughputCalculationTime = currentTime;
        lastReadTime = currentTime;
        lastWriteTime = currentTime;
        lastIdleTimeForBoth = currentTime;
        lastIdleTimeForRead = currentTime;
        lastIdleTimeForWrite = currentTime;
        // TODO add documentation
        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
        // Set a new ID for this session
        sessionId = idGenerator.incrementAndGet();
    }

从构造可以看出,会话初始化主要为初始化关联service,及关联的IoHandler,实际为Service的IoHandler;初始化所有的会话事件计数器为当前时间。
下面来看会话其他方法:
/**
     * {@inheritDoc}
     * 获取会话id
     * We use an AtomicLong to guarantee that the session ID are unique.
     */
    public final long getId() {
        return sessionId;
    }
    /**
     * @return The associated IoProcessor for this session
     获取会话关联Io处理器
     */
    public abstract IoProcessor getProcessor();
    /**
     * {@inheritDoc}
     是否连接
     */
    public final boolean isConnected() {
        return !closeFuture.isClosed();
    }
    /**
     * {@inheritDoc}
     是否激活
     */
    public boolean isActive() {
        // Return true by default
        return true;
    }
    /**
     * {@inheritDoc}
     是否关闭
     */
    public final boolean isClosing() {
        return closing || closeFuture.isClosed();
    }
    /**
     * {@inheritDoc}
     是否安全
     */
    public boolean isSecured() {
        // Always false...
        return false;
    }
    /**
     * {@inheritDoc}
     获取关闭结果
     */
    public final CloseFuture getCloseFuture() {
        return closeFuture;
    }
    /**
     * Tells if the session is scheduled for flushed
     * 判断会话是否正在被调度刷新,即处理器发送会话写请求
     * @return true if the session is scheduled for flush
     */
    public final boolean isScheduledForFlush() {
        return scheduledForFlush.get();
    }

    /**
     * Schedule the session for flushed
     刷新会话
     */
    public final void scheduledForFlush() {
        scheduledForFlush.set(true);
    }
    /**
     * Change the session's status : it's not anymore scheduled for flush
     重置会话刷新状态为否
     */
    public final void unscheduledForFlush() {
        scheduledForFlush.set(false);
    }
    /**
     * Set the scheduledForFLush flag. As we may have concurrent access to this
     * flag, we compare and set it in one call.
     * 设置会话调度舒心状态
     * @param schedule
     *            the new value to set if not already set.
     * @return true if the session flag has been set, and if it wasn't set
     *         already.
     */
    public final boolean setScheduledForFlush(boolean schedule) {
        if (schedule) {
            // If the current tag is set to false, switch it to true,
            // otherwise, we do nothing but return false : the session
            // is already scheduled for flush
            return scheduledForFlush.compareAndSet(false, schedule);
        }
        scheduledForFlush.set(schedule);
        return true;
    }
     /**
     * {@inheritDoc}关闭会话
     */
    public final CloseFuture close() {
        return closeNow();
    }
     /**
     * {@inheritDoc}
     */
    public final CloseFuture closeNow() {
        synchronized (lock) {
            if (isClosing()) {
                return closeFuture;
            }
            closing = true;
            try {
                destroy();
            } catch (Exception e) {
                IoFilterChain filterChain = getFilterChain();
                filterChain.fireExceptionCaught(e);
            }
        }
	//触发过滤过滤链fireFilterClose
        getFilterChain().fireFilterClose();
        return closeFuture;
    }
     /**
     * Destroy the session
     */
    protected void destroy() {
        if (writeRequestQueue != null) {
	   //清空写请求队列,并将写请求的结果置为已写
            while (!writeRequestQueue.isEmpty(this)) {
                WriteRequest writeRequest = writeRequestQueue.poll(this);
                
                if (writeRequest != null) {
                    WriteFuture writeFuture = writeRequest.getFuture();
                    
                    // The WriteRequest may not always have a future : The CLOSE_REQUEST
                    // and MESSAGE_SENT_REQUEST don't.
                    if (writeFuture != null) {
                        writeFuture.setWritten();
                    }
                }
            }
        }
    }
    /**
     * {@inheritDoc}
     关闭会话
     */
    public final CloseFuture close(boolean rightNow) {
        if (rightNow) {
            return closeNow();
        } else {
	    //关闭时,刷新会话请求队列
            return closeOnFlush();
        }
    }
    /**
     * {@inheritDoc}
     关闭时,刷新会话请求队列
     */
    public final CloseFuture closeOnFlush() {
        if (!isClosing()) {
            getWriteRequestQueue().offer(this, CLOSE_REQUEST);
            getProcessor().flush(this);
        }
        return closeFuture;
    }

从关闭方法来看默认关闭时,清空写请求队列,并将写请求的结果置为已写,触发过滤过滤链fireFilterClose事件,
即不flush会话写请求队列,closeOnFlush方法为,在关闭会话前,flush会话写请求队列。
再来看其他方法:
  
 /**
     * {@inheritDoc}获取会话关联IoHandler
     */
    public IoHandler getHandler() {
        return handler;
    }
    /**
     * {@inheritDoc}获取会话配置
     */
    public IoSessionConfig getConfig() {
        return config;
    }
    /**
     * {@inheritDoc}
     */
    public final ReadFuture read() {
        if (!getConfig().isUseReadOperation()) {
            throw new IllegalStateException("useReadOperation is not enabled.");
        }
	//获取会话读请求结果队列
        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
        ReadFuture future;
        synchronized (readyReadFutures) {
            future = readyReadFutures.poll();
            if (future != null) {
	        //请求结果关闭,则添加请求到会话读请求结果队列
                if (future.isClosed()) {
                    // Let other readers get notified.
                    readyReadFutures.offer(future);
                }
            } else {
	        //新建一个默认请求结果,添加到会话等待读请求结果队列
                future = new DefaultReadFuture(this);
                getWaitingReadFutures().offer(future);
            }
        }
        return future;
    }
   
    /**
     * @return a queue of ReadFuture
     获取会话读请求结果队列
     */
    private Queue<ReadFuture> getReadyReadFutures() {
        Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
        if (readyReadFutures == null) {
            readyReadFutures = new ConcurrentLinkedQueue<>();
            Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
                    readyReadFutures);
            if (oldReadyReadFutures != null) {
                readyReadFutures = oldReadyReadFutures;
            }
        }
        return readyReadFutures;
    }

     /**
     * @return the queue of waiting ReadFuture
     */
    private Queue<ReadFuture> getWaitingReadFutures() {
        Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
        
        if (waitingReadyReadFutures == null) {
            waitingReadyReadFutures = new ConcurrentLinkedQueue<>();

            Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
                    WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
            
            if (oldWaitingReadyReadFutures != null) {
                waitingReadyReadFutures = oldWaitingReadyReadFutures;
            }
        }
        
        return waitingReadyReadFutures;
    }

从上面可以看出会话读操作,首先获取会话读请求结果队列,从队列poll一个读结果,如果读结果不为空且已关闭,则
重新入队列,否则新建一个默认读请求结果,添加到会话等待读请求结果队列。
 
  /**
     * Associates a message to a ReadFuture
     * 关联消息到读结果
     * @param message the message to associate to the ReadFuture
     * 
     */
    public final void offerReadFuture(Object message) {
        newReadFuture().setRead(message);
    }
    /**
     * Associates a failure to a ReadFuture
     * 管理一个异常到读结果
     * @param exception the exception to associate to the ReadFuture
     */
    public final void offerFailedReadFuture(Throwable exception) {
        newReadFuture().setException(exception);
    }

    /**
     * Inform the ReadFuture that the session has been closed
     通知会话写结果已关闭
     */
    public final void offerClosedReadFuture() {
        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
        
        synchronized (readyReadFutures) {
            newReadFuture().setClosed();
        }
    }

    /**
     * @return a readFuture get from the waiting ReadFuture
     从会话等待读结果队列,获取一个读结果,添加会话读请求队列
     */
    private ReadFuture newReadFuture() {
        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
        Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
        ReadFuture future;
        synchronized (readyReadFutures) {
            future = waitingReadFutures.poll();
            if (future == null) {
                future = new DefaultReadFuture(this);
                readyReadFutures.offer(future);
            }
        }
        return future;
    }

再来看写操作:
 /**
     * {@inheritDoc}
     */
    public WriteFuture write(Object message) {
        return write(message, null);
    }
    /**
     * {@inheritDoc}
     */
    public WriteFuture write(Object message, SocketAddress remoteAddress) {
        if (message == null) {//消息为空
            throw new IllegalArgumentException("Trying to write a null message : not allowed");
        }
        // We can't send a message to a connected session if we don't have
        // the remote address  
        if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
            throw new UnsupportedOperationException();
        }

        // If the session has been closed or is closing, we can't either
        // send a message to the remote side. We generate a future
        // containing an exception. 会话已关闭
        if (isClosing() || !isConnected()) {
            WriteFuture future = new DefaultWriteFuture(this);
            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
            WriteException writeException = new WriteToClosedSessionException(request);
            future.setException(writeException);
            return future;
        }

        FileChannel openedFileChannel = null;

        // TODO: remove this code as soon as we use InputStream
        // instead of Object for the message.
        try {
            if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
	       //如果buffer没有数据
                // Nothing to write : probably an error in the user code
                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
            } else if (message instanceof FileChannel) {
                FileChannel fileChannel = (FileChannel) message;
                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
            } else if (message instanceof File) {
                File file = (File) message;
                openedFileChannel = new FileInputStream(file).getChannel();
                message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
            }
        } catch (IOException e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            return DefaultWriteFuture.newNotWrittenFuture(this, e);
        }

        // Now, we can write the message. First, create a future
	//构建写请求及写请求结果
        WriteFuture writeFuture = new DefaultWriteFuture(this);
        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

        // Then, get the chain and inject the WriteRequest into it
	//触发会话过滤链fireFilterWrite事件
        IoFilterChain filterChain = getFilterChain();
        filterChain.fireFilterWrite(writeRequest);

        // TODO : This is not our business ! The caller has created a
        // FileChannel,
        // he has to close it !
        if (openedFileChannel != null) {
            // If we opened a FileChannel, it needs to be closed when the write
            // has completed
            final FileChannel finalChannel = openedFileChannel;
	    //在消息发送完,关闭文件通道
            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
                public void operationComplete(WriteFuture future) {
                    try {
                        finalChannel.close();
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            });
        }
        // Return the WriteFuture.
        return writeFuture;
    }

从上面可以看出,会话写请求,首先保证消息不为null,会话建立连接,并且远端socket地址不为null;如果消息为IoBuffer,确保buffer不为空,如果消息为文件通道/文件类型,则包装消息为DefaultFileRegion/FilenameFileRegion;然后创建写请求DefaultWriteRequest,触发会话过滤链fireFilterWrite事件,如果消息为文件通道,则注册写结果监听器,在消息发送完后,关闭文件通道,返回写结果DefaultWriteFuture。
再来会话附加物和属性设置相关的方法:
这些没有什么好讲的看一下就行
 
  /**
     * {@inheritDoc}
     */
    public final Object getAttachment() {
        return getAttribute("");
    }

    /**
     * {@inheritDoc}
     */
    public final Object setAttachment(Object attachment) {
        return setAttribute("", attachment);
    }

    /**
     * {@inheritDoc}
     */
    public final Object getAttribute(Object key) {
        return getAttribute(key, null);
    }

    /**
     * {@inheritDoc}
     */
    public final Object getAttribute(Object key, Object defaultValue) {
        return attributes.getAttribute(this, key, defaultValue);
    }

    /**
     * {@inheritDoc}
     */
    public final Object setAttribute(Object key, Object value) {
        return attributes.setAttribute(this, key, value);
    }

    /**
     * {@inheritDoc}
     */
    public final Object setAttribute(Object key) {
        return setAttribute(key, Boolean.TRUE);
    }

    /**
     * {@inheritDoc}
     */
    public final Object setAttributeIfAbsent(Object key, Object value) {
        return attributes.setAttributeIfAbsent(this, key, value);
    }

    /**
     * {@inheritDoc}
     */
    public final Object setAttributeIfAbsent(Object key) {
        return setAttributeIfAbsent(key, Boolean.TRUE);
    }

    /**
     * {@inheritDoc}
     */
    public final Object removeAttribute(Object key) {
        return attributes.removeAttribute(this, key);
    }

    /**
     * {@inheritDoc}
     */
    public final boolean removeAttribute(Object key, Object value) {
        return attributes.removeAttribute(this, key, value);
    }

    /**
     * {@inheritDoc}
     */
    public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
        return attributes.replaceAttribute(this, key, oldValue, newValue);
    }

    /**
     * {@inheritDoc}
     */
    public final boolean containsAttribute(Object key) {
        return attributes.containsAttribute(this, key);
    }

    /**
     * {@inheritDoc}
     */
    public final Set<Object> getAttributeKeys() {
        return attributes.getAttributeKeys(this);
    }

从上面来看会话属性主要是通过IoSessionAttributeMap来完成。
  
 /**
     * @return The map of attributes associated with the session
     */
    public final IoSessionAttributeMap getAttributeMap() {
        return attributes;
    }

    /**
     * Set the map of attributes associated with the session
     * 
     * @param attributes The Map of attributes
     */
    public final void setAttributeMap(IoSessionAttributeMap attributes) {
        this.attributes = attributes;
    }

    /**
     * Create a new close aware write queue, based on the given write queue.
     * 
     * @param writeRequestQueue The write request queue
     */
    public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
        this.writeRequestQueue = writeRequestQueue;
    }
    /**
     * {@inheritDoc}
     */
    public final void suspendRead() {
        readSuspended = true;
        if (isClosing() || !isConnected()) {
            return;
        }
	//更新会话在IO处理器中的状态
        getProcessor().updateTrafficControl(this);
    }
    /**
     * {@inheritDoc}
     */
    public final void suspendWrite() {
        writeSuspended = true;
        if (isClosing() || !isConnected()) {
            return;
        }
	//更新会话在IO处理器中的状态
        getProcessor().updateTrafficControl(this);
    }

    /**
     * {@inheritDoc}
     */
    @SuppressWarnings("unchecked")
    public final void resumeRead() {
        readSuspended = false;
        if (isClosing() || !isConnected()) {
            return;
        }
	//更新会话在IO处理器中的状态
        getProcessor().updateTrafficControl(this);
    }

    /**
     * {@inheritDoc}
     */
    @SuppressWarnings("unchecked")
    public final void resumeWrite() {
        writeSuspended = false;
        if (isClosing() || !isConnected()) {
            return;
        }
	//更新会话在IO处理器中的状态
        getProcessor().updateTrafficControl(this);
    }

    /**
     * {@inheritDoc}
     */
    public boolean isReadSuspended() {
        return readSuspended;
    }

    /**
     * {@inheritDoc}
     */
    public boolean isWriteSuspended() {
        return writeSuspended;
    }

下面是一些获取读写字节数和消息数的方法
  
 /**
     * {@inheritDoc}
     */
    public final long getReadBytes() {
        return readBytes;
    }

    /**
     * {@inheritDoc}
     */
    public final long getWrittenBytes() {
        return writtenBytes;
    }

    /**
     * {@inheritDoc}
     */
    public final long getReadMessages() {
        return readMessages;
    }

    /**
     * {@inheritDoc}
     */
    public final long getWrittenMessages() {
        return writtenMessages;
    }

    /**
     * {@inheritDoc}
     */
    public final double getReadBytesThroughput() {
        return readBytesThroughput;
    }

    /**
     * {@inheritDoc}
     */
    public final double getWrittenBytesThroughput() {
        return writtenBytesThroughput;
    }

    /**
     * {@inheritDoc}
     */
    public final double getReadMessagesThroughput() {
        return readMessagesThroughput;
    }

    /**
     * {@inheritDoc}
     */
    public final double getWrittenMessagesThroughput() {
        return writtenMessagesThroughput;
    }
  /**
     * {@inheritDoc}
     更新吞吐量
     */
    public final void updateThroughput(long currentTime, boolean force) {
        int interval = (int) (currentTime - lastThroughputCalculationTime);
       //获取吞吐量计算间隔
        long minInterval = getConfig().getThroughputCalculationIntervalInMillis();

        if (((minInterval == 0) || (interval < minInterval)) && !force) {
            return;
        }
        //计算读写字节及消息吞吐量
        readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
        writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
        readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
        writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
        //更新上次读写字节及消息吞吐量
        lastReadBytes = readBytes;
        lastWrittenBytes = writtenBytes;
        lastReadMessages = readMessages;
        lastWrittenMessages = writtenMessages;
        //记录上次更新吞吐量时间
        lastThroughputCalculationTime = currentTime;
    }

设置与获取调度读写字节数及消息数
  
 /**
     * {@inheritDoc}
     */
    public final long getScheduledWriteBytes() {
        return scheduledWriteBytes.get();
    }

    /**
     * {@inheritDoc}
     */
    public final int getScheduledWriteMessages() {
        return scheduledWriteMessages.get();
    }

    /**
     * Set the number of scheduled write bytes
     * 
     * @param byteCount The number of scheduled bytes for write
     */
    protected void setScheduledWriteBytes(int byteCount) {
        scheduledWriteBytes.set(byteCount);
    }

    /**
     * Set the number of scheduled write messages
     * 
     * @param messages The number of scheduled messages for write
     */
    protected void setScheduledWriteMessages(int messages) {
        scheduledWriteMessages.set(messages);
    }

    /**
     * Increase the number of read bytes
     * 增加读取字节数
     * @param increment The number of read bytes
     * @param currentTime The current time
     */
    public final void increaseReadBytes(long increment, long currentTime) {
        if (increment <= 0) {
            return;
        }
        readBytes += increment;
        lastReadTime = currentTime;
	//更新空心读,读写空闲次数为0
        idleCountForBoth.set(0);
        idleCountForRead.set(0);
        if (getService() instanceof AbstractIoService) {
            ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
        }
    }

    /**
     * Increase the number of read messages
     * 增加读取消息数
     * @param currentTime The current time
     */
    public final void increaseReadMessages(long currentTime) {
        readMessages++;
        lastReadTime = currentTime;
        idleCountForBoth.set(0);
        idleCountForRead.set(0);

        if (getService() instanceof AbstractIoService) {
            ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
        }
    }

    /**
     * Increase the number of written bytes
     * 增加写字节数
     * @param increment The number of written bytes
     * @param currentTime The current time
     */
    public final void increaseWrittenBytes(int increment, long currentTime) {
        if (increment <= 0) {
            return;
        }

        writtenBytes += increment;
        lastWriteTime = currentTime;
        idleCountForBoth.set(0);
        idleCountForWrite.set(0);

        if (getService() instanceof AbstractIoService) {
            ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
        }

        increaseScheduledWriteBytes(-increment);
    }

    /**
     * Increase the number of written messages
     * 增加写消息数
     * @param request The written message
     * @param currentTime The current tile
     */
    public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
        Object message = request.getMessage();
        if (message instanceof IoBuffer) {
            IoBuffer b = (IoBuffer) message;

            if (b.hasRemaining()) {
                return;
            }
        }
        writtenMessages++;
        lastWriteTime = currentTime;

        if (getService() instanceof AbstractIoService) {
            ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
        }
        decreaseScheduledWriteMessages();
    }

    /**
     * Increase the number of scheduled write bytes for the session
     * 增加会话调度写字节数
     * @param increment The number of newly added bytes to write
     */
    public final void increaseScheduledWriteBytes(int increment) {
        scheduledWriteBytes.addAndGet(increment);
        if (getService() instanceof AbstractIoService) {
            ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
        }
    }

    /**
     * Increase the number of scheduled message to write
     增加会话调度写消息数
     */
    public final void increaseScheduledWriteMessages() {
        scheduledWriteMessages.incrementAndGet();
        
        if (getService() instanceof AbstractIoService) {
            ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
        }
    }

    /**
     * Decrease the number of scheduled message written
     */
    private void decreaseScheduledWriteMessages() {
        scheduledWriteMessages.decrementAndGet();
        if (getService() instanceof AbstractIoService) {
            ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
        }
    }

    /**
     * Decrease the counters of written messages and written bytes when a message has been written
     * 当消息发送时,减少写字节数与消息数计数器
     * @param request The written message
     */
    public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
        Object message = request.getMessage();
        if (message instanceof IoBuffer) {
            IoBuffer b = (IoBuffer) message;
            
            if (b.hasRemaining()) {
                increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
            } else {
                decreaseScheduledWriteMessages();
            }
        } else {
            decreaseScheduledWriteMessages();
        }
    }

    /**
     * {@inheritDoc}
     获取会话写请求队列
     */
    public final WriteRequestQueue getWriteRequestQueue() {
        if (writeRequestQueue == null) {
            throw new IllegalStateException();
        }
        
        return writeRequestQueue;
    }

    /**
     * {@inheritDoc}
     获取会话当前写请求
     */
    public final WriteRequest getCurrentWriteRequest() {
        return currentWriteRequest;
    }

    /**
     * {@inheritDoc}
     */
    public final Object getCurrentWriteMessage() {
        WriteRequest req = getCurrentWriteRequest();
        
        if (req == null) {
            return null;
        }
        return req.getMessage();
    }

    /**
     * {@inheritDoc}
     */
    public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
        this.currentWriteRequest = currentWriteRequest;
    }

    /**
     * Increase the ReadBuffer size (it will double)
     增加读缓存区大小为原来的两倍
     */
    public final void increaseReadBufferSize() {
        int newReadBufferSize = getConfig().getReadBufferSize() << 1;
        if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
            getConfig().setReadBufferSize(newReadBufferSize);
        } else {
            getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
        }

        deferDecreaseReadBuffer = true;
    }

    /**
     * Decrease the ReadBuffer size (it will be divided by a factor 2)
      减少读缓存区大小为原来的1/2
     */
    public final void decreaseReadBufferSize() {
        if (deferDecreaseReadBuffer) {
            deferDecreaseReadBuffer = false;
            return;
        }

        if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
            getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
        }

        deferDecreaseReadBuffer = true;
    }

    /**
     * {@inheritDoc}
     会话创建时间
     */
    public final long getCreationTime() {
        return creationTime;
    }

    /**
     * {@inheritDoc}
     上次Io时间
     */
    public final long getLastIoTime() {
        return Math.max(lastReadTime, lastWriteTime);
    }

    /**
     * {@inheritDoc}
     上次读操作时间
     */
    public final long getLastReadTime() {
        return lastReadTime;
    }

    /**
     * {@inheritDoc}
     上次写操作时间
     */
    public final long getLastWriteTime() {
        return lastWriteTime;
    }

再看读写空闲计数器更新:
是否处于空闲状态status(读空闲,写空闲,读写空闲)
/**
     * {@inheritDoc}
     */
    public final boolean isIdle(IdleStatus status) {
        if (status == IdleStatus.BOTH_IDLE) {
            return idleCountForBoth.get() > 0;
        }

        if (status == IdleStatus.READER_IDLE) {
            return idleCountForRead.get() > 0;
        }

        if (status == IdleStatus.WRITER_IDLE) {
            return idleCountForWrite.get() > 0;
        }

        throw new IllegalArgumentException("Unknown idle status: " + status);
    }

    /**
     * {@inheritDoc}
     */
    public final boolean isBothIdle() {
        return isIdle(IdleStatus.BOTH_IDLE);
    }

    /**
     * {@inheritDoc}
     */
    public final boolean isReaderIdle() {
        return isIdle(IdleStatus.READER_IDLE);
    }

    /**
     * {@inheritDoc}
     */
    public final boolean isWriterIdle() {
        return isIdle(IdleStatus.WRITER_IDLE);
    }

    /**
     * {@inheritDoc}
     获取会话空闲状态次数
     */
    public final int getIdleCount(IdleStatus status) {
        if (getConfig().getIdleTime(status) == 0) {
            if (status == IdleStatus.BOTH_IDLE) {
                idleCountForBoth.set(0);
            }

            if (status == IdleStatus.READER_IDLE) {
                idleCountForRead.set(0);
            }

            if (status == IdleStatus.WRITER_IDLE) {
                idleCountForWrite.set(0);
            }
        }

        if (status == IdleStatus.BOTH_IDLE) {
            return idleCountForBoth.get();
        }

        if (status == IdleStatus.READER_IDLE) {
            return idleCountForRead.get();
        }

        if (status == IdleStatus.WRITER_IDLE) {
            return idleCountForWrite.get();
        }

        throw new IllegalArgumentException("Unknown idle status: " + status);
    }

    /**
     * {@inheritDoc}
     上次空闲状态时间
     */
    public final long getLastIdleTime(IdleStatus status) {
        if (status == IdleStatus.BOTH_IDLE) {
            return lastIdleTimeForBoth;
        }

        if (status == IdleStatus.READER_IDLE) {
            return lastIdleTimeForRead;
        }

        if (status == IdleStatus.WRITER_IDLE) {
            return lastIdleTimeForWrite;
        }

        throw new IllegalArgumentException("Unknown idle status: " + status);
    }

    /**
     * Increase the count of the various Idle counter
     * 更新空闲状态计数器
     * @param status The current status
     * @param currentTime The current time
     */
    public final void increaseIdleCount(IdleStatus status, long currentTime) {
        if (status == IdleStatus.BOTH_IDLE) {
            idleCountForBoth.incrementAndGet();
            lastIdleTimeForBoth = currentTime;
        } else if (status == IdleStatus.READER_IDLE) {
            idleCountForRead.incrementAndGet();
            lastIdleTimeForRead = currentTime;
        } else if (status == IdleStatus.WRITER_IDLE) {
            idleCountForWrite.incrementAndGet();
            lastIdleTimeForWrite = currentTime;
        } else {
            throw new IllegalArgumentException("Unknown idle status: " + status);
        }
    }

    /**
     * {@inheritDoc}
     */
    public final int getBothIdleCount() {
        return getIdleCount(IdleStatus.BOTH_IDLE);
    }

    /**
     * {@inheritDoc}
     */
    public final long getLastBothIdleTime() {
        return getLastIdleTime(IdleStatus.BOTH_IDLE);
    }

    /**
     * {@inheritDoc}
     */
    public final long getLastReaderIdleTime() {
        return getLastIdleTime(IdleStatus.READER_IDLE);
    }

    /**
     * {@inheritDoc}
     */
    public final long getLastWriterIdleTime() {
        return getLastIdleTime(IdleStatus.WRITER_IDLE);
    }

    /**
     * {@inheritDoc}
     */
    public final int getReaderIdleCount() {
        return getIdleCount(IdleStatus.READER_IDLE);
    }

    /**
     * {@inheritDoc}
     */
    public final int getWriterIdleCount() {
        return getIdleCount(IdleStatus.WRITER_IDLE);
    }

    /**
     * {@inheritDoc}
     获取会话远端地址,如果Service为IoAcceptor,为IoAcceptor监听地址,
     否则为会话远端socket地址
     */
    public SocketAddress getServiceAddress() {
        IoService service = getService();
        if (service instanceof IoAcceptor) {
            return ((IoAcceptor) service).getLocalAddress();
        }

        return getRemoteAddress();
    }

    /**
     * Get the Id as a String
     获取会话id
     */
    private String getIdAsString() {
        String id = Long.toHexString(getId()).toUpperCase();
        
        if (id.length() <= 8) {
            return "0x00000000".substring(0, 10 - id.length()) + id;
        } else {
            return "0x" + id;
        }
    }

    /**
     * TGet the Service name
     获取service名
     */
    private String getServiceName() {
        TransportMetadata tm = getTransportMetadata();
        if (tm == null) {
            return "null";
        }

        return tm.getProviderName() + ' ' + tm.getName();
    }

    /**
     * {@inheritDoc}
     获取会话关联IoService
     */
    public IoService getService() {
        return service;
    }
     /**
     * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
     * in the specified collection.
     * 
     * @param sessions The sessions that are notified
     * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
     */
    public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
        while (sessions.hasNext()) {
            IoSession session = sessions.next();
            
            if (!session.getCloseFuture().isClosed()) {
                notifyIdleSession(session, currentTime);
            }
        }
    }

    /**
     * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
     * specified {@code session}.
     * 通知会话空闲
     * @param session The session that is notified
     * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
     */
    public static void notifyIdleSession(IoSession session, long currentTime) {
        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));

        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
                IdleStatus.READER_IDLE,
                Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));

        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
                IdleStatus.WRITER_IDLE,
                Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));

        notifyWriteTimeout(session, currentTime);
    }
   //触发会话过滤链会话空闲事件fireSessionIdle
    private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
            long lastIoTime) {
        if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
            session.getFilterChain().fireSessionIdle(status);
        }
    }
   //通知会话写超时
    private static void notifyWriteTimeout(IoSession session, long currentTime) {

        long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
        if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
                && !session.getWriteRequestQueue().isEmpty(session)) {
            WriteRequest request = session.getCurrentWriteRequest();
            if (request != null) {
                session.setCurrentWriteRequest(null);
                WriteTimeoutException cause = new WriteTimeoutException(request);
                request.getFuture().setException(cause);
                session.getFilterChain().fireExceptionCaught(cause);
                // WriteException is an IOException, so we close the session.
		//会话写超时,关闭会话
                session.closeNow();
            }
        }
    }
}

总结:
抽象会话AbstractIoSession内部有一个关联的IoService和一个IoHandler;一个写请求队列用于存发会话写请求;一个会话属性Map存放会话属性,还有一些读写字节数,消息数,相关吞吐量和上次读写或空闲操作时间计数器。,会话初始化主要为初始化关联service,及关联的IoHandler,实际为Service的IoHandler;初始化所有的会话事件计数器为当前时间。关闭方法默认关闭时,清空写请求队列,并将写请求的结果置为已写,触发过滤过滤链fireFilterClose事件,即不flush会话写请求队列,closeOnFlush方法为,在关闭会话前,flush会话写请求队列。会话读操作,首先获取会话读请求结果队列,从队列poll一个读结果,如果读结果不为空且已关闭,则重新入队列,否则新建一个默认读请求结果,添加到会话等待读请求结果队列。会话写请求,首先保证消息不为null,会话建立连接,并且远端socket地址不为null;如果消息为IoBuffer,确保buffer不为空,如果消息为文件通道/文件类型,则包装消息为DefaultFileRegion/FilenameFileRegion;然后创建写请求DefaultWriteRequest,触发会话过滤链fireFilterWrite事件,如果消息为文件通道,则注册写结果监听器,在消息发送完后,关闭文件通道,返回写结果DefaultWriteFuture。

附:
//IoFutureListener
/**
 * Something interested in being notified when the result
 * of an {@link IoFuture} becomes available.
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
   public interface IoFutureListener extends EventListener {
    /**
     * An {@link IoFutureListener} that closes the {@link IoSession} which is
     * associated with the specified {@link IoFuture}.
     */
    static IoFutureListener CLOSE = new IoFutureListener() {
        public void operationComplete(IoFuture future) {
            future.getSession().close();
        }
    };
    /**
     * Invoked when the operation associated with the {@link IoFuture}
     * has been completed.
     * @param future  The source {@link IoFuture} which called this
     *                callback.
     */
    void operationComplete(IoFuture future);
}

//EventListener
/**
 * A tagging interface that all event listener interfaces must extend.
 * @since JDK1.1
 */
public interface EventListener {
}

//DefaultWriteRequest
public class DefaultWriteRequest
    implements WriteRequest
{
    private final Object message;
    private final WriteFuture future;
    private final SocketAddress destination;
    ...
}

//IoSessionAttributeMap
public interface IoSessionAttributeMap
{
    public abstract Object getAttribute(IoSession iosession, Object obj, Object obj1);
    public abstract Object setAttribute(IoSession iosession, Object obj, Object obj1);
    public abstract Object setAttributeIfAbsent(IoSession iosession, Object obj, Object obj1);
    public abstract Object removeAttribute(IoSession iosession, Object obj);
    public abstract boolean removeAttribute(IoSession iosession, Object obj, Object obj1);
    public abstract boolean replaceAttribute(IoSession iosession, Object obj, Object obj1, Object obj2);
    public abstract boolean containsAttribute(IoSession iosession, Object obj);
    public abstract Set getAttributeKeys(IoSession iosession);
    public abstract void dispose(IoSession iosession)
        throws Exception;
}
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics