`
jiangwenfeng762
  • 浏览: 288490 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

mina源码分析——bind

阅读更多

关于Mina

mina是开源的NIO框架,其project地址:

http://mina.apache.org/mina-project/features.html

想快速了解mina就看user guide:

http://mina.apache.org/mina-project/userguide/user-guide-toc.html

mina给我的感觉:干净、利落的抽象,非常容易上手,使用mina你只需要写不需要超过10行code就可以搭建一个TCP服务器,就像mina自身带的例子:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.example.gettingstarted.timeserver.TimeServerHandler;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MinaTimeServer {

	/**
	 * @param args
	 */
	private static final int PORT = 9123;
	public static void main(String[] args) throws IOException {
		IoAcceptor acceptor = new NioSocketAcceptor();
		acceptor.getFilterChain().addLast( "logger", new LoggingFilter());
        acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName( "UTF-8" ))));
        acceptor.setHandler(new TimeServerHandler());
        acceptor.getSessionConfig().setReadBufferSize(2048);
        acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10);
		acceptor.bind(new InetSocketAddress(PORT) );
	}
}

 

Mina源码分析之bind

mina好用只是它把复杂很好的进行了分解和隐藏,要想真正掌握它,还是要深入了解源码,所以从bind方法开始,bind方法是interface IoAcceptor声明的方法,先看看它的javadoc,了解bind方法的职责:

 

   /**
     * Binds to the default local address(es) and start to accept incoming
     * connections.
     *
     * @throws IOException if failed to bind
     */
    void bind() throws IOException;

    /**
     * Binds to the specified local address and start to accept incoming
     * connections.
     *
     * @param localAddress The SocketAddress to bind to
     *  
     * @throws IOException if failed to bind
     */
    void bind(SocketAddress localAddress) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections. If no address is given, bind on the default local address.
     * 
     * @param firstLocalAddresses The first address to bind to 
     * @param addresses The SocketAddresses to bind to 
     *
     * @throws IOException if failed to bind
     */
    void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections. If no address is given, bind on the default local address.
     * 
     * @param addresses The SocketAddresses to bind to 
     *
     * @throws IOException if failed to bind
     */
    void bind(SocketAddress... addresses) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections.
     *
     * @throws IOException if failed to bind
     */
    void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;

 bind方法职责

1.绑定到本地ip地址(如果没有指定就绑定到默认ip)

2.开始接受外面进来的请求。

通俗地说,一个Tcp的或则UDP的IO Server调用了bind方法后,就可以开始干活了,就可以接受请求了。顺便说一句,mina对IO的两端(Server,Client)都有很好的抽象和封装,IO Server由IoAcceptor代表,IO Client由IoConnector代表,而它们又都是IoService的直接子接口。关于mina的整体架构还是去看user guide,非常清晰。

 

明白了bind方法的职责,不妨先不要往下看,而是结合java NIO api 思考一下其大概的实现细节,然后再去翻code来验证你的推测。

对于一个基于NIO的server来说,要想能够开始接受外面的请求,有几件事必须做:

1.open一个SelectableChannel,SelectableChannel代表了可以支持非阻塞IO操作的channel

2.open一个Selector,Selector通过select方法来监控所有可以进行IO操作的SelectableChannel

3.完成SelectableChannel在Selector上的注册,这样Selector方可监控他。而类SelectionKey则代表一个成功的注册。

当然对于一个框架来说,肯定会做的更多,但以上3点是必须的,那接下来就去翻code吧

 

首先看bind方法所在的类层次:

bind方法所在的class位置

从类图中可以看出,真正处理bind逻辑的是bindInternal方法,该方法在AbstractIoAcceptor抽象类中声明,在AbstractPollingIoAcceptor类中实现。正如javadoc的说明,AbstractPollingIoAcceptor是A base class for implementing transport using a polling strategy. 

 

 bindInternal方法

下面是bindInternal方法的code:

 

protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
        // Create a bind request as a Future operation. When the selector
        // have handled the registration, it will signal this future.
        AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);

        // adds the Registration request to the queue for the Workers
        // to handle
        registerQueue.add(request);

        // creates the Acceptor instance and has the local
        // executor kick it off.
        startupAcceptor();

        // As we just started the acceptor, we have to unblock the select()
        // in order to process the bind request we just have added to the
        // registerQueue.
        try {
            lock.acquire();

            // Wait a bit to give a chance to the Acceptor thread to do the select()
            Thread.sleep(10);
            wakeup();
        } finally {
            lock.release();
        }

        // Now, we wait until this request is completed.
        request.awaitUninterruptibly();

        if (request.getException() != null) {
            throw request.getException();
        }

        // Update the local addresses.
        // setLocalAddresses() shouldn't be called from the worker thread
        // because of deadlock.
        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

        for (H handle : boundHandles.values()) {
            newLocalAddresses.add(localAddress(handle));
        }

        return newLocalAddresses;
    }

 

1.创建一个bind请求:

AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);

当前你可以把AcceptorOperationFuture简单理解为支持异步处理和事件通知机制的一个request,后续再详细分析它。

 

2.把请求加到队列中:

 registerQueue.add(request);

registerQueue保存所有新注册的请求,这些请求在registerHandles方法中被消费掉。registerHandles方法又被Acceptor.run方法调用。新注册成功的连接都被保存在:

 private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());

 

3.创建Acceptor类,并且异步执行该类的run方法。

startupAcceptor();

Acceptor是一个内部类,实现了Runnable方法。它负责接收和处理从客户端进来的bind和unbind请求。下面会重点分析这个类。

 

4.等待10毫秒,然后调用wakeup方法来确保selector的select方法被唤醒。

try {

            lock.acquire();

 

            // Wait a bit to give a chance to the Acceptor thread to do the select()

            Thread.sleep(10);

            wakeup();

        } finally {

            lock.release();

        }

lock是一个信号量:

private final Semaphore lock = new Semaphore(1);由于可用值是1,因此相当于排他锁。

 这里要重点解释一下为什么要sleep这10毫秒:要搞清楚这点,首先要看Acceptor.run方法的实现(下面介绍),该方法调用了

int selected = select();

而select方法是abstract的:

    /**
     * Check for acceptable connections, interrupt when at least a server is ready for accepting.
     * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
     * @return The number of sockets having got incoming client
     * @throws Exception any exception thrown by the underlying systems calls
     */
    protected abstract int select() throws Exception;

 大家能够猜到其子类:NioSocketAcceptor的实现就是调用Selector.select方法:

    @Override
    protected int select() throws Exception {
        return selector.select();
    }

 而select方法在没有io事件也没有调用selector.wakeup方法时是阻塞的,反之就会被唤醒。所以才有了上面sleep10毫秒后调用wakeup方法(也是abstract方法,子类NioSocketAcceptor实现就是调用Selector.wakeUp),来唤醒select。但由于Accecptor是在独立线程运行的,因而sleep一会以保证那个线程已经启动,而不会错过唤醒select.

 

5.然后就等待请求被处理完毕,并判断是否有异常,没有异常就把绑定的本地地址返回。

 

Acceptor

Acceptor是一个内部类,实现了Runnable方法。它负责接收和处理从客户端进来的bind和unbind请求 

接下来重点看Acceptor的run方法:

        public void run() {
            assert (acceptorRef.get() == this);

            int nHandles = 0;

            // Release the lock
            lock.release();

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    // The select() will be woke up if some new connection
                    // have occurred, or if the selector has been explicitly
                    // woke up
                    int selected = select();

                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port on which this class will
                    // listen on
                    nHandles += registerHandles();

                    // 这里推出while循环的逻辑 省略。。。

                    if (selected > 0) {
                        // We have some connection request, let's process
                        // them here.
                        processHandles(selectedHandles());
                    }

                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    break;
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Cleanup all the processors, and shutdown the acceptor.
            // .....
            }
        }

 run方法重点是while循环:

1.首先调用select方法来看是否有新io请求进来(由于Acceptor只被bind和unbind调用,因此此时的请求只可能是bind请求和unbind请求)

2.调用registerHandles处理bind请求,该方法返回新处理的bind请求数目,如果=0,则推出循环,Acceptor的处理结束。处理bind请求的大概过程是:

1)从registerQueue队列中拿出bind请求:AcceptorOperationFuture(之前放进去的)

2)从AcceptorOperationFuture中拿出要bind的SocketAddress(可能不止一个);

3)对于每一个SocketAddress,调用open方法来建立ServerSocketChannel通道。open方法是abstract,其子类NioSocketAcceptor给出了实现,没错正如我们之前设想的那样,NioSocketAcceptor.open主要就是完成ServerSocketChannel的注册,源码如下:

    @Override
    protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
        // Creates the listening ServerSocket
        ServerSocketChannel channel = ServerSocketChannel.open();

        boolean success = false;

        try {
            // This is a non blocking socket channel
            channel.configureBlocking(false);

            // Configure the server socket,
            ServerSocket socket = channel.socket();

            // Set the reuseAddress flag accordingly with the setting
            socket.setReuseAddress(isReuseAddress());

            // and bind.
            socket.bind(localAddress, getBacklog());

            // Register the channel within the selector for ACCEPT event
            channel.register(selector, SelectionKey.OP_ACCEPT);
            success = true;
        } finally {
            if (!success) {
                close(channel);
            }
        }
        return channel;
    }

 4)把第3)建立好的ServerSocketChannel放到boundHandles中。boundHandles是一个线程安全的map:

 

private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());

5)返回新注册的size。

 

3. 如果select方法返回值>0,就调用processHandles(selectedHandles());来处理。处理过程大致如下:

1)从selector中拿出有io发生的ServerSocketChannel集合;这个过程是在子类NioSocketAcceptor .selectedHandles方法中发生的。

2)对于每一个ServerSocketChannel,调用accept方法来建立会话:IoSession,IoSession是mina中非常重要的概念,以后再分析。accept方法也是abstract的,其子类NioSocketAccepor给出了实现:

    @Override
    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {

        SelectionKey key = handle.keyFor(selector);

        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }

        // accept the connection from the client
        SocketChannel ch = handle.accept();

        if (ch == null) {
            return null;
        }

        return new NioSocketSession(this, processor, ch);
    }

3)初始化session:initSession(session, null, null);

4)保存session到IoProcessor

 

4.处理unbind的逻辑:

nHandles -= unregisterHandles();

前面说过Acceptor的用户是bind和unbind,所以run方法里自然会有处理unbind的逻辑,不了解这一点看到这段code可能会有些困惑。

unbind的逻辑就很简单了,大致过程如下:

1)从cancelQueue中取出unbind的请求:AcceptorOperationFuture

2)从AcceptorOperationFuture中取出SocketAddress对象(可能不止一个)

3)对于每一个SocketAddress,把它从boundHandles中删除

4)对于每一个SocketAddress,调用close方法来关闭它,close方法也是abstract方法,子类NioSocketAcceptor给出了实现:

    @Override
    protected void close(ServerSocketChannel handle) throws Exception {
        SelectionKey key = handle.keyFor(selector);

        if (key != null) {
            key.cancel();
        }

        handle.close();
    }

 5)调用AcceptorOperationFuture.setDone来做事件通知

 

AbstractPollingIoAcceptor.bindInternal

->startupAcceptor//启动Acceptor线程

-->Acceptor.run

--->registerHandles

主要逻辑:完成ServerSocketChannel在selector中的注册,感兴趣的事件是: SelectionKey.OP_ACCEPT,注册后ServerSocketChannel就可以在指定的ip和port上监听新连接。

1)从registerQueue中取出头元素,registerQueue保存待注册的请求

AcceptorOperationFuture future = registerQueue.poll();

1)创建ServerSocketChannel:ServerSocketChannel.open();

2)把ServerSocket绑定到指定的SocketAddress,SocketAddress来自

List<SocketAddress> localAddresses = future.getLocalAddresses();

3)把ServerSocketChannel注册到selector:channel.register(selector, SelectionKey.OP_ACCEPT);

4)返回channel并保存在boundHandles中

--->processHandles

主要逻辑:处理ServerSocketChannel上的新连接请求,把新连接请求构建新的IoSession并初始化,然后把IoSession实例交给IoProcessor来管理和处理io操作。

1)调用NioSocketAcceptor.accept方法返回NioSocketSession实例。构建NioSocketSession需要SocketChannel、IoProcessor等参数。

2)调用initSession方法

3)把当前session加到IoProcessor中:void add(S session)

--->unregisterHandles

 

主要逻辑:响应doUnbind方法,关闭ServerSocketChannel

  • 大小: 16.7 KB
分享到:
评论

相关推荐

    Mina2源码分析

    ### Mina2源码分析——核心模块解析 #### 概述 Mina2是一个高性能、可扩展的网络应用框架,支持多种传输协议如TCP、UDP等,并提供了丰富的API供开发者使用。本文旨在深入剖析Mina2的核心部分,帮助读者更好地理解和...

    MINA源码分析,内涵类的讲解

    在这个“MINA源码分析,内涵类的讲解”中,我们将深入探讨MINA的核心组件和设计模式,以便更好地理解和利用这个强大的框架。 首先,我们需要了解MINA的基础架构。MINA的核心是`IoService`接口,它定义了服务端和...

    Mina源码解析

    在深入解析Mina源码的过程中,我们可以从以下几个关键知识点入手: 1. **Mina的过滤器机制实现**: Mina的核心设计理念之一是过滤器链(Filter Chain),它借鉴了Servlet的过滤器模型。每个过滤器都可以在数据传输...

    apache-mina源码

    通过阅读和分析`apache-mina-2.0.16`的源码,我们可以深入理解MINA的设计思想,学习如何构建高效的网络服务,并能根据自己的需求定制和扩展MINA的功能。对于想要从事网络编程或系统架构设计的开发者来说,研究MINA...

    MINA源码与例子

    在MINA源码中,我们可以看到以下关键组件: 1. **Session**:这是MINA中的核心概念,代表了网络连接。Session负责管理连接状态,并提供读写数据的方法。 2. **Filter Chain**:MINA采用过滤器链模式来处理网络事件...

    mina 源码

    在分析"apache-mina-2.0.4"源码时,我们可以深入理解其设计理念、架构以及核心组件。 1. **MINA的设计理念** MINA的核心理念是异步非阻塞I/O,它利用Java NIO(New I/O)库来实现高效的网络通信。通过非阻塞I/O,...

    mina2.0源码svn地址

    ### mina2.0源码svn地址解析与详细介绍 #### 一、Mina2.0简介 Mina(Multi-threaded IO Network Architecture)是Apache软件基金会的一个开源项目,旨在为开发人员提供一个易于使用的高性能网络应用程序框架。Mina...

    Mina2源码分析.docx

    Mina2源码分析 Mina2框架源码剖析是 Java 编程语言中一个流行的网络编程框架,用于构建高性能、可扩展的网络应用程序。该框架核心包包括 org.apache.mina.core.service, org.apache.mina.core.session, org.apache....

    Mina2源码分析.doc

    `bind()`和`unbind()`方法分别用于启动和停止连接接受。 `IoConnector`与`IoAcceptor`对应,是客户端连接发起者。它尝试连接到服务器,并在连接成功、失败或取消时触发事件。`connect()`方法启动连接尝试,返回`...

    mina源码+例子mina-2.0.0-M6.zip

    2. **分析源码**:通过阅读MINA的源码,了解其内部处理流程,如读写事件的处理、过滤器链的工作方式等。 3. **编写简单的应用**:从例子入手,创建一个简单的MINA服务器和客户端,理解它们之间的通信机制。 4. **...

    mina2源码分析

    ### Mina2源码分析:深入理解IoService与IoProcessor 在探讨Mina框架的核心机制时,我们聚焦于`IoService`与`IoProcessor`,它们是Mina框架设计的灵魂,为实现高效、灵活的网络通信提供了坚实的基石。 #### ...

    mina学习资料与源码分析

    这个压缩包"mina学习资料与源码分析"包含了帮助你快速入门Mina、深入理解其工作原理和API的资源。下面,我们将深入探讨Mina的核心概念、功能以及如何利用它来开发网络应用。 Mina源于Java社区,其全称为“Minimal ...

    Java mina2源码

    6. **Nio**与**Aio**:Mina2支持两种不同的I/O模型——NIO(非阻塞I/O)和AIO(异步I/O)。NIO适用于高并发场景,而AIO在低延迟和高吞吐量的应用中表现出色。 通过深入研究Mina2的源码,我们可以了解到如何优化网络...

    Mina2.0框架源码剖析

    通过深入分析Mina2.0的源码,我们可以了解到其内部的事件驱动模型、网络通信机制以及如何通过过滤器和处理器来处理网络请求。这有助于我们更好地理解和利用Mina框架来开发高并发、高性能的网络应用。

    Mina 框架源码解析-构建简单通信程序

    通过分析源码,我们可以看到Mina如何优雅地将网络编程的复杂性封装起来,让开发者可以更专注于应用的业务层面。同时,通过心跳机制,我们能够实时检测网络状况,确保服务的稳定性和可靠性。 总的来说,Mina框架提供...

    MINA 2.0.9源码

    此外,源码分析还能帮助你掌握如何自定义Filter和Codec,以满足特定的应用场景需求。 总之,MINA 2.0.9源码为开发者提供了一个深入学习网络编程和异步I/O的好资源,通过研究和实践,开发者可以提升自己的网络应用...

    mina源码走读与实例

    ### MINA源码走读与实例 #### 一、MINA概述 **MINA**(**M**ulti **I**nterface **N**etwork **A**pplication)是Apache组织下的一款开源网络通信框架,它主要针对TCP/IP、UDP/IP协议栈提供了高效的封装和扩展能力...

    mina新手教程源码 mina+springboot+idea最简单的案例。

    mina新手案例,mina新手教程源码 mina+springboot最简单的案例。用的IDEA * mina服务端 * 1、添加@Controller注解和 @PostConstruct注解,代表启动springboot项目时也调用该类下的该方法, * 启动springboot项目...

    JAVA mina 框架源码

    通过对Mina源码的学习,开发者不仅能理解其设计理念,还能掌握如何优化网络通信应用,提高系统的稳定性和性能。此外,学习Mina也能帮助开发者更好地理解Java NIO和网络编程的底层原理,对于提升专业技能非常有帮助。

    Mina 2.0快速入门与源码解析

    为了帮助读者快速理解如何使用 Mina 2.0 开发简单的网络服务,下面提供了一个基于时间服务器的例子——`MinaTimeServer.java`。 ```java package com.vista; import java.io.IOException; import java.net....

Global site tag (gtag.js) - Google Analytics