`
Donald_Draper
  • 浏览: 980849 次
社区版块
存档分类
最新评论

Mina 连接器接口定义及抽象实现(IoConnector )

    博客分类:
  • Mina
阅读更多
Mina IoService接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378271
Mina socket监听器(NioSocketAcceptor):http://donald-draper.iteye.com/blog/2378668
引言:
前面一篇文章我们可以socket监听,先来回顾一下:
    socket监听NioSocketAcceptor,有两个内部变量为选择器selector和选择器提供者selectorProvider。init方法主要工作为打开一个选择器selector。打开一个socket地址,如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则通过ServerSocketChannel打开一个socket通道服务者;配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址。监听器接受连接,实际上是委托给绑定地址的ServerSocketChannel,接受客户端的连接,产生一个SocketChannel,再根据SocketChannel和Io处理器创建会话。选择唤醒操作实际委托给内部选择器。
从这篇文章开始我们来讲socket连接器NioSocketConnector,先从Io连接器接口定义开始:
/**
 * Connects to endpoint, communicates with the server, and fires events to
 * {@link IoHandler}s.
 连接器IoConnector,可以连接终端与服务通信,触发IoHandler的相关事件。
 * <p>
 * Please refer to
 * [url=../../../../../xref-examples/org/apache/mina/examples/netcat/Main.html]NetCat[/url]
 * example.
 * <p>
 * You should connect to the desired socket address to start communication,
 * and then events for incoming connections will be sent to the specified
 * default {@link IoHandler}.
 在开始通信之前,你应该连接一个Socket地址,建立连接的相关事件将会发送到IoHandler。
 * <p>
 * Threads connect to endpoint start automatically when
 * {@link #connect(SocketAddress)} is invoked, and stop when all
 * connection attempts are finished.
 *当连接远端地址时,线程自动连接远端socket地址,当连接尝试完成时,线程关闭
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public interface IoConnector extends IoService {
    /**
     * @return the connect timeout in seconds.  The default value is 1 minute.
     * 获取连接超时时间(s),默认为1分钟
     * @deprecated
     */
    @Deprecated
    int getConnectTimeout();
    /**获取连接超时时间(ms),默认为1分钟
     * @return the connect timeout in milliseconds.  The default value is 1 minute.
     */
    long getConnectTimeoutMillis();
    /**
     * Sets the connect timeout in seconds.  The default value is 1 minute.
     * 设置连接超时时间,单位s
     * @deprecated
     * @param connectTimeout The time out for the connection
     */
    @Deprecated
    void setConnectTimeout(int connectTimeout);
    /**
     * Sets the connect timeout in milliseconds.  The default value is 1 minute.
     * 设置连接超时时间,单位ms
     * @param connectTimeoutInMillis The time out for the connection
     */
    void setConnectTimeoutMillis(long connectTimeoutInMillis);

    /**
     * @return the default remote address to connect to when no argument
     * is specified in {@link #connect()} method.
     获取默认的远程socket地址
     */
    SocketAddress getDefaultRemoteAddress();

    /**
     * Sets the default remote address to connect to when no argument is
     * specified in {@link #connect()} method.
     * 设置默认的远端socket地址
     * @param defaultRemoteAddress The default remote address
     */
    void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress);
    //设置获取本地默认socket地址
    /**
     * @return the default local address
     */
    SocketAddress getDefaultLocalAddress();
    /**
     * Sets the default local address
     * 
     * @param defaultLocalAddress The default local address
     */
    void setDefaultLocalAddress(SocketAddress defaultLocalAddress);

    /**
     * Connects to the {@link #setDefaultRemoteAddress(SocketAddress) default
     * remote address}.
     * 连接到默认远端地址
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     * @throws IllegalStateException
     *             if no default remoted address is set.
     */
    ConnectFuture connect();

    /**
     * Connects to the {@link #setDefaultRemoteAddress(SocketAddress) default
     * remote address} and invokes the <code>ioSessionInitializer</code> when
     * the IoSession is created but before {@link IoHandler#sessionCreated(IoSession)}
     * is invoked.  There is [i]no[/i] guarantee that the <code>ioSessionInitializer</code>
     * will be invoked before this method returns.
     * 在连接默认远端地址,当会话创建时,在IoHandler#sessionCreated调用前,调用ioSessionInitializer
     ,初始化会话
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     * 
     * @throws IllegalStateException if no default remote address is set.
     */
    ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

    /**
     * Connects to the specified remote address.
     * 连接到远端socket地址
     * @param remoteAddress The remote address to connect to
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress);

    /**
     * Connects to the specified remote address and invokes
     * the <code>ioSessionInitializer</code> when the IoSession is created but before
     * {@link IoHandler#sessionCreated(IoSession)} is invoked.  There is [i]no[/i]
     * guarantee that the <code>ioSessionInitializer</code> will be invoked before
     * this method returns.
     * 连接远端地址,在会话创建时,在IoHandler#sessionCreated调用前,调用ioSessionInitializer
     ,初始化会话。
     * @param remoteAddress  the remote address to connect to
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     * 
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

    /**
     * Connects to the specified remote address binding to the specified local address.
     *连接远端地址,绑定本地地址
     * @param remoteAddress The remote address to connect
     * @param localAddress The local address to bind
     * 
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    /**
     * Connects to the specified remote address binding to the specified local
     * address and and invokes the <code>ioSessionInitializer</code> when the
     * IoSession is created but before {@link IoHandler#sessionCreated(IoSession)}
     * is invoked.  There is [i]no[/i] guarantee that the <code>ioSessionInitializer</code>
     * will be invoked before this method returns.
     * 连接远端地址,绑定本地地址,在IoHandler#sessionCreated调用前,调用ioSessionInitializer
     ,初始化会话。
     * @param remoteAddress  the remote address to connect to
     * @param localAddress  the local interface to bind to
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     *
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
            IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
}

从IoConnector接口定义来看,与Ioservice相比增加了连接功能。
再来看抽象连接器AbstractIoConnector定义:
/**
 * A base implementation of {@link IoConnector}.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public abstract class AbstractIoConnector extends AbstractIoService implements IoConnector {
    /**
     * The minimum timeout value that is supported (in milliseconds).
     */
    private long connectTimeoutCheckInterval = 50L;//连接超时检查间隔
    private long connectTimeoutInMillis = 60 * 1000L; // 1 minute by default,默认连接超时时间
    /** The remote address we are connected to 连接的远端地址*/
    private SocketAddress defaultRemoteAddress;
    /** The local address 本地socket地址*/
    private SocketAddress defaultLocalAddress;
}

再看看构造:
/**
 * Constructor for {@link AbstractIoConnector}. You need to provide a
 * default session configuration and an {@link Executor} for handling I/O
 * events. If null {@link Executor} is provided, a default one will be
 * created using {@link Executors#newCachedThreadPool()}.
 * 构造AbstractIoConnector,需要提供一个会话配置,一个执行器用于处理IO相关事件,
 如果执行器为空,默认为Executors#newCachedThreadPool
 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
 * 
 * @param sessionConfig
 *            the default configuration for the managed {@link IoSession}
 * @param executor
 *            the {@link Executor} used for handling execution of I/O
 *            events. Can be <code>null</code>.
 */
protected AbstractIoConnector(IoSessionConfig sessionConfig, Executor executor) {
    super(sessionConfig, executor);
}

再来看连接操作:
/**
 * {@inheritDoc}
 */
@Override
public final ConnectFuture connect() {
    SocketAddress remoteAddress = getDefaultRemoteAddress();
    if (remoteAddress == null) {
        throw new IllegalStateException("defaultRemoteAddress is not set.");
    }
    return connect(remoteAddress, null, null);
}
/**
 * {@inheritDoc}
 */
@Override
public ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    SocketAddress remoteAddress = getDefaultRemoteAddress();
    if (remoteAddress == null) {
        throw new IllegalStateException("defaultRemoteAddress is not set.");
    }
    return connect(remoteAddress, null, sessionInitializer);
}
/**
 * {@inheritDoc}
 */
@Override
public final ConnectFuture connect(SocketAddress remoteAddress) {
    return connect(remoteAddress, null, null);
}
/**
 * {@inheritDoc}
 */
@Override
public ConnectFuture connect(SocketAddress remoteAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    return connect(remoteAddress, null, sessionInitializer);
}
/**
 * {@inheritDoc}
 */
@Override
public ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return connect(remoteAddress, localAddress, null);
}

上面所有的连接操作都是委托给方法connect(SocketAddress remoteAddress, SocketAddress localAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer)
我们来看这个方法:
/**
 * {@inheritDoc}
 */
@Override
public final ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    //检查连接器状态,检查本地地址与远程地址是否为空已经与传输元数据地址类型是否匹配
    if (isDisposing()) {
        throw new IllegalStateException("The connector is being disposed.");
    }
    if (remoteAddress == null) {
        throw new IllegalArgumentException("remoteAddress");
    }
    if (!getTransportMetadata().getAddressType().isAssignableFrom(remoteAddress.getClass())) {
        throw new IllegalArgumentException("remoteAddress type: " + remoteAddress.getClass() + " (expected: "
                + getTransportMetadata().getAddressType() + ")");
    }
    if (localAddress != null && !getTransportMetadata().getAddressType().isAssignableFrom(localAddress.getClass())) {
        throw new IllegalArgumentException("localAddress type: " + localAddress.getClass() + " (expected: "
                + getTransportMetadata().getAddressType() + ")");
    }
    //如果连接器Iohandler为null,创建一个对会话操作事件不处理的IoHandler
    if (getHandler() == null) {
        if (getSessionConfig().isUseReadOperation()) {
            setHandler(new IoHandler() {
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void messageReceived(IoSession session, Object message) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void messageSent(IoSession session, Object message) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void sessionClosed(IoSession session) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void sessionCreated(IoSession session) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void sessionOpened(IoSession session) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void inputClosed(IoSession session) throws Exception {
                    // Empty handler
                }
            });
        } else {
            throw new IllegalStateException("handler is not set.");
        }
    }
    return connect0(remoteAddress, localAddress, sessionInitializer);
}


/**
 * Implement this method to perform the actual connect operation.
 *实现具体的connect0
 * @param remoteAddress The remote address to connect from
 * @param localAddress <tt>null</tt> if no local address is specified
 * @param sessionInitializer The IoSessionInitializer to use when the connection s successful
 * @return The ConnectFuture associated with this asynchronous operation
 * 
 */
protected abstract ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

从上面可以看出,连接操作,首先检查连接器状态,本地地址与远程地址是否为空已经与传输元数据地址类型是否匹配,如果连接器Iohandler为null,创建一个对会话操作事件不处理的IoHandler,最后将实际连接操作委托给connect0,待子类实现。
再来看其他方法:
 /**
     * Adds required internal attributes and {@link IoFutureListener}s
     * related with event notifications to the specified {@code session}
     * and {@code future}.  Do not call this method directly;
     添加必要的内部属性和结果监听器到会话和future,不要直接调用此方法。
     */
    @Override
    protected final void finishSessionInitialization0(final IoSession session, IoFuture future) {
        // In case that ConnectFuture.cancel() is invoked before
        // setSession() is invoked, add a listener that closes the
        // connection immediately on cancellation.
	//防止在设置会话时,连接请求被取消,添加一个监听器,当连接取消时,
	//关闭会话。
        future.addListener(new IoFutureListener<ConnectFuture>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public void operationComplete(ConnectFuture future) {
                if (future.isCanceled()) {
                    session.closeNow();
                }
            }
        });
    }
    /**
    * @return
     *  The minimum time that this connector can have for a connection
     *  timeout in milliseconds.
     */
    public long getConnectTimeoutCheckInterval() {
        return connectTimeoutCheckInterval;
    }
    /**
     * Sets the timeout for the connection check
     *  
     * @param minimumConnectTimeout The delay we wait before checking the connection
     */
    public void setConnectTimeoutCheckInterval(long minimumConnectTimeout) {
        if (getConnectTimeoutMillis() < minimumConnectTimeout) {
            this.connectTimeoutInMillis = minimumConnectTimeout;
        }
        this.connectTimeoutCheckInterval = minimumConnectTimeout;
    }
    /**
     * @deprecated Take a look at <tt>getConnectTimeoutMillis()</tt>
     */
    @Deprecated
    @Override
    public final int getConnectTimeout() {
        return (int) connectTimeoutInMillis / 1000;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final long getConnectTimeoutMillis() {
        return connectTimeoutInMillis;
    }
    /**
     * @deprecated
     *  Take a look at <tt>setConnectTimeoutMillis(long)</tt>
     */
    @Deprecated
    @Override
    public final void setConnectTimeout(int connectTimeout) {

        setConnectTimeoutMillis(connectTimeout * 1000L);
    }
    /**
     * Sets the connect timeout value in milliseconds.
     * 
     */
    @Override
    public final void setConnectTimeoutMillis(long connectTimeoutInMillis) {
        if (connectTimeoutInMillis <= connectTimeoutCheckInterval) {
            this.connectTimeoutCheckInterval = connectTimeoutInMillis;
        }
        this.connectTimeoutInMillis = connectTimeoutInMillis;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public SocketAddress getDefaultRemoteAddress() {
        return defaultRemoteAddress;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void setDefaultLocalAddress(SocketAddress localAddress) {
        defaultLocalAddress = localAddress;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final SocketAddress getDefaultLocalAddress() {
        return defaultLocalAddress;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress) {
        if (defaultRemoteAddress == null) {
            throw new IllegalArgumentException("defaultRemoteAddress");
        }

        if (!getTransportMetadata().getAddressType().isAssignableFrom(defaultRemoteAddress.getClass())) {
            throw new IllegalArgumentException("defaultRemoteAddress type: " + defaultRemoteAddress.getClass()
                    + " (expected: " + getTransportMetadata().getAddressType() + ")");
        }
        this.defaultRemoteAddress = defaultRemoteAddress;
    }

总结:
IoConnector接口给Ioservice增加了连接功能,可以连接服务端。连接操作,首先检查连接器状态,本地地址与远程地址是否为空已经与传输元数据地址类型是否匹配,如果连接器Iohandler为null,创建一个对会话操作事件不处理的IoHandler,最后将实际连接操作委托给connect0,待子类实现。
附:
//IoSessionInitializer
public interface IoSessionInitializer
{
    public abstract void initializeSession(IoSession iosession, IoFuture iofuture);
}
0
1
分享到:
评论

相关推荐

    apache-mina-2.0.4.rar_apache mina_mina

    6. **Transport Layer**:Mina支持多种传输层实现,如TCP、UDP等,这些都抽象为IoAcceptor和IoConnector接口,方便开发者使用。 深入研究源码,你可能会关注以下方面: - **mina-core**模块:这是Mina的核心库,...

    Mina客户端示例

    1. **初始化IoConnector**: IoConnector是Mina提供的接口,用于创建和管理到远程服务器的连接。你可以选择TCP或UDP连接器,根据你的需求来决定。 2. **配置Connector**: 设置连接器的参数,如端口号、超时时间、...

    Mina官方教程_中文版.rar

    1. 创建IoAcceptor或IoConnector:根据需求创建服务器端的接受器或客户端的连接器。 2. 配置Filter链:设置过滤器,以处理数据传输过程中的各种任务,如压缩、加密、身份验证等。 3. 实现IoHandler:定义事件处理器...

    MINA源码分析,内涵类的讲解

    在MINA的源码中,`AbstractIoSession`是`Session`接口的主要实现,它管理着连接的状态、缓冲区、属性等。`DefaultSessionConfig`则提供了配置会话参数的接口,如读写超时、缓冲区大小等。 `Buffer`类是MINA用于存储...

    mina开发文档

    1. **IoService**:这是Mina的核心接口,它是所有I/O通信的起点,定义了服务的基本操作,如启动、停止、绑定端口、连接远程服务器等。`IoService`接口提供了管理会话(IoSession)和配置(IoServiceConfig)的能力。...

    Apache_Mina2.0学习笔记(初版).doc

    IoService是Mina的核心接口,它定义了服务端处理客户端连接、读写数据以及管理会话的主要方法。IoService的实现类如NioServerSocketAcceptor,负责监听和接受来自客户端的连接请求。 #### 2.1. 类结构 IoService...

    mina即时聊天demo

    开发者需要实现IoHandler接口,定义相应事件的处理方法。 4. **mina核心组件** - **Session**: 代表客户端和服务端之间的连接,包含了与连接相关的状态和配置信息。 - **Filter链**: Mina采用过滤器链设计模式,...

    Mina源码解析

    Mina框架主要由两个核心模块组成:mina-core负责基本的网络I/O操作,而mina-filter则包含了一系列预定义的过滤器,如日志记录、压缩、加密等。开发者可以根据需要选择或组合使用这些过滤器,以增强应用的功能。 ...

    mina之间通讯所需要的jar包

    为了利用这些jar包进行MINA通信,你需要创建IoAcceptor来监听客户端的连接请求,定义过滤器链来处理数据,然后编写业务逻辑处理接收到的数据。同时,客户端也需要创建IoConnector来连接服务器,并设置相应的过滤器链...

    一个Apache MINA使用案例源代码ApacheMina

    MINA提供了一种灵活的方式,可以根据实际需求定义自定义的编解码器。 3. **FilterChain**: 过滤器链是MINA中实现业务逻辑的关键机制。开发者可以通过添加自定义过滤器来处理数据流,如日志记录、性能监控、身份验证...

    mina服务器实例

    1. **Android客户端**:在Android应用中,可以使用Mina库实现与服务器的通信,创建IoConnector对象,设置过滤器链和处理器,并连接到服务器。 2. **Web端集成**:在Web端,通常通过JavaScript的WebSocket API与Mina...

    mina2框架+实例教程

    2. **IoHandler**: IoHandler是Mina的核心接口,定义了处理网络事件的方法,如连接建立、数据读取、连接关闭等。 3. **ProtocolDecoder/ProtocolEncoder**: 这两个接口用于将接收到的原始字节流解码为应用程序可...

    mina学习笔记,记录所有API

    5. **IoService / IoAcceptor / IoConnector**: IoService是服务端和客户端的抽象,IoAcceptor负责监听和接受客户端连接,IoConnector则用于连接到远程服务器。 6. **Event-driven模型**: MINA基于事件驱动模型,当...

    Mina2.0完全剖析,完全自学手册

    - **类结构**:`IoService`是一个抽象接口,具体的实现类包括`IoAcceptor`(用于接受传入连接)和`IoConnector`(用于发起传出连接)。 - **应用**:通过继承或实现这些接口,可以定制自己的网络服务逻辑。 ##### ...

    mina框架的demo 入门,开发

    1. **IoSession**:这是Mina中的核心接口,代表一个连接会话。它包含了连接的状态信息,如读写缓冲区、会话属性以及传输数据的相关方法。 2. **Filter Chain**:Mina使用过滤器链来处理进来的消息。过滤器可以对...

    mina 所需jar包

    在使用Apache MINA时,你需要配置IoService(如IoAcceptor或IoConnector),选择适当的TransportType(如NioSocketAcceptor for TCP或NioDatagramAcceptor for UDP),然后添加所需的过滤器,最后启动服务监听客户端...

    mina-2.0.19官方相关jar包

    3. **事件驱动**:MINA采用事件驱动的设计,通过事件监听器机制来处理网络事件,如连接建立、数据接收、断开连接等,使得代码更加简洁和易于维护。 4. **过滤器链**:MINA的过滤器链机制允许开发者插入自定义的过滤...

Global site tag (gtag.js) - Google Analytics