- 浏览: 987830 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Mina Io会话接口定义:http://donald-draper.iteye.com/blog/2377737
上一篇我们看到Io会话接口的定义,今天来看一下会话接口的简单实现AbstractIoSession。
来看构造:
从构造可以看出,会话初始化主要为初始化关联service,及关联的IoHandler,实际为Service的IoHandler;初始化所有的会话事件计数器为当前时间。
下面来看会话其他方法:
从关闭方法来看默认关闭时,清空写请求队列,并将写请求的结果置为已写,触发过滤过滤链fireFilterClose事件,
即不flush会话写请求队列,closeOnFlush方法为,在关闭会话前,flush会话写请求队列。
再来看其他方法:
从上面可以看出会话读操作,首先获取会话读请求结果队列,从队列poll一个读结果,如果读结果不为空且已关闭,则
重新入队列,否则新建一个默认读请求结果,添加到会话等待读请求结果队列。
再来看写操作:
从上面可以看出,会话写请求,首先保证消息不为null,会话建立连接,并且远端socket地址不为null;如果消息为IoBuffer,确保buffer不为空,如果消息为文件通道/文件类型,则包装消息为DefaultFileRegion/FilenameFileRegion;然后创建写请求DefaultWriteRequest,触发会话过滤链fireFilterWrite事件,如果消息为文件通道,则注册写结果监听器,在消息发送完后,关闭文件通道,返回写结果DefaultWriteFuture。
再来会话附加物和属性设置相关的方法:
这些没有什么好讲的看一下就行
从上面来看会话属性主要是通过IoSessionAttributeMap来完成。
下面是一些获取读写字节数和消息数的方法
设置与获取调度读写字节数及消息数
再看读写空闲计数器更新:
是否处于空闲状态status(读空闲,写空闲,读写空闲)
总结:
抽象会话AbstractIoSession内部有一个关联的IoService和一个IoHandler;一个写请求队列用于存发会话写请求;一个会话属性Map存放会话属性,还有一些读写字节数,消息数,相关吞吐量和上次读写或空闲操作时间计数器。,会话初始化主要为初始化关联service,及关联的IoHandler,实际为Service的IoHandler;初始化所有的会话事件计数器为当前时间。关闭方法默认关闭时,清空写请求队列,并将写请求的结果置为已写,触发过滤过滤链fireFilterClose事件,即不flush会话写请求队列,closeOnFlush方法为,在关闭会话前,flush会话写请求队列。会话读操作,首先获取会话读请求结果队列,从队列poll一个读结果,如果读结果不为空且已关闭,则重新入队列,否则新建一个默认读请求结果,添加到会话等待读请求结果队列。会话写请求,首先保证消息不为null,会话建立连接,并且远端socket地址不为null;如果消息为IoBuffer,确保buffer不为空,如果消息为文件通道/文件类型,则包装消息为DefaultFileRegion/FilenameFileRegion;然后创建写请求DefaultWriteRequest,触发会话过滤链fireFilterWrite事件,如果消息为文件通道,则注册写结果监听器,在消息发送完后,关闭文件通道,返回写结果DefaultWriteFuture。
附:
//IoFutureListener
//EventListener
//DefaultWriteRequest
//IoSessionAttributeMap
上一篇我们看到Io会话接口的定义,今天来看一下会话接口的简单实现AbstractIoSession。
/** * Base implementation of {@link IoSession}. * @author [url=http://mina.apache.org]Apache MINA Project[/url] */ public abstract class AbstractIoSession implements IoSession { /** The associated handler 会话关联的IoHandler*/ private final IoHandler handler; /** The session config 会话配置*/ protected IoSessionConfig config; /** The service which will manage this session 管理会话的Service*/ private final IoService service; private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class, "readyReadFutures"); private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class, "waitingReadFutures"); //Io结果监听器,主要是重置会话读写字节和消息的计数器 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() { public void operationComplete(CloseFuture future) { AbstractIoSession session = (AbstractIoSession) future.getSession(); session.scheduledWriteBytes.set(0); session.scheduledWriteMessages.set(0); session.readBytesThroughput = 0; session.readMessagesThroughput = 0; session.writtenBytesThroughput = 0; session.writtenMessagesThroughput = 0; } }; /** * An internal write request object that triggers session close. 内部写请求,会话关闭时触发,即发送关闭写请求 */ public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object()); /** * An internal write request object that triggers message sent events. 内部写请求,消息发送时触发 */ public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE); private final Object lock = new Object(); private IoSessionAttributeMap attributes;//会话属性Map private WriteRequestQueue writeRequestQueue;//写请求队列 private WriteRequest currentWriteRequest;//当前写请求 /** The Session creation's time 会话创建时间*/ private final long creationTime; /** An id generator guaranteed to generate unique IDs for the session 会话id产生器*/ private static AtomicLong idGenerator = new AtomicLong(0); /** The session ID 会话id*/ private long sessionId; /** * A future that will be set 'closed' when the connection is closed.连接关闭结果 */ private final CloseFuture closeFuture = new DefaultCloseFuture(this); private volatile boolean closing; // traffic control private boolean readSuspended = false;//是否读暂停 private boolean writeSuspended = false;//是否写暂停 // Status variables private final AtomicBoolean scheduledForFlush = new AtomicBoolean();//是否正在刷新写请求 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();//已发送字节数 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();//已发送消息数 private long readBytes;//读取字节数 private long writtenBytes;//写字节数 private long readMessages;//读取消息数 private long writtenMessages;//写消息数 //上次读写操作发生时间 private long lastReadTime; private long lastWriteTime; //上次吞吐量计算时间 private long lastThroughputCalculationTime; //上次读写字节数及消息数 private long lastReadBytes; private long lastWrittenBytes; private long lastReadMessages; private long lastWrittenMessages; //读写字节及消息吞吐量 private double readBytesThroughput; private double writtenBytesThroughput; private double readMessagesThroughput; private double writtenMessagesThroughput; //空闲状态计数器 private AtomicInteger idleCountForBoth = new AtomicInteger(); private AtomicInteger idleCountForRead = new AtomicInteger(); private AtomicInteger idleCountForWrite = new AtomicInteger(); //上次空闲状态时间 private long lastIdleTimeForBoth; private long lastIdleTimeForRead; private long lastIdleTimeForWrite; private boolean deferDecreaseReadBuffer = true; }
来看构造:
/** * Create a Session for a service * @param service the Service for this session */ protected AbstractIoSession(IoService service) { this.service = service; this.handler = service.getHandler(); // Initialize all the Session counters to the current time long currentTime = System.currentTimeMillis(); creationTime = currentTime; lastThroughputCalculationTime = currentTime; lastReadTime = currentTime; lastWriteTime = currentTime; lastIdleTimeForBoth = currentTime; lastIdleTimeForRead = currentTime; lastIdleTimeForWrite = currentTime; // TODO add documentation closeFuture.addListener(SCHEDULED_COUNTER_RESETTER); // Set a new ID for this session sessionId = idGenerator.incrementAndGet(); }
从构造可以看出,会话初始化主要为初始化关联service,及关联的IoHandler,实际为Service的IoHandler;初始化所有的会话事件计数器为当前时间。
下面来看会话其他方法:
/** * {@inheritDoc} * 获取会话id * We use an AtomicLong to guarantee that the session ID are unique. */ public final long getId() { return sessionId; } /** * @return The associated IoProcessor for this session 获取会话关联Io处理器 */ public abstract IoProcessor getProcessor(); /** * {@inheritDoc} 是否连接 */ public final boolean isConnected() { return !closeFuture.isClosed(); } /** * {@inheritDoc} 是否激活 */ public boolean isActive() { // Return true by default return true; } /** * {@inheritDoc} 是否关闭 */ public final boolean isClosing() { return closing || closeFuture.isClosed(); } /** * {@inheritDoc} 是否安全 */ public boolean isSecured() { // Always false... return false; } /** * {@inheritDoc} 获取关闭结果 */ public final CloseFuture getCloseFuture() { return closeFuture; } /** * Tells if the session is scheduled for flushed * 判断会话是否正在被调度刷新,即处理器发送会话写请求 * @return true if the session is scheduled for flush */ public final boolean isScheduledForFlush() { return scheduledForFlush.get(); } /** * Schedule the session for flushed 刷新会话 */ public final void scheduledForFlush() { scheduledForFlush.set(true); } /** * Change the session's status : it's not anymore scheduled for flush 重置会话刷新状态为否 */ public final void unscheduledForFlush() { scheduledForFlush.set(false); } /** * Set the scheduledForFLush flag. As we may have concurrent access to this * flag, we compare and set it in one call. * 设置会话调度舒心状态 * @param schedule * the new value to set if not already set. * @return true if the session flag has been set, and if it wasn't set * already. */ public final boolean setScheduledForFlush(boolean schedule) { if (schedule) { // If the current tag is set to false, switch it to true, // otherwise, we do nothing but return false : the session // is already scheduled for flush return scheduledForFlush.compareAndSet(false, schedule); } scheduledForFlush.set(schedule); return true; } /** * {@inheritDoc}关闭会话 */ public final CloseFuture close() { return closeNow(); } /** * {@inheritDoc} */ public final CloseFuture closeNow() { synchronized (lock) { if (isClosing()) { return closeFuture; } closing = true; try { destroy(); } catch (Exception e) { IoFilterChain filterChain = getFilterChain(); filterChain.fireExceptionCaught(e); } } //触发过滤过滤链fireFilterClose getFilterChain().fireFilterClose(); return closeFuture; } /** * Destroy the session */ protected void destroy() { if (writeRequestQueue != null) { //清空写请求队列,并将写请求的结果置为已写 while (!writeRequestQueue.isEmpty(this)) { WriteRequest writeRequest = writeRequestQueue.poll(this); if (writeRequest != null) { WriteFuture writeFuture = writeRequest.getFuture(); // The WriteRequest may not always have a future : The CLOSE_REQUEST // and MESSAGE_SENT_REQUEST don't. if (writeFuture != null) { writeFuture.setWritten(); } } } } } /** * {@inheritDoc} 关闭会话 */ public final CloseFuture close(boolean rightNow) { if (rightNow) { return closeNow(); } else { //关闭时,刷新会话请求队列 return closeOnFlush(); } } /** * {@inheritDoc} 关闭时,刷新会话请求队列 */ public final CloseFuture closeOnFlush() { if (!isClosing()) { getWriteRequestQueue().offer(this, CLOSE_REQUEST); getProcessor().flush(this); } return closeFuture; }
从关闭方法来看默认关闭时,清空写请求队列,并将写请求的结果置为已写,触发过滤过滤链fireFilterClose事件,
即不flush会话写请求队列,closeOnFlush方法为,在关闭会话前,flush会话写请求队列。
再来看其他方法:
/** * {@inheritDoc}获取会话关联IoHandler */ public IoHandler getHandler() { return handler; } /** * {@inheritDoc}获取会话配置 */ public IoSessionConfig getConfig() { return config; } /** * {@inheritDoc} */ public final ReadFuture read() { if (!getConfig().isUseReadOperation()) { throw new IllegalStateException("useReadOperation is not enabled."); } //获取会话读请求结果队列 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); ReadFuture future; synchronized (readyReadFutures) { future = readyReadFutures.poll(); if (future != null) { //请求结果关闭,则添加请求到会话读请求结果队列 if (future.isClosed()) { // Let other readers get notified. readyReadFutures.offer(future); } } else { //新建一个默认请求结果,添加到会话等待读请求结果队列 future = new DefaultReadFuture(this); getWaitingReadFutures().offer(future); } } return future; } /** * @return a queue of ReadFuture 获取会话读请求结果队列 */ private Queue<ReadFuture> getReadyReadFutures() { Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY); if (readyReadFutures == null) { readyReadFutures = new ConcurrentLinkedQueue<>(); Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures); if (oldReadyReadFutures != null) { readyReadFutures = oldReadyReadFutures; } } return readyReadFutures; } /** * @return the queue of waiting ReadFuture */ private Queue<ReadFuture> getWaitingReadFutures() { Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY); if (waitingReadyReadFutures == null) { waitingReadyReadFutures = new ConcurrentLinkedQueue<>(); Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent( WAITING_READ_FUTURES_KEY, waitingReadyReadFutures); if (oldWaitingReadyReadFutures != null) { waitingReadyReadFutures = oldWaitingReadyReadFutures; } } return waitingReadyReadFutures; }
从上面可以看出会话读操作,首先获取会话读请求结果队列,从队列poll一个读结果,如果读结果不为空且已关闭,则
重新入队列,否则新建一个默认读请求结果,添加到会话等待读请求结果队列。
/** * Associates a message to a ReadFuture * 关联消息到读结果 * @param message the message to associate to the ReadFuture * */ public final void offerReadFuture(Object message) { newReadFuture().setRead(message); } /** * Associates a failure to a ReadFuture * 管理一个异常到读结果 * @param exception the exception to associate to the ReadFuture */ public final void offerFailedReadFuture(Throwable exception) { newReadFuture().setException(exception); } /** * Inform the ReadFuture that the session has been closed 通知会话写结果已关闭 */ public final void offerClosedReadFuture() { Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); synchronized (readyReadFutures) { newReadFuture().setClosed(); } } /** * @return a readFuture get from the waiting ReadFuture 从会话等待读结果队列,获取一个读结果,添加会话读请求队列 */ private ReadFuture newReadFuture() { Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures(); ReadFuture future; synchronized (readyReadFutures) { future = waitingReadFutures.poll(); if (future == null) { future = new DefaultReadFuture(this); readyReadFutures.offer(future); } } return future; }
再来看写操作:
/** * {@inheritDoc} */ public WriteFuture write(Object message) { return write(message, null); } /** * {@inheritDoc} */ public WriteFuture write(Object message, SocketAddress remoteAddress) { if (message == null) {//消息为空 throw new IllegalArgumentException("Trying to write a null message : not allowed"); } // We can't send a message to a connected session if we don't have // the remote address if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) { throw new UnsupportedOperationException(); } // If the session has been closed or is closing, we can't either // send a message to the remote side. We generate a future // containing an exception. 会话已关闭 if (isClosing() || !isConnected()) { WriteFuture future = new DefaultWriteFuture(this); WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress); WriteException writeException = new WriteToClosedSessionException(request); future.setException(writeException); return future; } FileChannel openedFileChannel = null; // TODO: remove this code as soon as we use InputStream // instead of Object for the message. try { if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) { //如果buffer没有数据 // Nothing to write : probably an error in the user code throw new IllegalArgumentException("message is empty. Forgot to call flip()?"); } else if (message instanceof FileChannel) { FileChannel fileChannel = (FileChannel) message; message = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); } else if (message instanceof File) { File file = (File) message; openedFileChannel = new FileInputStream(file).getChannel(); message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size()); } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); return DefaultWriteFuture.newNotWrittenFuture(this, e); } // Now, we can write the message. First, create a future //构建写请求及写请求结果 WriteFuture writeFuture = new DefaultWriteFuture(this); WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); // Then, get the chain and inject the WriteRequest into it //触发会话过滤链fireFilterWrite事件 IoFilterChain filterChain = getFilterChain(); filterChain.fireFilterWrite(writeRequest); // TODO : This is not our business ! The caller has created a // FileChannel, // he has to close it ! if (openedFileChannel != null) { // If we opened a FileChannel, it needs to be closed when the write // has completed final FileChannel finalChannel = openedFileChannel; //在消息发送完,关闭文件通道 writeFuture.addListener(new IoFutureListener<WriteFuture>() { public void operationComplete(WriteFuture future) { try { finalChannel.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } }); } // Return the WriteFuture. return writeFuture; }
从上面可以看出,会话写请求,首先保证消息不为null,会话建立连接,并且远端socket地址不为null;如果消息为IoBuffer,确保buffer不为空,如果消息为文件通道/文件类型,则包装消息为DefaultFileRegion/FilenameFileRegion;然后创建写请求DefaultWriteRequest,触发会话过滤链fireFilterWrite事件,如果消息为文件通道,则注册写结果监听器,在消息发送完后,关闭文件通道,返回写结果DefaultWriteFuture。
再来会话附加物和属性设置相关的方法:
这些没有什么好讲的看一下就行
/** * {@inheritDoc} */ public final Object getAttachment() { return getAttribute(""); } /** * {@inheritDoc} */ public final Object setAttachment(Object attachment) { return setAttribute("", attachment); } /** * {@inheritDoc} */ public final Object getAttribute(Object key) { return getAttribute(key, null); } /** * {@inheritDoc} */ public final Object getAttribute(Object key, Object defaultValue) { return attributes.getAttribute(this, key, defaultValue); } /** * {@inheritDoc} */ public final Object setAttribute(Object key, Object value) { return attributes.setAttribute(this, key, value); } /** * {@inheritDoc} */ public final Object setAttribute(Object key) { return setAttribute(key, Boolean.TRUE); } /** * {@inheritDoc} */ public final Object setAttributeIfAbsent(Object key, Object value) { return attributes.setAttributeIfAbsent(this, key, value); } /** * {@inheritDoc} */ public final Object setAttributeIfAbsent(Object key) { return setAttributeIfAbsent(key, Boolean.TRUE); } /** * {@inheritDoc} */ public final Object removeAttribute(Object key) { return attributes.removeAttribute(this, key); } /** * {@inheritDoc} */ public final boolean removeAttribute(Object key, Object value) { return attributes.removeAttribute(this, key, value); } /** * {@inheritDoc} */ public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) { return attributes.replaceAttribute(this, key, oldValue, newValue); } /** * {@inheritDoc} */ public final boolean containsAttribute(Object key) { return attributes.containsAttribute(this, key); } /** * {@inheritDoc} */ public final Set<Object> getAttributeKeys() { return attributes.getAttributeKeys(this); }
从上面来看会话属性主要是通过IoSessionAttributeMap来完成。
/** * @return The map of attributes associated with the session */ public final IoSessionAttributeMap getAttributeMap() { return attributes; } /** * Set the map of attributes associated with the session * * @param attributes The Map of attributes */ public final void setAttributeMap(IoSessionAttributeMap attributes) { this.attributes = attributes; } /** * Create a new close aware write queue, based on the given write queue. * * @param writeRequestQueue The write request queue */ public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) { this.writeRequestQueue = writeRequestQueue; } /** * {@inheritDoc} */ public final void suspendRead() { readSuspended = true; if (isClosing() || !isConnected()) { return; } //更新会话在IO处理器中的状态 getProcessor().updateTrafficControl(this); } /** * {@inheritDoc} */ public final void suspendWrite() { writeSuspended = true; if (isClosing() || !isConnected()) { return; } //更新会话在IO处理器中的状态 getProcessor().updateTrafficControl(this); } /** * {@inheritDoc} */ @SuppressWarnings("unchecked") public final void resumeRead() { readSuspended = false; if (isClosing() || !isConnected()) { return; } //更新会话在IO处理器中的状态 getProcessor().updateTrafficControl(this); } /** * {@inheritDoc} */ @SuppressWarnings("unchecked") public final void resumeWrite() { writeSuspended = false; if (isClosing() || !isConnected()) { return; } //更新会话在IO处理器中的状态 getProcessor().updateTrafficControl(this); } /** * {@inheritDoc} */ public boolean isReadSuspended() { return readSuspended; } /** * {@inheritDoc} */ public boolean isWriteSuspended() { return writeSuspended; }
下面是一些获取读写字节数和消息数的方法
/** * {@inheritDoc} */ public final long getReadBytes() { return readBytes; } /** * {@inheritDoc} */ public final long getWrittenBytes() { return writtenBytes; } /** * {@inheritDoc} */ public final long getReadMessages() { return readMessages; } /** * {@inheritDoc} */ public final long getWrittenMessages() { return writtenMessages; } /** * {@inheritDoc} */ public final double getReadBytesThroughput() { return readBytesThroughput; } /** * {@inheritDoc} */ public final double getWrittenBytesThroughput() { return writtenBytesThroughput; } /** * {@inheritDoc} */ public final double getReadMessagesThroughput() { return readMessagesThroughput; } /** * {@inheritDoc} */ public final double getWrittenMessagesThroughput() { return writtenMessagesThroughput; } /** * {@inheritDoc} 更新吞吐量 */ public final void updateThroughput(long currentTime, boolean force) { int interval = (int) (currentTime - lastThroughputCalculationTime); //获取吞吐量计算间隔 long minInterval = getConfig().getThroughputCalculationIntervalInMillis(); if (((minInterval == 0) || (interval < minInterval)) && !force) { return; } //计算读写字节及消息吞吐量 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval; writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval; readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval; writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval; //更新上次读写字节及消息吞吐量 lastReadBytes = readBytes; lastWrittenBytes = writtenBytes; lastReadMessages = readMessages; lastWrittenMessages = writtenMessages; //记录上次更新吞吐量时间 lastThroughputCalculationTime = currentTime; }
设置与获取调度读写字节数及消息数
/** * {@inheritDoc} */ public final long getScheduledWriteBytes() { return scheduledWriteBytes.get(); } /** * {@inheritDoc} */ public final int getScheduledWriteMessages() { return scheduledWriteMessages.get(); } /** * Set the number of scheduled write bytes * * @param byteCount The number of scheduled bytes for write */ protected void setScheduledWriteBytes(int byteCount) { scheduledWriteBytes.set(byteCount); } /** * Set the number of scheduled write messages * * @param messages The number of scheduled messages for write */ protected void setScheduledWriteMessages(int messages) { scheduledWriteMessages.set(messages); } /** * Increase the number of read bytes * 增加读取字节数 * @param increment The number of read bytes * @param currentTime The current time */ public final void increaseReadBytes(long increment, long currentTime) { if (increment <= 0) { return; } readBytes += increment; lastReadTime = currentTime; //更新空心读,读写空闲次数为0 idleCountForBoth.set(0); idleCountForRead.set(0); if (getService() instanceof AbstractIoService) { ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime); } } /** * Increase the number of read messages * 增加读取消息数 * @param currentTime The current time */ public final void increaseReadMessages(long currentTime) { readMessages++; lastReadTime = currentTime; idleCountForBoth.set(0); idleCountForRead.set(0); if (getService() instanceof AbstractIoService) { ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime); } } /** * Increase the number of written bytes * 增加写字节数 * @param increment The number of written bytes * @param currentTime The current time */ public final void increaseWrittenBytes(int increment, long currentTime) { if (increment <= 0) { return; } writtenBytes += increment; lastWriteTime = currentTime; idleCountForBoth.set(0); idleCountForWrite.set(0); if (getService() instanceof AbstractIoService) { ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime); } increaseScheduledWriteBytes(-increment); } /** * Increase the number of written messages * 增加写消息数 * @param request The written message * @param currentTime The current tile */ public final void increaseWrittenMessages(WriteRequest request, long currentTime) { Object message = request.getMessage(); if (message instanceof IoBuffer) { IoBuffer b = (IoBuffer) message; if (b.hasRemaining()) { return; } } writtenMessages++; lastWriteTime = currentTime; if (getService() instanceof AbstractIoService) { ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime); } decreaseScheduledWriteMessages(); } /** * Increase the number of scheduled write bytes for the session * 增加会话调度写字节数 * @param increment The number of newly added bytes to write */ public final void increaseScheduledWriteBytes(int increment) { scheduledWriteBytes.addAndGet(increment); if (getService() instanceof AbstractIoService) { ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment); } } /** * Increase the number of scheduled message to write 增加会话调度写消息数 */ public final void increaseScheduledWriteMessages() { scheduledWriteMessages.incrementAndGet(); if (getService() instanceof AbstractIoService) { ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages(); } } /** * Decrease the number of scheduled message written */ private void decreaseScheduledWriteMessages() { scheduledWriteMessages.decrementAndGet(); if (getService() instanceof AbstractIoService) { ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages(); } } /** * Decrease the counters of written messages and written bytes when a message has been written * 当消息发送时,减少写字节数与消息数计数器 * @param request The written message */ public final void decreaseScheduledBytesAndMessages(WriteRequest request) { Object message = request.getMessage(); if (message instanceof IoBuffer) { IoBuffer b = (IoBuffer) message; if (b.hasRemaining()) { increaseScheduledWriteBytes(-((IoBuffer) message).remaining()); } else { decreaseScheduledWriteMessages(); } } else { decreaseScheduledWriteMessages(); } } /** * {@inheritDoc} 获取会话写请求队列 */ public final WriteRequestQueue getWriteRequestQueue() { if (writeRequestQueue == null) { throw new IllegalStateException(); } return writeRequestQueue; } /** * {@inheritDoc} 获取会话当前写请求 */ public final WriteRequest getCurrentWriteRequest() { return currentWriteRequest; } /** * {@inheritDoc} */ public final Object getCurrentWriteMessage() { WriteRequest req = getCurrentWriteRequest(); if (req == null) { return null; } return req.getMessage(); } /** * {@inheritDoc} */ public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) { this.currentWriteRequest = currentWriteRequest; } /** * Increase the ReadBuffer size (it will double) 增加读缓存区大小为原来的两倍 */ public final void increaseReadBufferSize() { int newReadBufferSize = getConfig().getReadBufferSize() << 1; if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) { getConfig().setReadBufferSize(newReadBufferSize); } else { getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize()); } deferDecreaseReadBuffer = true; } /** * Decrease the ReadBuffer size (it will be divided by a factor 2) 减少读缓存区大小为原来的1/2 */ public final void decreaseReadBufferSize() { if (deferDecreaseReadBuffer) { deferDecreaseReadBuffer = false; return; } if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) { getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1); } deferDecreaseReadBuffer = true; } /** * {@inheritDoc} 会话创建时间 */ public final long getCreationTime() { return creationTime; } /** * {@inheritDoc} 上次Io时间 */ public final long getLastIoTime() { return Math.max(lastReadTime, lastWriteTime); } /** * {@inheritDoc} 上次读操作时间 */ public final long getLastReadTime() { return lastReadTime; } /** * {@inheritDoc} 上次写操作时间 */ public final long getLastWriteTime() { return lastWriteTime; }
再看读写空闲计数器更新:
是否处于空闲状态status(读空闲,写空闲,读写空闲)
/** * {@inheritDoc} */ public final boolean isIdle(IdleStatus status) { if (status == IdleStatus.BOTH_IDLE) { return idleCountForBoth.get() > 0; } if (status == IdleStatus.READER_IDLE) { return idleCountForRead.get() > 0; } if (status == IdleStatus.WRITER_IDLE) { return idleCountForWrite.get() > 0; } throw new IllegalArgumentException("Unknown idle status: " + status); } /** * {@inheritDoc} */ public final boolean isBothIdle() { return isIdle(IdleStatus.BOTH_IDLE); } /** * {@inheritDoc} */ public final boolean isReaderIdle() { return isIdle(IdleStatus.READER_IDLE); } /** * {@inheritDoc} */ public final boolean isWriterIdle() { return isIdle(IdleStatus.WRITER_IDLE); } /** * {@inheritDoc} 获取会话空闲状态次数 */ public final int getIdleCount(IdleStatus status) { if (getConfig().getIdleTime(status) == 0) { if (status == IdleStatus.BOTH_IDLE) { idleCountForBoth.set(0); } if (status == IdleStatus.READER_IDLE) { idleCountForRead.set(0); } if (status == IdleStatus.WRITER_IDLE) { idleCountForWrite.set(0); } } if (status == IdleStatus.BOTH_IDLE) { return idleCountForBoth.get(); } if (status == IdleStatus.READER_IDLE) { return idleCountForRead.get(); } if (status == IdleStatus.WRITER_IDLE) { return idleCountForWrite.get(); } throw new IllegalArgumentException("Unknown idle status: " + status); } /** * {@inheritDoc} 上次空闲状态时间 */ public final long getLastIdleTime(IdleStatus status) { if (status == IdleStatus.BOTH_IDLE) { return lastIdleTimeForBoth; } if (status == IdleStatus.READER_IDLE) { return lastIdleTimeForRead; } if (status == IdleStatus.WRITER_IDLE) { return lastIdleTimeForWrite; } throw new IllegalArgumentException("Unknown idle status: " + status); } /** * Increase the count of the various Idle counter * 更新空闲状态计数器 * @param status The current status * @param currentTime The current time */ public final void increaseIdleCount(IdleStatus status, long currentTime) { if (status == IdleStatus.BOTH_IDLE) { idleCountForBoth.incrementAndGet(); lastIdleTimeForBoth = currentTime; } else if (status == IdleStatus.READER_IDLE) { idleCountForRead.incrementAndGet(); lastIdleTimeForRead = currentTime; } else if (status == IdleStatus.WRITER_IDLE) { idleCountForWrite.incrementAndGet(); lastIdleTimeForWrite = currentTime; } else { throw new IllegalArgumentException("Unknown idle status: " + status); } } /** * {@inheritDoc} */ public final int getBothIdleCount() { return getIdleCount(IdleStatus.BOTH_IDLE); } /** * {@inheritDoc} */ public final long getLastBothIdleTime() { return getLastIdleTime(IdleStatus.BOTH_IDLE); } /** * {@inheritDoc} */ public final long getLastReaderIdleTime() { return getLastIdleTime(IdleStatus.READER_IDLE); } /** * {@inheritDoc} */ public final long getLastWriterIdleTime() { return getLastIdleTime(IdleStatus.WRITER_IDLE); } /** * {@inheritDoc} */ public final int getReaderIdleCount() { return getIdleCount(IdleStatus.READER_IDLE); } /** * {@inheritDoc} */ public final int getWriterIdleCount() { return getIdleCount(IdleStatus.WRITER_IDLE); } /** * {@inheritDoc} 获取会话远端地址,如果Service为IoAcceptor,为IoAcceptor监听地址, 否则为会话远端socket地址 */ public SocketAddress getServiceAddress() { IoService service = getService(); if (service instanceof IoAcceptor) { return ((IoAcceptor) service).getLocalAddress(); } return getRemoteAddress(); } /** * Get the Id as a String 获取会话id */ private String getIdAsString() { String id = Long.toHexString(getId()).toUpperCase(); if (id.length() <= 8) { return "0x00000000".substring(0, 10 - id.length()) + id; } else { return "0x" + id; } } /** * TGet the Service name 获取service名 */ private String getServiceName() { TransportMetadata tm = getTransportMetadata(); if (tm == null) { return "null"; } return tm.getProviderName() + ' ' + tm.getName(); } /** * {@inheritDoc} 获取会话关联IoService */ public IoService getService() { return service; } /** * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions * in the specified collection. * * @param sessions The sessions that are notified * @param currentTime the current time (i.e. {@link System#currentTimeMillis()}) */ public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) { while (sessions.hasNext()) { IoSession session = sessions.next(); if (!session.getCloseFuture().isClosed()) { notifyIdleSession(session, currentTime); } } } /** * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the * specified {@code session}. * 通知会话空闲 * @param session The session that is notified * @param currentTime the current time (i.e. {@link System#currentTimeMillis()}) */ public static void notifyIdleSession(IoSession session, long currentTime) { notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); notifyWriteTimeout(session, currentTime); } //触发会话过滤链会话空闲事件fireSessionIdle private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, long lastIoTime) { if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) { session.getFilterChain().fireSessionIdle(status); } } //通知会话写超时 private static void notifyWriteTimeout(IoSession session, long currentTime) { long writeTimeout = session.getConfig().getWriteTimeoutInMillis(); if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout) && !session.getWriteRequestQueue().isEmpty(session)) { WriteRequest request = session.getCurrentWriteRequest(); if (request != null) { session.setCurrentWriteRequest(null); WriteTimeoutException cause = new WriteTimeoutException(request); request.getFuture().setException(cause); session.getFilterChain().fireExceptionCaught(cause); // WriteException is an IOException, so we close the session. //会话写超时,关闭会话 session.closeNow(); } } } }
总结:
抽象会话AbstractIoSession内部有一个关联的IoService和一个IoHandler;一个写请求队列用于存发会话写请求;一个会话属性Map存放会话属性,还有一些读写字节数,消息数,相关吞吐量和上次读写或空闲操作时间计数器。,会话初始化主要为初始化关联service,及关联的IoHandler,实际为Service的IoHandler;初始化所有的会话事件计数器为当前时间。关闭方法默认关闭时,清空写请求队列,并将写请求的结果置为已写,触发过滤过滤链fireFilterClose事件,即不flush会话写请求队列,closeOnFlush方法为,在关闭会话前,flush会话写请求队列。会话读操作,首先获取会话读请求结果队列,从队列poll一个读结果,如果读结果不为空且已关闭,则重新入队列,否则新建一个默认读请求结果,添加到会话等待读请求结果队列。会话写请求,首先保证消息不为null,会话建立连接,并且远端socket地址不为null;如果消息为IoBuffer,确保buffer不为空,如果消息为文件通道/文件类型,则包装消息为DefaultFileRegion/FilenameFileRegion;然后创建写请求DefaultWriteRequest,触发会话过滤链fireFilterWrite事件,如果消息为文件通道,则注册写结果监听器,在消息发送完后,关闭文件通道,返回写结果DefaultWriteFuture。
附:
//IoFutureListener
/** * Something interested in being notified when the result * of an {@link IoFuture} becomes available. * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public interface IoFutureListener extends EventListener { /** * An {@link IoFutureListener} that closes the {@link IoSession} which is * associated with the specified {@link IoFuture}. */ static IoFutureListener CLOSE = new IoFutureListener() { public void operationComplete(IoFuture future) { future.getSession().close(); } }; /** * Invoked when the operation associated with the {@link IoFuture} * has been completed. * @param future The source {@link IoFuture} which called this * callback. */ void operationComplete(IoFuture future); }
//EventListener
/** * A tagging interface that all event listener interfaces must extend. * @since JDK1.1 */ public interface EventListener { }
//DefaultWriteRequest
public class DefaultWriteRequest implements WriteRequest { private final Object message; private final WriteFuture future; private final SocketAddress destination; ... }
//IoSessionAttributeMap
public interface IoSessionAttributeMap { public abstract Object getAttribute(IoSession iosession, Object obj, Object obj1); public abstract Object setAttribute(IoSession iosession, Object obj, Object obj1); public abstract Object setAttributeIfAbsent(IoSession iosession, Object obj, Object obj1); public abstract Object removeAttribute(IoSession iosession, Object obj); public abstract boolean removeAttribute(IoSession iosession, Object obj, Object obj1); public abstract boolean replaceAttribute(IoSession iosession, Object obj, Object obj1, Object obj2); public abstract boolean containsAttribute(IoSession iosession, Object obj); public abstract Set getAttributeKeys(IoSession iosession); public abstract void dispose(IoSession iosession) throws Exception; }
发表评论
-
Mina 报文连接器(NioDatagramConnector)
2017-06-14 08:46 1435Mina 抽象Polling连接器(A ... -
Mina 报文监听器NioDatagramAcceptor二(发送会话消息等)
2017-06-13 16:01 1564Mina 报文监听器NioDatagramAcceptor一( ... -
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器)
2017-06-13 09:51 2599Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina 报文通信简单示例
2017-06-12 09:01 2611MINA TCP简单通信实例:http://donald-dr ... -
Mina socket连接器(NioSocketConnector)
2017-06-12 08:37 4803Mina 抽象Polling连接器(AbstractPolli ... -
Mina 抽象Polling连接器(AbstractPollingIoConnector)
2017-06-11 21:29 1025Mina 连接器接口定义及抽象实现(IoConnector ) ... -
Mina 连接器接口定义及抽象实现(IoConnector )
2017-06-11 13:46 1846Mina IoService接口定义及抽象实现:http:// ... -
Mina socket监听器(NioSocketAcceptor)
2017-06-09 08:44 3446Mina IoService接口定义及抽象实现:http:// ... -
Mina 抽象polling监听器
2017-06-08 22:32 793Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina Io监听器接口定义及抽象实现
2017-06-07 13:02 1363Mina IoService接口定义及抽象实现:http:// ... -
Mina IoService接口定义及抽象实现
2017-06-06 23:44 1210Mina IoHandler接口定义:http://donal ... -
Mina Nio会话(Socket,DataGram)
2017-06-06 12:53 1228Mina Socket会话配置:http://donald-d ... -
Mina Io会话接口定义
2017-06-04 23:15 1186Mina Nio处理器:http://donald-drape ... -
Mina Nio处理器
2017-06-04 22:19 754Mina Io处理器抽象实现:http://donald-dr ... -
Mina Io处理器抽象实现
2017-06-03 23:52 1164Mina 过滤链抽象实现:http://donald-drap ... -
Mina IoHandler接口定义
2017-06-01 21:30 1759Mina 过滤链抽象实现:http://donald-drap ... -
MINA 多路复用协议编解码器工厂二(多路复用协议解码器)
2017-06-01 12:52 2293MINA 多路复用协议编解码器工厂一(多路复用协议编码器): ... -
MINA 多路复用协议编解码器工厂一(多路复用协议编码器)
2017-05-31 22:22 1884MINA 多路分离解码器实例:http://donald-dr ... -
Mina 累计协议解码器
2017-05-31 00:09 1243MINA 编解码器实例:http://donald-drape ... -
Mina 协议编解码过滤器三(会话write与消息接收过滤)
2017-05-28 07:22 1767Mina 协议编解码过滤器一(协议编解码工厂、协议编码器): ...
相关推荐
在Mina框架中,网络通信被抽象为“会话”(Session)和“事件”(Event)。当客户端发起连接或发送数据时,Mina通过事件处理器(EventListener)来触发相应的事件。这些事件处理器是非阻塞的,因此可以在不阻塞其他...
开发者可以通过IoSession接口的方法来管理连接,例如设置和获取用户属性,发送数据,关闭连接,获取远程客户端地址,查询会话的创建时间、最后IO活动时间和配置信息等。 `IoHandler`是Mina中的另一个重要组件,它...
MINA的名字来源于“Multi-purpose Infrastructure for Network Applications”,它提供了一套抽象层,使得开发者可以专注于业务逻辑,而无需关心底层的网络通信细节。MINA的核心理念是为各种协议(如TCP、UDP)提供...
MINA的核心特性在于它的异步IO模型,它采用了NIO(Non-blocking I/O)技术,使得应用程序在处理大量并发连接时能够更加高效。这种模型下,当数据不可读写时,不会阻塞线程,而是返回控制权,允许程序处理其他任务,...
MINA的核心设计理念是提供一个与传输协议无关的抽象层,使得开发者可以专注于业务逻辑,而无需关心底层网络通信的细节。 标题中的"mina框架demo"是指使用MINA框架创建的一个示例项目,通常这个项目会包含一系列的...
Mina 提供了一种抽象层,简化了网络编程,允许开发者专注于业务逻辑,而无需关心底层协议的实现细节。本DEMO将帮助我们深入理解Mina的工作原理及其在实际应用中的使用。 ### 1. Mina 框架的核心概念 - **事件驱动...
Netty4是一个快速开发可维护高性能协议服务器和客户端的框架,而Mina2是一个帮助用户轻松开发高性能、可扩展网络应用程序的框架,提供了一个抽象的事件驱动异步API。 在讨论线程模型前,我们先了解线程模型的基本...
它抽象出了IO服务(IoService)、IO处理器(IoHandler)、IO过滤器链(IoFilterChain)等概念,简化了网络编程的复杂性。 ### MINA框架核心组件 - **IoAcceptor**:用于处理服务端接收新的连接请求的组件。 - **...
- IoBuffer 是 Mina 中用于数据存储和传输的核心类,它是对 Java NIO ByteBuffer 的封装和增强。 - **IoBuffer vs ByteBuffer**: IoBuffer 提供了更多便于操作的功能,比如自动扩容、直接内存分配等特性,使得数据...
在开始学习 Mina 之前,需要有一定的 Java IO 和 Java NIO 知识,以及 Java 线程及并发库(java.util.concurrent)的使用经验。 对于 Mina 的 Server 端和 Client 端,Mina 的 API 将实际的网络通信与应用程序逻辑...
### MINA源码走读与实例 #### 一、MINA概述 **MINA**(**M**ulti **I**nterface **N**etwork *...MINA通过强大的抽象能力和丰富的接口设计,为开发者提供了一种简单高效的方式来构建高性能、高扩展性的网络通信应用。
会话是MINA中客户端连接的抽象,它封装了连接状态和通信上下文。 在“NIO_MINA学习例子_源代码”中,你可以找到以下学习要点: 1. **NIO基础**:研究源代码中的通道和缓冲区的使用,理解如何通过Channel.read()和...
Mina的架构设计使得它能够将网络通信的底层细节从应用程序中抽象出来,这样开发者只需要关注于编写处理数据和业务逻辑的代码。它将网络通信分解为几个核心组件: 1. IoService:这个接口负责网络连接的建立和维护,...
其核心特性包括抽象的、事件驱动的异步API,使得在多种传输协议(如TCP/IP,UDP/IP等)下能够快速高效地进行Java NIO编程。 - **下载与配置Jar包** - **mina-core-2.0.0-M1.jar**:这是Mina的核心库,提供了基本的...
Mina的核心价值在于其能够将底层的网络通信细节抽象出来,使得开发者能够更专注于业务逻辑的实现。它支持多种传输方式,不仅限于TCP/IP和UDP/IP,还包括序列化服务、虚拟机管道通信等,提供了丰富的功能和高度的灵活...
Mina不仅被看作是一个NIO框架,同时也常被称为客户端/服务端框架,或网络套接字类库,它提供了一套高级API来封装底层的IO操作。 #### 二、Mina的核心特性 1. **事件驱动的异步API**:Mina基于事件驱动模型,能够...
1. **事件驱动模型**:MINA采用非阻塞I/O模型,基于Java NIO(New IO)库。这种模型允许MINA在处理大量并发连接时保持高效的性能,因为I/O操作不会阻塞线程,而是通过事件通知机制来触发回调。 2. **过滤器链**:...
MINA 提供了一种抽象层,使得开发者可以专注于编写业务逻辑,而无需关心底层网络通信的复杂性。在"Mian-2.0.3"这个最新版本中,我们看到的是对网络开发的一套强大工具集合。 MINA 的核心特性包括: 1. **非阻塞I/O...
MINA 提供了一个高级抽象层,允许开发者使用 Java NIO(非阻塞I/O)编写网络应用,而无需深入理解底层 I/O 模型的复杂性。这个框架在Java社区中广泛应用于开发服务器端应用,如聊天服务器、游戏服务器、数据传输服务...
在Mina中,网络连接被抽象为Session,数据交换则通过Filter Chain进行处理,这种设计模式允许开发者灵活地添加、修改或移除过滤器以实现不同的功能。 `myhttp`项目正是基于这样的框架,构建了一个HTTP服务器。它...