- 浏览: 980037 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Mina IoHandler接口定义:http://donald-draper.iteye.com/blog/2377419
Mina Nio会话(Socket,DataGram):http://donald-draper.iteye.com/blog/2378169
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
Mina 协议编解码过滤器一(协议编解码工厂、协议编码器):
http://donald-draper.iteye.com/blog/2376663
Mina 协议编解码过滤器二(协议解码器):
http://donald-draper.iteye.com/blog/2376679
Mina 队列Queue:http://donald-draper.iteye.com/blog/2376712
Mina 协议编解码过滤器三(会话write与消息接收过滤):
http://donald-draper.iteye.com/blog/2376818
Mina 累计协议解码器:http://donald-draper.iteye.com/blog/2377029
MINA 多路复用协议编解码器工厂一(多路复用协议编码器):
http://donald-draper.iteye.com/blog/2377170
MINA 多路复用协议编解码器工厂二(多路复用协议解码器):
http://donald-draper.iteye.com/blog/2377324
Mina Nio处理器:http://donald-draper.iteye.com/blog/2377725
前面的文章我们看了会话配置及Nio会话,IoHandler,Io处理器,过滤链和协议编码过滤器,今天我们来看Mina另一个组件IoService。
再来看IoService的抽象实现:
从上面来看抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。
来看构造;
从上面来看,抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。
再来看其他操作(有一些set和get方法就不说,一看就明白):
再来看广播消息
来看这一句:
//广播消息到Service管理的所有会话
从上面来看广播消息就是遍历Service管理的会话,会话发送消息。
回到抽象Service:
从上来看初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,
将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,
则将连接结果添加会话属性中。
再看看其他方法:
总结:
抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,则将连接结果添加会话属性中。
附:
//IoServiceListener
//IoServiceStatistics
//IoSessionDataStructureFactory
//IoServiceListenerSupport
//默认异常监视器
//ExceptionMonitor
Mina Nio会话(Socket,DataGram):http://donald-draper.iteye.com/blog/2378169
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
Mina 协议编解码过滤器一(协议编解码工厂、协议编码器):
http://donald-draper.iteye.com/blog/2376663
Mina 协议编解码过滤器二(协议解码器):
http://donald-draper.iteye.com/blog/2376679
Mina 队列Queue:http://donald-draper.iteye.com/blog/2376712
Mina 协议编解码过滤器三(会话write与消息接收过滤):
http://donald-draper.iteye.com/blog/2376818
Mina 累计协议解码器:http://donald-draper.iteye.com/blog/2377029
MINA 多路复用协议编解码器工厂一(多路复用协议编码器):
http://donald-draper.iteye.com/blog/2377170
MINA 多路复用协议编解码器工厂二(多路复用协议解码器):
http://donald-draper.iteye.com/blog/2377324
Mina Nio处理器:http://donald-draper.iteye.com/blog/2377725
前面的文章我们看了会话配置及Nio会话,IoHandler,Io处理器,过滤链和协议编码过滤器,今天我们来看Mina另一个组件IoService。
/** * Base interface for all {@link IoAcceptor}s and {@link IoConnector}s * that provide I/O service and manage {@link IoSession}s. * IoService为IoAcceptor和IoConnector的基础接口,提供IO服务和管理会话 * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public interface IoService { /** * Adds an {@link IoServiceListener} that listens any events related with * this service. 添加Service监听器 */ void addListener(IoServiceListener listener); /** * Removed an existing {@link IoServiceListener} that listens any events * related with this service. 移除service监听器 */ void removeListener(IoServiceListener listener); /** * Returns all {@link SocketAddress}es this service is managing. * If this service is an {@link IoAcceptor}, a set of bind addresses will * be returned. If this service is an {@link IoConnector}, a set of remote * addresses will be returned. 返回service关联所有socket地址。如果Io服务为IoAcceptor,将返回绑定地址集。如果 服务为IoConnector,将返回连接的远端地址集。 */ Set getManagedServiceAddresses(); /** * Returns <tt>true</tt> if this service is managing the specified <tt>serviceAddress</tt>. * If this service is an {@link IoAcceptor}, <tt>serviceAddress</tt> is a bind address. * If this service is an {@link IoConnector}, <tt>serviceAddress</tt> is a remote address. 判断socket地址是否被Service管理。如果服务为IO为IoAcceptor,则判断socket地址是否为绑定地址。 如果服务为IoConnector,判断socket地址是否为连接的远端地址集。 */ boolean isManaged(SocketAddress serviceAddress); /** * Returns all sessions with the specified remote or local address, * which are currently managed by this service. * {@link IoAcceptor} will assume the specified <tt>address</tt> is a local * address, and {@link IoConnector} will assume it's a remote address. * 返回本地或远端socket地址为serviceAddress的所有service关联的会话。 IoAcceptor为本地地址,IoConnector为远端地址 * @param serviceAddress the address to return all sessions for. * @return the sessions. An empty collection if there's no session. * @throws IllegalArgumentException if the specified <tt>address</tt> has * not been bound. * @throws UnsupportedOperationException if this operation isn't supported * for the particular transport type implemented by this {@link IoService}. */ Set getManagedSessions(SocketAddress serviceAddress); /** * Returns the default configuration which is used when you didn't specify * any configuration. 返回Service默认配置 */ IoServiceConfig getDefaultConfig(); /** * Returns the global {@link IoFilterChainBuilder} which will modify the * {@link IoFilterChain} of all {@link IoSession}s which is managed * by this service. * The default value is an empty {@link DefaultIoFilterChainBuilder}. 返回全局的过滤链构建器,可以修改Service管理的会话关联的过滤链。默认为空过滤链构建器。 */ IoFilterChainBuilder getFilterChainBuilder(); /** * Sets the global {@link IoFilterChainBuilder} which will modify the * {@link IoFilterChain} of all {@link IoSession}s which is managed * by this service. * If you specify <tt>null</tt> this property will be set to * an empty {@link DefaultIoFilterChainBuilder}. 设置过滤链构建器 */ void setFilterChainBuilder(IoFilterChainBuilder builder); /** * A shortcut for <tt>( ( DefaultIoFilterChainBuilder ) </tt>{@link #getFilterChainBuilder()}<tt> )</tt>. * Please note that the returned object is not a <b>real</b> {@link IoFilterChain} * but a {@link DefaultIoFilterChainBuilder}. Modifying the returned builder * won't affect the existing {@link IoSession}s at all, because * {@link IoFilterChainBuilder}s affect only newly created {@link IoSession}s. *返回的非实际的过滤链,而是一个过滤链构建,并不会修改已经创建的会话,过滤链构建器只会 影响新创建会话。 * @throws IllegalStateException if the current {@link IoFilterChainBuilder} is * not a {@link DefaultIoFilterChainBuilder} */ DefaultIoFilterChainBuilder getFilterChain(); }
再来看IoService的抽象实现:
/** * Base implementation of {@link IoService}s. * An instance of IoService contains an Executor which will handle the incoming * events. *AbstractIoService为IoService的基础实现。IoService实例包含一个执行器,将会处理相关事件。 * @author [url=http://mina.apache.org]Apache MINA Project[/url] */ public abstract class AbstractIoService implements IoService { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class); /** * The unique number identifying the Service. It's incremented * for each new IoService created. 服务id */ private static final AtomicInteger id = new AtomicInteger(); /** * The thread name built from the IoService inherited * instance class name and the IoService Id 线程名 **/ private final String threadName; /** * The associated executor, responsible for handling execution of I/O events. 关联执行器,负责处理IO事件执行。 */ private final Executor executor; /** * A flag used to indicate that the local executor has been created * inside this instance, and not passed by a caller. * 表示本地执行器是否创建 * If the executor is locally created, then it will be an instance * of the ThreadPoolExecutor class. 如果本地执行器创建,将会是一个ThreadPoolExecutor的实例 */ private final boolean createdExecutor; /** * The IoHandler in charge of managing all the I/O Events. It is IO事件Handler */ private IoHandler handler; /** * The default {@link IoSessionConfig} which will be used to configure new sessions. 默认会话配置,用于创建会话时,配置会话 */ protected final IoSessionConfig sessionConfig; //service创建监听器 private final IoServiceListener serviceActivationListener = new IoServiceListener() { IoServiceStatistics serviceStats;//Service统计 /** * {@inheritDoc} */ @Override public void serviceActivated(IoService service) { // Update lastIoTime. serviceStats = service.getStatistics(); serviceStats.setLastReadTime(service.getActivationTime()); serviceStats.setLastWriteTime(service.getActivationTime()); serviceStats.setLastThroughputCalculationTime(service.getActivationTime()); } /** * {@inheritDoc} */ @Override public void serviceDeactivated(IoService service) throws Exception { // Empty handler } /** * {@inheritDoc} */ @Override public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception { // Empty handler } /** * {@inheritDoc} */ @Override public void sessionCreated(IoSession session) throws Exception { // Empty handler } /** * {@inheritDoc} */ @Override public void sessionClosed(IoSession session) throws Exception { // Empty handler } /** * {@inheritDoc} */ @Override public void sessionDestroyed(IoSession session) throws Exception { // Empty handler } }; /** * Current filter chain builder.过滤链构建器 */ private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder(); //会话数据结构工厂(会话属性Map,会话写请求队列,见附) private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory(); /** * Maintains the {@link IoServiceListener}s of this service. 维护service的service监听器 */ private final IoServiceListenerSupport listeners; /** * A lock object which must be acquired when related resources are * destroyed. service销毁时,释放先关资源,必须先获取这个锁 */ protected final Object disposalLock = new Object(); private volatile boolean disposing;//Service是否正在销毁 private volatile boolean disposed;//Service是否已销毁 private IoServiceStatistics stats = new IoServiceStatistics(this);//Service统计器 }
从上面来看抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。
来看构造;
/** * Constructor for {@link AbstractIoService}. You need to provide a default * session configuration and an {@link Executor} for handling I/O events. If * a null {@link Executor} is provided, a default one will be created using * {@link Executors#newCachedThreadPool()}. * 构造抽象Service。需要提供一个默认的会话配置,一个执行器处理IO事件。如果执行器 为null,默认创建一个Executors#newCachedThreadPool。 * @param sessionConfig * the default configuration for the managed {@link IoSession} * @param executor * the {@link Executor} used for handling execution of I/O * events. Can be <code>null</code>. */ protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { //检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同 //即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig。 if (sessionConfig == null) { throw new IllegalArgumentException("sessionConfig"); } if (getTransportMetadata() == null) { throw new IllegalArgumentException("TransportMetadata"); } if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) { throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: " + getTransportMetadata().getSessionConfigType() + ")"); } // Create the listeners, and add a first listener : a activation listener // for this service, which will give information on the service state. //将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport listeners = new IoServiceListenerSupport(this); listeners.add(serviceActivationListener); // Stores the given session configuration this.sessionConfig = sessionConfig; // Make JVM load the exception monitor before some transports // change the thread context class loader. //在transports改变线程类加载器上线文前,是JVM虚拟机加载异常监视器 ExceptionMonitor.getInstance(); //初始化IO事件执行器 if (executor == null) { this.executor = Executors.newCachedThreadPool(); createdExecutor = true; } else { this.executor = executor; createdExecutor = false; } threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); }
从上面来看,抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。
再来看其他操作(有一些set和get方法就不说,一看就明白):
/** * {@inheritDoc} */ @Override public final IoFilterChainBuilder getFilterChainBuilder() { return filterChainBuilder; } /** * {@inheritDoc} */ @Override public final void setFilterChainBuilder(IoFilterChainBuilder builder) { if (builder == null) { filterChainBuilder = new DefaultIoFilterChainBuilder(); } else { filterChainBuilder = builder; } } /** * {@inheritDoc} */ @Override public final DefaultIoFilterChainBuilder getFilterChain() { if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) { return (DefaultIoFilterChainBuilder) filterChainBuilder; } throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder."); } /** * {@inheritDoc} */ @Override public final void addListener(IoServiceListener listener) { listeners.add(listener); } /** * {@inheritDoc} */ @Override public final void removeListener(IoServiceListener listener) { listeners.remove(listener); } /** * {@inheritDoc} */ @Override public final boolean isActive() { return listeners.isActive(); } /** * {@inheritDoc} */ @Override public final boolean isDisposing() { return disposing; } /** * {@inheritDoc} */ @Override public final boolean isDisposed() { return disposed; } /** * {@inheritDoc} */ @Override public final void dispose() { dispose(false); } /** * {@inheritDoc} */ @Override public final void dispose(boolean awaitTermination) { if (disposed) { return; } synchronized (disposalLock) { if (!disposing) { disposing = true; try { dispose0(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } if (createdExecutor) { ExecutorService e = (ExecutorService) executor; //关闭线程池执行器 e.shutdownNow(); if (awaitTermination) { try { LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName()); e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); LOGGER.debug("awaitTermination on {} finished", this); } catch (InterruptedException e1) { LOGGER.warn("awaitTermination on [{}] was interrupted", this); // Restore the interrupted status Thread.currentThread().interrupt(); } } } disposed = true; } /** * Implement this method to release any acquired resources. This method * is invoked only once by {@link #dispose()}. * 待子类扩展 * @throws Exception If the dispose failed */ protected abstract void dispose0() throws Exception; /** * {@inheritDoc} */ @Override public final Map<Long, IoSession> getManagedSessions() { return listeners.getManagedSessions(); } /** * {@inheritDoc} */ @Override public final int getManagedSessionCount() { return listeners.getManagedSessionCount(); } /** * {@inheritDoc} */ @Override public final IoHandler getHandler() { return handler; } /** * {@inheritDoc} */ @Override public final void setHandler(IoHandler handler) { if (handler == null) { throw new IllegalArgumentException("handler cannot be null"); } if (isActive()) { throw new IllegalStateException("handler cannot be set while the service is active."); } this.handler = handler; } /** * {@inheritDoc} */ @Override public final IoSessionDataStructureFactory getSessionDataStructureFactory() { return sessionDataStructureFactory; } /** * {@inheritDoc} */ @Override public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) { if (sessionDataStructureFactory == null) { throw new IllegalArgumentException("sessionDataStructureFactory"); } if (isActive()) { throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active."); } this.sessionDataStructureFactory = sessionDataStructureFactory; } /** * {@inheritDoc} */ @Override public IoServiceStatistics getStatistics() { return stats; } /** * {@inheritDoc} */ @Override public final long getActivationTime() { return listeners.getActivationTime(); }
再来看广播消息
/** * {@inheritDoc} 广播消息 */ @Override public final Set<WriteFuture> broadcast(Object message) { // Convert to Set. We do not return a List here because only the // direct caller of MessageBroadcaster knows the order of write // operations. final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values()); return new AbstractSet<WriteFuture>() { @Override public Iterator<WriteFuture> iterator() { return futures.iterator(); } @Override public int size() { return futures.size(); } }; }
来看这一句:
//广播消息到Service管理的所有会话
List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
public final class IoUtil { private static final IoSession EMPTY_SESSIONS[] = new IoSession[0]; public static List broadcast(Object message, Collection sessions) { List answer = new ArrayList(sessions.size()); broadcast(message, sessions.iterator(), ((Collection) (answer))); return answer; } private static void broadcast(Object message, Iterator sessions, Collection answer) { if(message instanceof IoBuffer) { IoSession s; //遍历Service管理的会话,会话发送消息 for(; sessions.hasNext(); answer.add(s.write(((IoBuffer)message).duplicate()))) s = (IoSession)sessions.next(); } else { IoSession s; for(; sessions.hasNext(); answer.add(s.write(message))) s = (IoSession)sessions.next(); } } }
从上面来看广播消息就是遍历Service管理的会话,会话发送消息。
回到抽象Service:
/** * @return The {@link IoServiceListenerSupport} attached to this service */ public final IoServiceListenerSupport getListeners() { return listeners; } protected final void executeWorker(Runnable worker) { executeWorker(worker, null); } //执行线程 protected final void executeWorker(Runnable worker, String suffix) { String actualThreadName = threadName; if (suffix != null) { actualThreadName = actualThreadName + '-' + suffix; } executor.execute(new NamePreservingRunnable(worker, actualThreadName)); }
protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) { // Update lastIoTime if needed. //更新service上次读写时间 if (stats.getLastReadTime() == 0) { stats.setLastReadTime(getActivationTime()); } if (stats.getLastWriteTime() == 0) { stats.setLastWriteTime(getActivationTime()); } // Every property but attributeMap should be set now. // Now initialize the attributeMap. The reason why we initialize // the attributeMap at last is to make sure all session properties // such as remoteAddress are provided to IoSessionDataStructureFactory. //将service会话数据结构工厂的会话属性添加到具体的会话中 try { ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory() .getAttributeMap(session)); } catch (IoSessionInitializationException e) { throw e; } catch (Exception e) { throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e); } //将service会话数据结构工厂的写请求队列,设置到具体的会话中 try { ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory() .getWriteRequestQueue(session)); } catch (IoSessionInitializationException e) { throw e; } catch (Exception e) { throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e); } if ((future != null) && (future instanceof ConnectFuture)) { // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now). //如果为连接会话,则将连接结果添加会话属性中 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future); } if (sessionInitializer != null) { sessionInitializer.initializeSession(session, future); } finishSessionInitialization0(session, future); } /** * Implement this method to perform additional tasks required for session * initialization. Do not call this method directly; * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call * this method instead. * 待子类拓展,不建议直接调用此方,最好调用#initSession * @param session The session to initialize * @param future The Future to use * */ protected void finishSessionInitialization0(IoSession session, IoFuture future) { // Do nothing. Extended class might add some specific code }
//IoSessionInitializer public interface IoSessionInitializer { public abstract void initializeSession(IoSession iosession, IoFuture iofuture); }
从上来看初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,
将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,
则将连接结果添加会话属性中。
再看看其他方法:
/** * {@inheritDoc} */ @Override public int getScheduledWriteBytes() { return stats.getScheduledWriteBytes(); } /** * {@inheritDoc} */ @Override public int getScheduledWriteMessages() { return stats.getScheduledWriteMessages(); }
/** * A {@link IoFuture} dedicated class for Service结果 * */ protected static class ServiceOperationFuture extends DefaultIoFuture { public ServiceOperationFuture() { super(null); } /** * {@inheritDoc} */ @Override public final boolean isDone() { return getValue() == Boolean.TRUE; } public final void setDone() { setValue(Boolean.TRUE); } public final Exception getException() { if (getValue() instanceof Exception) { return (Exception) getValue(); } return null; } public final void setException(Exception exception) { if (exception == null) { throw new IllegalArgumentException("exception"); } setValue(exception); } }
总结:
抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,则将连接结果添加会话属性中。
附:
//IoServiceListener
/** * Something interested in being notified when the result * of an {@link IoFuture} becomes available. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public interface IoServiceListener extends EventListener { /** * Invoked when a new service is activated by an {@link IoService}. * Service创建时调用 * @param service the {@link IoService} * @param serviceAddress the socket address of the {@link IoService} listens * to manage sessions. If the service is an {@link IoAcceptor}, * it is a bind address. If the service is an {@link IoConnector}, * it is a remote address. * @param handler the {@link IoHandler} that serves the new service * @param config the {@link IoServiceConfig} of the new service */ void serviceActivated(IoService service, SocketAddress serviceAddress, IoHandler handler, IoServiceConfig config); /** * Invoked when a service is deactivated by an {@link IoService}. * Service失效是调用 * @param service the {@link IoService} * @param serviceAddress the socket address of the {@link IoService} listens * to manage sessions. If the service is an {@link IoAcceptor}, * it is a bind address. If the service is an {@link IoConnector}, * it is a remote address. * @param handler the {@link IoHandler} that serves the service * @param config the {@link IoServiceConfig} of the service */ void serviceDeactivated(IoService service, SocketAddress serviceAddress, IoHandler handler, IoServiceConfig config); /** * Invoked when a new session is created by an {@link IoService}. * Service创建会话时调用 * @param session the new session */ void sessionCreated(IoSession session); /** * Invoked when a session is being destroyed by an {@link IoService}. * 当会话被Service销毁时调用 * @param session the session to be destroyed */ void sessionDestroyed(IoSession session); }
//IoServiceStatistics
public class IoServiceStatistics { private AbstractIoService service; private double readBytesThroughput; private double writtenBytesThroughput; private double readMessagesThroughput; private double writtenMessagesThroughput; private double largestReadBytesThroughput; private double largestWrittenBytesThroughput; private double largestReadMessagesThroughput; private double largestWrittenMessagesThroughput; private long readBytes; private long writtenBytes; private long readMessages; private long writtenMessages; private long lastReadTime; private long lastWriteTime; private long lastReadBytes; private long lastWrittenBytes; private long lastReadMessages; private long lastWrittenMessages; private long lastThroughputCalculationTime; private int scheduledWriteBytes; private int scheduledWriteMessages; private final AtomicInteger throughputCalculationInterval = new AtomicInteger(3); private final Lock throughputCalculationLock = new ReentrantLock(); ... }
//IoSessionDataStructureFactory
public interface IoSessionDataStructureFactory { public abstract IoSessionAttributeMap getAttributeMap(IoSession iosession) throws Exception; public abstract WriteRequestQueue getWriteRequestQueue(IoSession iosession) throws Exception; }
//IoServiceListenerSupport
public class IoServiceListenerSupport { private final IoService service; private final List listeners = new CopyOnWriteArrayList();//Service监听器 private final ConcurrentMap managedSessions = new ConcurrentHashMap();//service管理会话Map private final Map readOnlyManagedSessions; private final AtomicBoolean activated = new AtomicBoolean(); private volatile long activationTime; private volatile int largestManagedSessionCount; private AtomicLong cumulativeManagedSessionCount; }
//默认异常监视器
public class DefaultExceptionMonitor extends ExceptionMonitor { private static final Logger LOGGER = LoggerFactory.getLogger(org/apache/mina/util/DefaultExceptionMonitor); public DefaultExceptionMonitor() { } public void exceptionCaught(Throwable cause) { if(cause instanceof Error) { throw (Error)cause; } else { LOGGER.warn("Unexpected exception.", cause); return; } } }
//ExceptionMonitor
public abstract class ExceptionMonitor { private static ExceptionMonitor instance = new DefaultExceptionMonitor(); public ExceptionMonitor() { } public static ExceptionMonitor getInstance() { return instance; } public static void setInstance(ExceptionMonitor monitor) { if(monitor == null) monitor = new DefaultExceptionMonitor(); instance = monitor; } public abstract void exceptionCaught(Throwable throwable); }
发表评论
-
Mina 报文连接器(NioDatagramConnector)
2017-06-14 08:46 1417Mina 抽象Polling连接器(A ... -
Mina 报文监听器NioDatagramAcceptor二(发送会话消息等)
2017-06-13 16:01 1542Mina 报文监听器NioDatagramAcceptor一( ... -
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器)
2017-06-13 09:51 2575Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina 报文通信简单示例
2017-06-12 09:01 2585MINA TCP简单通信实例:http://donald-dr ... -
Mina socket连接器(NioSocketConnector)
2017-06-12 08:37 4772Mina 抽象Polling连接器(AbstractPolli ... -
Mina 抽象Polling连接器(AbstractPollingIoConnector)
2017-06-11 21:29 1009Mina 连接器接口定义及抽象实现(IoConnector ) ... -
Mina 连接器接口定义及抽象实现(IoConnector )
2017-06-11 13:46 1831Mina IoService接口定义及抽象实现:http:// ... -
Mina socket监听器(NioSocketAcceptor)
2017-06-09 08:44 3419Mina IoService接口定义及抽象实现:http:// ... -
Mina 抽象polling监听器
2017-06-08 22:32 784Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina Io监听器接口定义及抽象实现
2017-06-07 13:02 1349Mina IoService接口定义及抽象实现:http:// ... -
Mina Nio会话(Socket,DataGram)
2017-06-06 12:53 1209Mina Socket会话配置:http://donald-d ... -
Mina 抽象Io会话
2017-06-05 22:45 1015Mina Io会话接口定义:http://donald-dra ... -
Mina Io会话接口定义
2017-06-04 23:15 1167Mina Nio处理器:http://donald-drape ... -
Mina Nio处理器
2017-06-04 22:19 742Mina Io处理器抽象实现:http://donald-dr ... -
Mina Io处理器抽象实现
2017-06-03 23:52 1148Mina 过滤链抽象实现:http://donald-drap ... -
Mina IoHandler接口定义
2017-06-01 21:30 1734Mina 过滤链抽象实现:http://donald-drap ... -
MINA 多路复用协议编解码器工厂二(多路复用协议解码器)
2017-06-01 12:52 2274MINA 多路复用协议编解码器工厂一(多路复用协议编码器): ... -
MINA 多路复用协议编解码器工厂一(多路复用协议编码器)
2017-05-31 22:22 1868MINA 多路分离解码器实例:http://donald-dr ... -
Mina 累计协议解码器
2017-05-31 00:09 1229MINA 编解码器实例:http://donald-drape ... -
Mina 协议编解码过滤器三(会话write与消息接收过滤)
2017-05-28 07:22 1750Mina 协议编解码过滤器一(协议编解码工厂、协议编码器): ...
相关推荐
2. BaseIoService:这是一个抽象类,它的目的是实现IoService接口中的一些默认功能,降低子类实现接口时的负担。BaseIoService实现了大部分的IoService接口方法,但不包括一些配置方法,如getDefaultConfig()。 3. ...
IoService是Mina的核心接口,它定义了服务端处理客户端连接、读写数据以及管理会话的主要方法。IoService的实现类如NioServerSocketAcceptor,负责监听和接受来自客户端的连接请求。 #### 2.1. 类结构 IoService...
- **定义**:`BaseIoService` 是一个抽象类,实现了 `IoService` 接口中的一部分方法。它提供了一个基本的服务实现框架,可以被其他具体的类继承并进一步扩展。 - **功能**:`BaseIoService` 实现了 `IoService` 的...
MINA的核心是`IoService`接口,它定义了服务端和客户端的基本功能。`IoAcceptor`是服务端接口,负责监听和接受新的连接,而`IoConnector`则代表客户端,用于建立到远程服务器的连接。它们都依赖于`Session`接口,...
1. **IoService**:这是Mina的核心接口,它是所有I/O通信的起点,定义了服务的基本操作,如启动、停止、绑定端口、连接远程服务器等。`IoService`接口提供了管理会话(IoSession)和配置(IoServiceConfig)的能力。...
MINA通过IoSession接口提供了对客户端连接的抽象,可以存储客户端的状态信息,方便地识别和管理不同的客户端请求,以及实现客户端之间的通信。 以下是一个简单的MINA服务端代码示例: ```java // 初始化Acceptor,...
3. IoFilter:定义了一组拦截器接口,可以实现诸如日志输出、黑名单过滤以及数据的编码(write方向)和解码(read方向)等功能。其中,数据的编码和解码是使用Mina进行网络通信时重点关注的部分。 4. IoHandler:该...
2. **IoHandler:** 是MINA的核心接口,定义了处理网络事件的方法,如连接建立、数据读写、连接关闭等。 3. **IoFilter:** 过滤器允许你在数据发送到目的地或从源头接收之前对数据进行处理。它们可以用来实现认证...
3. IoFilter:该接口定义了一组拦截器,可以实现包括日志输出、数据编码解码等不同功能。在使用Mina时,数据的编解码通常是最主要关注的部分。 4. IoHandler:该接口负责具体的业务逻辑,也就是接收和发送数据的...
1. 实现 `IoService`:首先创建 `IoAcceptor`,它是服务端和客户端的抽象。由于我们需要构建 TCP 服务器,因此选择 `NioSocketAcceptor`,它底层基于 `ServerSocketChannel`。如果使用了 Apache APR 库,可以选择 `...
3. **IoHandler**: IoHandler接口定义了服务端接收到数据后需要执行的方法,如消息的接收和发送。它是用户实现业务逻辑的地方。 4. **IoService**: 这是MINA的服务组件,它管理IoSession并负责创建、管理和关闭它们...
4. **协议支持**:MINA 支持自定义协议,开发者可以通过实现 `ProtocolDecoder` 和 `ProtocolEncoder` 接口来处理数据编码和解码。 5. **事件驱动**:MINA 通过 `IoEventType` 定义了一系列网络事件,如 OPEN、...
2. **IoHandler**: IoHandler是Mina的核心接口,定义了处理网络事件的方法,如连接建立、数据读取、连接关闭等。 3. **ProtocolDecoder/ProtocolEncoder**: 这两个接口用于将接收到的原始字节流解码为应用程序可...
IoService是服务端和客户端的抽象接口。 2. **IoProcessor**:在另一个线程上,IoProcessor负责检查通道上的数据读写,它也有自己的Selector,并负责调用IoFilter链和IoHandler。IoProcessor的职责是执行实际的I/O...
Apache Mina提供了一种抽象层,让开发者可以专注于处理应用程序逻辑,而无需关心底层网络通信的复杂性。它支持多种协议,如TCP/IP、UDP/IP和HTTP,并且适用于Java NIO(非阻塞I/O)模型,这使得Mina在高并发环境下...