`
Donald_Draper
  • 浏览: 984293 次
社区版块
存档分类
最新评论

Mina IoService接口定义及抽象实现

    博客分类:
  • Mina
阅读更多
Mina IoHandler接口定义:http://donald-draper.iteye.com/blog/2377419
Mina Nio会话(Socket,DataGram):http://donald-draper.iteye.com/blog/2378169
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
Mina 协议编解码过滤器一(协议编解码工厂、协议编码器):
http://donald-draper.iteye.com/blog/2376663
Mina 协议编解码过滤器二(协议解码器):
http://donald-draper.iteye.com/blog/2376679
Mina 队列Queue:http://donald-draper.iteye.com/blog/2376712
Mina 协议编解码过滤器三(会话write与消息接收过滤):
http://donald-draper.iteye.com/blog/2376818
Mina 累计协议解码器:http://donald-draper.iteye.com/blog/2377029
MINA 多路复用协议编解码器工厂一(多路复用协议编码器):
http://donald-draper.iteye.com/blog/2377170
MINA 多路复用协议编解码器工厂二(多路复用协议解码器):
http://donald-draper.iteye.com/blog/2377324
Mina Nio处理器:http://donald-draper.iteye.com/blog/2377725
前面的文章我们看了会话配置及Nio会话,IoHandler,Io处理器,过滤链和协议编码过滤器,今天我们来看Mina另一个组件IoService。

/**
 * Base interface for all {@link IoAcceptor}s and {@link IoConnector}s
 * that provide I/O service and manage {@link IoSession}s.
 * IoService为IoAcceptor和IoConnector的基础接口,提供IO服务和管理会话
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public interface IoService {
    /**
     * Adds an {@link IoServiceListener} that listens any events related with
     * this service.
     添加Service监听器
     */
    void addListener(IoServiceListener listener);

    /**
     * Removed an existing {@link IoServiceListener} that listens any events
     * related with this service.
     移除service监听器
     */
    void removeListener(IoServiceListener listener);

    /**
     * Returns all {@link SocketAddress}es this service is managing.
     * If this service is an {@link IoAcceptor}, a set of bind addresses will
     * be returned.  If this service is an {@link IoConnector}, a set of remote
     * addresses will be returned.
     返回service关联所有socket地址。如果Io服务为IoAcceptor,将返回绑定地址集。如果
     服务为IoConnector,将返回连接的远端地址集。
     */
    Set getManagedServiceAddresses();

    /**
     * Returns <tt>true</tt> if this service is managing the specified <tt>serviceAddress</tt>.
     * If this service is an {@link IoAcceptor}, <tt>serviceAddress</tt> is a bind address.
     * If this service is an {@link IoConnector}, <tt>serviceAddress</tt> is a remote address.
     判断socket地址是否被Service管理。如果服务为IO为IoAcceptor,则判断socket地址是否为绑定地址。
     如果服务为IoConnector,判断socket地址是否为连接的远端地址集。
     */
    boolean isManaged(SocketAddress serviceAddress);

    /**
     * Returns all sessions with the specified remote or local address,
     * which are currently managed by this service.
     * {@link IoAcceptor} will assume the specified <tt>address</tt> is a local
     * address, and {@link IoConnector} will assume it's a remote address.
     * 返回本地或远端socket地址为serviceAddress的所有service关联的会话。
     IoAcceptor为本地地址,IoConnector为远端地址
     * @param serviceAddress the address to return all sessions for.
     * @return the sessions. An empty collection if there's no session.
     * @throws IllegalArgumentException if the specified <tt>address</tt> has 
     *         not been bound.
     * @throws UnsupportedOperationException if this operation isn't supported
     *         for the particular transport type implemented by this {@link IoService}.
     */
    Set getManagedSessions(SocketAddress serviceAddress);

    /**
     * Returns the default configuration which is used when you didn't specify
     * any configuration.
     返回Service默认配置
     */
    IoServiceConfig getDefaultConfig();

    /**
     * Returns the global {@link IoFilterChainBuilder} which will modify the
     * {@link IoFilterChain} of all {@link IoSession}s which is managed
     * by this service.
     * The default value is an empty {@link DefaultIoFilterChainBuilder}.
     返回全局的过滤链构建器,可以修改Service管理的会话关联的过滤链。默认为空过滤链构建器。
     */
    IoFilterChainBuilder getFilterChainBuilder();

    /**
     * Sets the global {@link IoFilterChainBuilder} which will modify the
     * {@link IoFilterChain} of all {@link IoSession}s which is managed
     * by this service.
     * If you specify <tt>null</tt> this property will be set to
     * an empty {@link DefaultIoFilterChainBuilder}.
     设置过滤链构建器
     */
    void setFilterChainBuilder(IoFilterChainBuilder builder);

    /**
     * A shortcut for <tt>( ( DefaultIoFilterChainBuilder ) </tt>{@link #getFilterChainBuilder()}<tt> )</tt>.
     * Please note that the returned object is not a <b>real</b> {@link IoFilterChain}
     * but a {@link DefaultIoFilterChainBuilder}.  Modifying the returned builder
     * won't affect the existing {@link IoSession}s at all, because
     * {@link IoFilterChainBuilder}s affect only newly created {@link IoSession}s.
     *返回的非实际的过滤链,而是一个过滤链构建,并不会修改已经创建的会话,过滤链构建器只会
     影响新创建会话。
     * @throws IllegalStateException if the current {@link IoFilterChainBuilder} is
     *                               not a {@link DefaultIoFilterChainBuilder}
     */
    DefaultIoFilterChainBuilder getFilterChain();
}

再来看IoService的抽象实现:
/**
 * Base implementation of {@link IoService}s.
 * An instance of IoService contains an Executor which will handle the incoming
 * events.
 *AbstractIoService为IoService的基础实现。IoService实例包含一个执行器,将会处理相关事件。
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public abstract class AbstractIoService implements IoService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);

    /**
     * The unique number identifying the Service. It's incremented
     * for each new IoService created.
     服务id
     */
    private static final AtomicInteger id = new AtomicInteger();

    /**
     * The thread name built from the IoService inherited
     * instance class name and the IoService Id
     线程名
     **/
    private final String threadName;

    /**
     * The associated executor, responsible for handling execution of I/O events.
     关联执行器,负责处理IO事件执行。
     */
    private final Executor executor;

    /**
     * A flag used to indicate that the local executor has been created
     * inside this instance, and not passed by a caller.
     * 表示本地执行器是否创建
     * If the executor is locally created, then it will be an instance
     * of the ThreadPoolExecutor class.
     如果本地执行器创建,将会是一个ThreadPoolExecutor的实例
     */
    private final boolean createdExecutor;

    /**
     * The IoHandler in charge of managing all the I/O Events. It is
     IO事件Handler
     */
    private IoHandler handler;

    /**
     * The default {@link IoSessionConfig} which will be used to configure new sessions.
     默认会话配置,用于创建会话时,配置会话
     */
    protected final IoSessionConfig sessionConfig;
    //service创建监听器
    private final IoServiceListener serviceActivationListener = new IoServiceListener() {
        IoServiceStatistics serviceStats;//Service统计
        /**
         * {@inheritDoc}
         */
        @Override
        public void serviceActivated(IoService service) {
            // Update lastIoTime.
            serviceStats = service.getStatistics();
            serviceStats.setLastReadTime(service.getActivationTime());
            serviceStats.setLastWriteTime(service.getActivationTime());
            serviceStats.setLastThroughputCalculationTime(service.getActivationTime());
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void serviceDeactivated(IoService service) throws Exception {
            // Empty handler
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
            // Empty handler
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            // Empty handler
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            // Empty handler
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void sessionDestroyed(IoSession session) throws Exception {
            // Empty handler
        }
    };
    /**
     * Current filter chain builder.过滤链构建器
     */
    private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
    //会话数据结构工厂(会话属性Map,会话写请求队列,见附)
    private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
    /**
     * Maintains the {@link IoServiceListener}s of this service.
     维护service的service监听器
     */
    private final IoServiceListenerSupport listeners;
    /**
     * A lock object which must be acquired when related resources are
     * destroyed.
     service销毁时,释放先关资源,必须先获取这个锁
     */
    protected final Object disposalLock = new Object();
    private volatile boolean disposing;//Service是否正在销毁
    private volatile boolean disposed;//Service是否已销毁
    private IoServiceStatistics stats = new IoServiceStatistics(this);//Service统计器
}

从上面来看抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。
来看构造;
/**
     * Constructor for {@link AbstractIoService}. You need to provide a default
     * session configuration and an {@link Executor} for handling I/O events. If
     * a null {@link Executor} is provided, a default one will be created using
     * {@link Executors#newCachedThreadPool()}.
     * 构造抽象Service。需要提供一个默认的会话配置,一个执行器处理IO事件。如果执行器
     为null,默认创建一个Executors#newCachedThreadPool。
     * @param sessionConfig
     *            the default configuration for the managed {@link IoSession}
     * @param executor
     *            the {@link Executor} used for handling execution of I/O
     *            events. Can be <code>null</code>.
     */
    protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
       //检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同
       //即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig。
        if (sessionConfig == null) {
            throw new IllegalArgumentException("sessionConfig");
        }
        if (getTransportMetadata() == null) {
            throw new IllegalArgumentException("TransportMetadata");
        }
        if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
            throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
                    + getTransportMetadata().getSessionConfigType() + ")");
        }
        // Create the listeners, and add a first listener : a activation listener
        // for this service, which will give information on the service state.
	//将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport
        listeners = new IoServiceListenerSupport(this);
        listeners.add(serviceActivationListener);
        // Stores the given session configuration
        this.sessionConfig = sessionConfig;
        // Make JVM load the exception monitor before some transports
        // change the thread context class loader.
	//在transports改变线程类加载器上线文前,是JVM虚拟机加载异常监视器
        ExceptionMonitor.getInstance();
	//初始化IO事件执行器
        if (executor == null) {
            this.executor = Executors.newCachedThreadPool();
            createdExecutor = true;
        } else {
            this.executor = executor;
            createdExecutor = false;
        }
        threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
    }

从上面来看,抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。
再来看其他操作(有一些set和get方法就不说,一看就明白):
 
  /**
     * {@inheritDoc}
     */
    @Override
    public final IoFilterChainBuilder getFilterChainBuilder() {
        return filterChainBuilder;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
        if (builder == null) {
            filterChainBuilder = new DefaultIoFilterChainBuilder();
        } else {
            filterChainBuilder = builder;
        }
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final DefaultIoFilterChainBuilder getFilterChain() {
        if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
            return (DefaultIoFilterChainBuilder) filterChainBuilder;
        }
        throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder.");
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void addListener(IoServiceListener listener) {
        listeners.add(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void removeListener(IoServiceListener listener) {
        listeners.remove(listener);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final boolean isActive() {
        return listeners.isActive();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final boolean isDisposing() {
        return disposing;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final boolean isDisposed() {
        return disposed;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void dispose() {
        dispose(false);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void dispose(boolean awaitTermination) {
        if (disposed) {
            return;
        }
        synchronized (disposalLock) {
            if (!disposing) {
                disposing = true;

                try {
                    dispose0();
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        }
        if (createdExecutor) {
            ExecutorService e = (ExecutorService) executor;
	    //关闭线程池执行器
            e.shutdownNow();
            if (awaitTermination) {

                try {
                    LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                    LOGGER.debug("awaitTermination on {} finished", this);
                } catch (InterruptedException e1) {
                    LOGGER.warn("awaitTermination on [{}] was interrupted", this);
                    // Restore the interrupted status
                    Thread.currentThread().interrupt();
                }
            }
        }
        disposed = true;
    }
    /**
     * Implement this method to release any acquired resources.  This method
     * is invoked only once by {@link #dispose()}.
     * 待子类扩展
     * @throws Exception If the dispose failed
     */
    protected abstract void dispose0() throws Exception;
    /**
     * {@inheritDoc}
     */
    @Override
    public final Map<Long, IoSession> getManagedSessions() {
        return listeners.getManagedSessions();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final int getManagedSessionCount() {
        return listeners.getManagedSessionCount();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final IoHandler getHandler() {
        return handler;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void setHandler(IoHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("handler cannot be null");
        }

        if (isActive()) {
            throw new IllegalStateException("handler cannot be set while the service is active.");
        }

        this.handler = handler;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
        return sessionDataStructureFactory;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
        if (sessionDataStructureFactory == null) {
            throw new IllegalArgumentException("sessionDataStructureFactory");
        }

        if (isActive()) {
            throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active.");
        }

        this.sessionDataStructureFactory = sessionDataStructureFactory;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public IoServiceStatistics getStatistics() {
        return stats;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final long getActivationTime() {
        return listeners.getActivationTime();
    }

再来看广播消息

   
/**
     * {@inheritDoc}
     广播消息
     */
    @Override
    public final Set<WriteFuture> broadcast(Object message) {
        // Convert to Set.  We do not return a List here because only the
        // direct caller of MessageBroadcaster knows the order of write
        // operations.
        final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
        return new AbstractSet<WriteFuture>() {
            @Override
            public Iterator<WriteFuture> iterator() {
                return futures.iterator();
            }

            @Override
            public int size() {
                return futures.size();
            }
        };
    }

来看这一句:
//广播消息到Service管理的所有会话
List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());

public final class IoUtil
{
    private static final IoSession EMPTY_SESSIONS[] = new IoSession[0];
    public static List broadcast(Object message, Collection sessions)
    {
        List answer = new ArrayList(sessions.size());
        broadcast(message, sessions.iterator(), ((Collection) (answer)));
        return answer;
    }
    private static void broadcast(Object message, Iterator sessions, Collection answer)
    {
        if(message instanceof IoBuffer)
        {
            IoSession s;
	    //遍历Service管理的会话,会话发送消息
            for(; sessions.hasNext(); answer.add(s.write(((IoBuffer)message).duplicate())))
                s = (IoSession)sessions.next();

        } else
        {
            IoSession s;
            for(; sessions.hasNext(); answer.add(s.write(message)))
                s = (IoSession)sessions.next();

        }
    }
}

从上面来看广播消息就是遍历Service管理的会话,会话发送消息。
回到抽象Service:
 
  /**
     * @return The {@link IoServiceListenerSupport} attached to this service
     */
    public final IoServiceListenerSupport getListeners() {
        return listeners;
    }
    protected final void executeWorker(Runnable worker) {
        executeWorker(worker, null);
    }
    //执行线程
    protected final void executeWorker(Runnable worker, String suffix) {
        String actualThreadName = threadName;
        if (suffix != null) {
            actualThreadName = actualThreadName + '-' + suffix;
        }
        executor.execute(new NamePreservingRunnable(worker, actualThreadName));
    }

  
 protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
        // Update lastIoTime if needed.
	//更新service上次读写时间
        if (stats.getLastReadTime() == 0) {
            stats.setLastReadTime(getActivationTime());
        }
        if (stats.getLastWriteTime() == 0) {
            stats.setLastWriteTime(getActivationTime());
        }

        // Every property but attributeMap should be set now.
        // Now initialize the attributeMap.  The reason why we initialize
        // the attributeMap at last is to make sure all session properties
        // such as remoteAddress are provided to IoSessionDataStructureFactory.
	//将service会话数据结构工厂的会话属性添加到具体的会话中
        try {
            ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
                    .getAttributeMap(session));
        } catch (IoSessionInitializationException e) {
            throw e;
        } catch (Exception e) {
            throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
        }
        //将service会话数据结构工厂的写请求队列,设置到具体的会话中
        try {
            ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
                    .getWriteRequestQueue(session));
        } catch (IoSessionInitializationException e) {
            throw e;
        } catch (Exception e) {
            throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
        }

        if ((future != null) && (future instanceof ConnectFuture)) {
            // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
	    //如果为连接会话,则将连接结果添加会话属性中
            session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
        }

        if (sessionInitializer != null) {
            sessionInitializer.initializeSession(session, future);
        }
        finishSessionInitialization0(session, future);
    }

    /**
     * Implement this method to perform additional tasks required for session
     * initialization. Do not call this method directly;
     * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call
     * this method instead.
     * 待子类拓展,不建议直接调用此方,最好调用#initSession
     * @param session The session to initialize
     * @param future The Future to use
     * 
     */
    protected void finishSessionInitialization0(IoSession session, IoFuture future) {
        // Do nothing. Extended class might add some specific code
    }

//IoSessionInitializer
public interface IoSessionInitializer
{
    public abstract void initializeSession(IoSession iosession, IoFuture iofuture);
}

从上来看初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,
将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,
则将连接结果添加会话属性中。
再看看其他方法:
/**
     * {@inheritDoc}
     */
    @Override
    public int getScheduledWriteBytes() {
        return stats.getScheduledWriteBytes();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public int getScheduledWriteMessages() {
        return stats.getScheduledWriteMessages();
    }

   
  /**
     * A  {@link IoFuture} dedicated class for 
     Service结果
     *
     */
    protected static class ServiceOperationFuture extends DefaultIoFuture {
        public ServiceOperationFuture() {
            super(null);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public final boolean isDone() {
            return getValue() == Boolean.TRUE;
        }
        public final void setDone() {
            setValue(Boolean.TRUE);
        }
        public final Exception getException() {
            if (getValue() instanceof Exception) {
                return (Exception) getValue();
            }

            return null;
        }
        public final void setException(Exception exception) {
            if (exception == null) {
                throw new IllegalArgumentException("exception");
            }
            setValue(exception);
        }
    }

总结:
抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,则将连接结果添加会话属性中。

附:
//IoServiceListener
/**
 * Something interested in being notified when the result
 * of an {@link IoFuture} becomes available.
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public interface IoServiceListener extends EventListener {
    /**
     * Invoked when a new service is activated by an {@link IoService}.
     * Service创建时调用
     * @param service the {@link IoService}
     * @param serviceAddress the socket address of the {@link IoService} listens
     *                       to manage sessions.  If the service is an {@link IoAcceptor},
     *                       it is a bind address.  If the service is an {@link IoConnector},
     *                       it is a remote address.
     * @param handler the {@link IoHandler} that serves the new service
     * @param config  the {@link IoServiceConfig} of the new service
     */
    void serviceActivated(IoService service, SocketAddress serviceAddress,
            IoHandler handler, IoServiceConfig config);

    /**
     * Invoked when a service is deactivated by an {@link IoService}.
     * Service失效是调用
     * @param service the {@link IoService}
     * @param serviceAddress the socket address of the {@link IoService} listens
     *                       to manage sessions.  If the service is an {@link IoAcceptor},
     *                       it is a bind address.  If the service is an {@link IoConnector},
     *                       it is a remote address.
     * @param handler the {@link IoHandler} that serves the service
     * @param config  the {@link IoServiceConfig} of the service
     */
    void serviceDeactivated(IoService service, SocketAddress serviceAddress,
            IoHandler handler, IoServiceConfig config);

    /**
     * Invoked when a new session is created by an {@link IoService}.
     * Service创建会话时调用
     * @param session the new session
     */
    void sessionCreated(IoSession session);

    /**
     * Invoked when a session is being destroyed by an {@link IoService}.
     * 当会话被Service销毁时调用
     * @param session the session to be destroyed
     */
    void sessionDestroyed(IoSession session);
}

//IoServiceStatistics
public class IoServiceStatistics
{
    private AbstractIoService service;
    private double readBytesThroughput;
    private double writtenBytesThroughput;
    private double readMessagesThroughput;
    private double writtenMessagesThroughput;
    private double largestReadBytesThroughput;
    private double largestWrittenBytesThroughput;
    private double largestReadMessagesThroughput;
    private double largestWrittenMessagesThroughput;
    private long readBytes;
    private long writtenBytes;
    private long readMessages;
    private long writtenMessages;
    private long lastReadTime;
    private long lastWriteTime;
    private long lastReadBytes;
    private long lastWrittenBytes;
    private long lastReadMessages;
    private long lastWrittenMessages;
    private long lastThroughputCalculationTime;
    private int scheduledWriteBytes;
    private int scheduledWriteMessages;
    private final AtomicInteger throughputCalculationInterval = new AtomicInteger(3);
    private final Lock throughputCalculationLock = new ReentrantLock();
    ...
}

//IoSessionDataStructureFactory
public interface IoSessionDataStructureFactory
{
    public abstract IoSessionAttributeMap getAttributeMap(IoSession iosession)
        throws Exception;
    public abstract WriteRequestQueue getWriteRequestQueue(IoSession iosession)
        throws Exception;
}

//IoServiceListenerSupport
public class IoServiceListenerSupport
{
    private final IoService service;
    private final List listeners = new CopyOnWriteArrayList();//Service监听器
    private final ConcurrentMap managedSessions = new ConcurrentHashMap();//service管理会话Map
    private final Map readOnlyManagedSessions;
    private final AtomicBoolean activated = new AtomicBoolean();
    private volatile long activationTime;
    private volatile int largestManagedSessionCount;
    private AtomicLong cumulativeManagedSessionCount;
}




//默认异常监视器
public class DefaultExceptionMonitor extends ExceptionMonitor
{
    private static final Logger LOGGER = LoggerFactory.getLogger(org/apache/mina/util/DefaultExceptionMonitor);
    public DefaultExceptionMonitor()
    {
    }
    public void exceptionCaught(Throwable cause)
    {
        if(cause instanceof Error)
        {
            throw (Error)cause;
        } else
        {
            LOGGER.warn("Unexpected exception.", cause);
            return;
        }
    }
}

//ExceptionMonitor
public abstract class ExceptionMonitor
{
    private static ExceptionMonitor instance = new DefaultExceptionMonitor();
    public ExceptionMonitor()
    {
    }
    public static ExceptionMonitor getInstance()
    {
        return instance;
    }
    public static void setInstance(ExceptionMonitor monitor)
    {
        if(monitor == null)
            monitor = new DefaultExceptionMonitor();
        instance = monitor;
    }
    public abstract void exceptionCaught(Throwable throwable);
}

  • 大小: 61.1 KB
0
1
分享到:
评论

相关推荐

    深入理解Mina

    2. BaseIoService:这是一个抽象类,它的目的是实现IoService接口中的一些默认功能,降低子类实现接口时的负担。BaseIoService实现了大部分的IoService接口方法,但不包括一些配置方法,如getDefaultConfig()。 3. ...

    Apache_Mina2.0学习笔记(初版).doc

    IoService是Mina的核心接口,它定义了服务端处理客户端连接、读写数据以及管理会话的主要方法。IoService的实现类如NioServerSocketAcceptor,负责监听和接受来自客户端的连接请求。 #### 2.1. 类结构 IoService...

    Apache Mina 2 完全自学手册

    IoService是Mina中的核心接口之一,它定义了网络服务的基本操作,包括启动、停止服务以及连接管理。它是IoAcceptor和IoConnector的超类型,为它们提供统一的接口。 **2. IoFilter接口** IoFilter接口用于处理数据...

    深入理解Apache_Mina_(1)----_Mina的几个类

    - **定义**:`BaseIoService` 是一个抽象类,实现了 `IoService` 接口中的一部分方法。它提供了一个基本的服务实现框架,可以被其他具体的类继承并进一步扩展。 - **功能**:`BaseIoService` 实现了 `IoService` 的...

    MINA源码分析,内涵类的讲解

    MINA的核心是`IoService`接口,它定义了服务端和客户端的基本功能。`IoAcceptor`是服务端接口,负责监听和接受新的连接,而`IoConnector`则代表客户端,用于建立到远程服务器的连接。它们都依赖于`Session`接口,...

    mina开发文档

    1. **IoService**:这是Mina的核心接口,它是所有I/O通信的起点,定义了服务的基本操作,如启动、停止、绑定端口、连接远程服务器等。`IoService`接口提供了管理会话(IoSession)和配置(IoServiceConfig)的能力。...

    MINA网络通信框架.pdf

    MINA通过IoSession接口提供了对客户端连接的抽象,可以存储客户端的状态信息,方便地识别和管理不同的客户端请求,以及实现客户端之间的通信。 以下是一个简单的MINA服务端代码示例: ```java // 初始化Acceptor,...

    Mina2.0完全剖析,完全自学手册

    - **类结构**:`IoService`是一个抽象接口,具体的实现类包括`IoAcceptor`(用于接受传入连接)和`IoConnector`(用于发起传出连接)。 - **应用**:通过继承或实现这些接口,可以定制自己的网络服务逻辑。 ##### ...

    Apache_Mina_Server中文参考手册.pdf

    3. IoFilter:定义了一组拦截器接口,可以实现诸如日志输出、黑名单过滤以及数据的编码(write方向)和解码(read方向)等功能。其中,数据的编码和解码是使用Mina进行网络通信时重点关注的部分。 4. IoHandler:该...

    使用MINA实现长连接

    2. **IoHandler:** 是MINA的核心接口,定义了处理网络事件的方法,如连接建立、数据读写、连接关闭等。 3. **IoFilter:** 过滤器允许你在数据发送到目的地或从源头接收之前对数据进行处理。它们可以用来实现认证...

    mina中文开发手册.pdf

    3. IoFilter:该接口定义了一组拦截器,可以实现包括日志输出、数据编码解码等不同功能。在使用Mina时,数据的编解码通常是最主要关注的部分。 4. IoHandler:该接口负责具体的业务逻辑,也就是接收和发送数据的...

    mina 中文参考手册 各个接口都做了详尽的描述

    1. 实现 `IoService`:首先创建 `IoAcceptor`,它是服务端和客户端的抽象。由于我们需要构建 TCP 服务器,因此选择 `NioSocketAcceptor`,它底层基于 `ServerSocketChannel`。如果使用了 Apache APR 库,可以选择 `...

    Mina2.0学习笔记

    - **应用**:可以通过实现该接口来自定义过滤器,如日志记录、加密解密等功能。 - **IoHandler接口**: - **概念**:负责处理所有I/O事件。 - **方法**:包含sessionCreated、messageReceived、exceptionCaught等...

    Mina 1.1.7核心代码(apache.mina.core)

    3. **IoHandler**: IoHandler接口定义了服务端接收到数据后需要执行的方法,如消息的接收和发送。它是用户实现业务逻辑的地方。 4. **IoService**: 这是MINA的服务组件,它管理IoSession并负责创建、管理和关闭它们...

    mina初步学习笔记

    - 实现此接口可定义如何处理会话创建、关闭、消息接收等事件。 #### 四、Mina中的长连接与短连接 - **长连接**:建立一次连接后保持连接状态,多次通信使用同一连接,适用于频繁交互的场景。 - **短连接**:每次...

    mina2.0 英文版 中文版 官方文档

    4. **协议支持**:MINA 支持自定义协议,开发者可以通过实现 `ProtocolDecoder` 和 `ProtocolEncoder` 接口来处理数据编码和解码。 5. **事件驱动**:MINA 通过 `IoEventType` 定义了一系列网络事件,如 OPEN、...

    mina2框架+实例教程

    2. **IoHandler**: IoHandler是Mina的核心接口,定义了处理网络事件的方法,如连接建立、数据读取、连接关闭等。 3. **ProtocolDecoder/ProtocolEncoder**: 这两个接口用于将接收到的原始字节流解码为应用程序可...

    Apache mina框架入门教程

    IoService是服务端和客户端的抽象接口。 2. **IoProcessor**:在另一个线程上,IoProcessor负责检查通道上的数据读写,它也有自己的Selector,并负责调用IoFilter链和IoHandler。IoProcessor的职责是执行实际的I/O...

    mina 聊天Demo

    Apache Mina提供了一种抽象层,让开发者可以专注于处理应用程序逻辑,而无需关心底层网络通信的复杂性。它支持多种协议,如TCP/IP、UDP/IP和HTTP,并且适用于Java NIO(非阻塞I/O)模型,这使得Mina在高并发环境下...

Global site tag (gtag.js) - Google Analytics