`
uniseraph
  • 浏览: 83420 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

【mina指南】mina中的reactor模式(一)

阅读更多
mina中的reactor模式实现参考了Doug Lea 在《Scalable IO in Java》中的reactor。




从上面来两个图可以看出:与传统的单个Reactor模式实现不同,mina中采用了Multiple Reactor的方式。NioSocketAcceptor和NioProcessor使用不同selector,能够更加充分的榨取服务器的性能。
acctptor主要负责
1. 绑定一个/多个端口,开始监听
2. 处理客户端的建链请求
3. 关闭一个/多个监听端口
processor主要负责
1. 接受客户端发送的数据,并转发给业务逻辑成处理
2. 发送数据到客户端

首先看一个最简单的mina服务端程序,
  
        SocketAcceptor acceptor = new NioSocketAcceptor();
        // 设定一个事件处理器
        acceptor.setHandler(new EchoProtocolHandler());
        // 绑定一个监听端口
        acceptor.bind(new InetSocketAddress(PORT));

就这3句话一个服务端程序就ok了,让我们看看,mina在背后做了点什么。
1、NioSocketAcceptor的初始化
1.1 在NioSocketAcceptor的构造函数中,把NioProcessor也构造出来了
  
  protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
            Class<? extends IoProcessor<T>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
    }

1.2 SimpleIoProcessorPool是一个NioProcessor池,默认大小是cpu个数+1,这样能够充分利用多核的cpu
private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
    public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
        this(processorType, null, DEFAULT_SIZE);
    }

1.3 调用init方法,准备selector
   
     protected void init() throws Exception {
        selector = Selector.open();
    }


到此,生产线已经装配好,就等按下开关,就可以正常运行了。


2.绑定端口
2.1 在调用NioSocketAcceptor.bind()函数的时候最终调用AbstractPollingIoAcceptor.bind0,这是NioSocketAcceptor的关键所在
  
 protected final Set<SocketAddress> bind0(
        List<? extends SocketAddress> localAddresses) throws Exception {
        
        /*2.1.1registerQueue是一个待监听动作列表,每次要新绑定一个端口,增加一个register动作,在工作者线程中处理*/
        AcceptorOperationFuture request = new AcceptorOperationFuture(
                localAddresses);
        registerQueue.add(request);
 
        /*2.1.2 创建并启动工作者线程*/
        startupWorker();
        wakeup();
        //堵塞直至监听成功
        request.awaitUninterruptibly();
        
        //以下省略
      }

2.1.1 通过实现生产者/消费者问题来处理绑定端口开始监听动作,其中生产者是bind动作,消费者在Work.run方法中,registQueue是消息队列

2.1.2 在线程池中启动工作者线程

2.1.3 堵塞直至绑定监听端口成功
   
public class DefaultIoFuture implements IoFuture {
public IoFuture awaitUninterruptibly() {
        synchronized (lock) {
            while (!ready) {//当ready为true时候,跳出循环
                waiters++;
                try {
                    lock.wait(DEAD_LOCK_CHECK_INTERVAL);
                } catch (InterruptedException ie) {
             // Do nothing : this catch is just mandatory by contract
                } finally {
                    waiters--;
                    if (!ready) {
                        checkDeadLock();
                    }
                }
            }
        }

        return this;
    }
}

真正的绑定端口开始监听动作是在Woker线程中执行的
取消绑定操作与绑定操作类似,暂时先不描述。

3.AbstractPollingIoAcceptor的工作者线程是NioSocketAcceptor的核心所在,完成了以下三个主要功能:
1、处理绑定监听端口请求
2、处理取消监听端口绑定请求
3、处理socket连接请求

1,2主要是在系统初始化或者系统关闭的时候在registerQuerue/cancelQueue中增加一个消息,3是系统运行时NioSocketAcceptor处理socket建链请求的关键。1,2,3的主要内容在工作者线程Work.run方法中。

NioSocketAcceptor继承自父类AbstractPollingIoAcceptor的Worker.run
  private class Worker implements Runnable {
        public void run() {
            int nHandles = 0;

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    boolean selected = select();

                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port in which this class will
                    // listen on
                    nHandles += registerHandles();

                    if (selected) {
                        processHandles(selectedHandles());
                    }

                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();




                    if (nHandles == 0) {
                        synchronized (lock) {
                            if (registerQueue.isEmpty()
                                    && cancelQueue.isEmpty()) {
                                worker = null;
                                break;
                            }
                        }
                    }
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        disposalFuture.setDone();
                    }
                }
            }
        }



3.1 首先系统在一个无限循环中不停的允许,处理socket建链请求/端口监听/取消端口监听请求,selectable是一个标志位,初始化的时候置为true,要关闭时候置为false,则系统退出循环。

3.2 使用selector监听是否有连接请求操作。
   public void run() {
            int nHandles = 0;

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    boolean selected = select();
                    //以下省略
             
        }

注意,虽然 boolean selected = select();是一个堵塞操作,但是run方法不会陷入死循环。因为即使没有新的连接请求到达,但是每次bind/unbind都会调用NioSocketAccepto.wakeup唤醒处于select状态的selctor。
    protected void wakeup() {
        selector.wakeup();
    }

而系统退出是,关闭的acceptor的dispose方法最终会调用unbind,所以退出时不会有问题。

3.2 从registerQueue中获取绑定请求消息,开始绑定某个端口,并开始监听,准备相关上下文信息
  public void run() {
            int nHandles = 0;

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    boolean selected = select();

                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port in which this class will
                    // listen on
                    nHandles += registerHandles();

                    //以下省略
        }

对于registerHandleres真正完成了对端口的监听
 private int registerHandles() {
        for (;;) {
            AcceptorOperationFuture future = registerQueue.poll();
            if (future == null) {
                return 0;
            }

            Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
            List<SocketAddress> localAddresses = future.getLocalAddresses();

            try {
                for (SocketAddress a : localAddresses) {
                    H handle = open(a);
                    newHandles.put(localAddress(handle), handle);
                }

                boundHandles.putAll(newHandles);

                // and notify.
                future.setDone();
                return newHandles.size();
            } catch (Exception e) {
                future.setException(e);
            } finally {
                // Roll back if failed to bind all addresses.
                if (future.getException() != null) {
                    for (H handle : newHandles.values()) {
                        try {
                            close(handle);
                        } catch (Exception e) {
                            ExceptionMonitor.getInstance().exceptionCaught(e);
                        }
                    }
                    wakeup();
                }
            }
        }
    }

3.2.1 registerHandlers将registerQueue中的所有绑定监听消息取出来,在循环中处理,注意一个细节是,一个绑定监听消息AcceptorOperationFuture可能要绑定监听多个ip的同一个端口
3.2.2 registerHandlers的一个关键动作是
 H handle = open(a);

参看NioSocketAcceptor.open方法的实现,发现这就是关键
    protected ServerSocketChannel open(SocketAddress localAddress)
            throws Exception {
        ServerSocketChannel c = ServerSocketChannel.open();
        boolean success = false;
        try {
            c.configureBlocking(false);
            // Configure the server socket,
            c.socket().setReuseAddress(isReuseAddress());
            // XXX: Do we need to provide this property? (I think we need to remove it.)
            c.socket().setReceiveBufferSize(
                    getSessionConfig().getReceiveBufferSize());
            // and bind.
            c.socket().bind(localAddress, getBacklog());
            c.register(selector, SelectionKey.OP_ACCEPT);
            success = true;
        } finally {
            if (!success) {
                close(c);
            }
        }
        return c;
    }


在这个方法中真正开始监听一个端口,并设置相关细节
1.采用非堵塞方式
2.可从用端口
3.socket读取的缓冲区大小
4.listen队列的长度
并且在selector中注册了对SelectionKey.OP_ACCEPT的关注。

3.2.3 通过  future.setDone();唤醒bind线程,告知绑定操作已经成功,否则bind线程还将继续堵塞。

3.3 如果发现有连接请求过来,则处理之
  private class Worker implements Runnable {
        public void run() {
            int nHandles = 0;

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    boolean selected = select();

                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port in which this class will
                    // listen on
                    nHandles += registerHandles();

                    if (selected) {
                        processHandles(selectedHandles());
                    }

                 //以下省略

processHandles是负责完成新建一个连接,并将这个连接交给NioProcessor监控
    private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                T session = accept(processor, handle);
                if (session == null) {
                    break;
                }

                finishSessionInitialization(session, null, null);

                // add the session to the SocketIoProcessor
                session.getProcessor().add(session);
            }
        }

3.3.1 accept方法创建了一个NioSocketSession,
3.3.2 finishSessionInitialization对这个session进行初始化
3.3.3 将session交给NioProcessor管理,这里有两个地方需要注意
3.3.3.1 processor是一个SimpleIoProcessorPool,里面有个多个NioProcessor,SimpleIoProcessor将轮询取一个processor负责管理新建的NioSocketSession
3.3.3.2 NioProcessor.add方法只是将NioSocketSession放到一个newSessions的队列中,并启动NioProcessor的工作者线程。不会马上生效,要等NioProcessor的worker线程执行addNew的时候,才会真正开始管理新增的session,这个与Acceptor的bind类似
    public final void add(T session) {
        if (isDisposing()) {
            throw new IllegalStateException("Already disposed.");
        }

        newSessions.add(session);
        startupWorker();
    }



至此NioSocketAcceptor的基本实现已经描述完毕,相信读者对也有一个初步的认识。

思考题:
1. 描述一下NioSocketAcceptor处理一个新连接请求的全过程;
2. 下面代码中,mina做了什么,是怎么关闭监听端口的。
      NioSocketAcceptor acceptor = new NioSocketAcceptor();
      //省略
      accetpro.dispose();
 


补一张类图:



  • 大小: 22.7 KB
分享到:
评论
4 楼 yzhw 2012-11-01  
3 楼 hardPass 2009-07-14  
再问个问题,这段代码是什么时候执行的?我怎么没有看到哪里有调用的地方么:
   1.  protected void init() throws Exception {  
   2.     selector = Selector.open();  
   3. }  
2 楼 hardPass 2009-07-14  
感谢你的好文章!

再请教个问题,为什么这么说: NioProcessor池设为cpu个数+1,这样能够充分利用多核的cpu?

引用
NioProcessor池,默认大小是cpu个数+1,这样能够充分利用多核的cpu 


1 楼 jelver 2009-02-12  
真的非常感谢你的好文章,继续努力,关注你

相关推荐

    MINA2与Netty4比较分析

    Mina2与Netty4都使用了Reactor模式的线程模型,但具体实现细节上有所不同。 Mina2使用了一个IoAcceptor线程来监听客户端连接,对于每个监听的端口,它都会创建一个线程。一旦有新的连接,IoAcceptor会创建一个新的...

    MINA 2.0.7 官方示例

    8. **NIOEventModelExamples**:NIO事件模型示例,涵盖了MINA的两种主要事件模型——“Proactor”(预读取器)和“Reactor”(反应器)模式。 9. **AsyncWriteExample**:异步写入示例,展示MINA如何处理大文件或大...

    Mina2源码分析.docx

    IoProcessor 是真正代表会话的实际 I/O 操作的接口,它对现有的 Reactor 模式架构的 Java NIO 框架继续做了一层封装。它的泛型参数指明了它能处理的会话类型。接口中最重要的几个方法,add 用于将指定会话加入到此 ...

    ApacheMina典型例子分析参考.pdf

    Mina采用Java NIO的Reactor实现,模拟Proactor模式。在某些系统中,如Windows,Proactor可以直接利用原生的异步I/O机制,如IO Completion Ports(IOCP),但在其他不支持原生异步I/O的系统上,可以通过Reactor模拟...

    Mina2.0框架源码剖析

    `IoProcessor`是实际执行I/O操作的接口,它封装了Java NIO的Reactor模式。`add()`方法将会话添加到处理器,`flush()`强制刷新写请求队列,`remove()`则用于关闭和移出会话,`updateTrafficMask()`控制会话的读写行为...

    Mina2源码分析.doc

    `IoProcessor`是实际执行I/O操作的接口,它封装了Java NIO的Reactor模式。`add()`方法添加会话以进行处理,`flush()`强制清空写请求队列,`remove()`用于关闭和移除会话,`updateTrafficMask()`控制会话的读写权限。...

    基于Java NIO反应器模式设计与实现

    反应器设计模式(Reactor Pattern)是事件驱动架构中的一种反应式编程模式,通常用于高并发的场景中。它主要用于处理多个事件的异步分发。在反应器模式中,有一个或多个输入源(例如,连接、数据流)和相应的事件...

    基于Java8,SpringBoot,WebFlux,Netty,Vert.x,Reactor等开发, 一个全响应式的物联网平台

    JetLinks 基于Java8,Spring Boot 2.x ,WebFlux,Netty,Vert.x,Reactor等开发, 是一个全响应式的企业级物联网平台。支持统一物模型管理,多种设备,多种厂家,统一管理。统一设备连接管理,多协议适配(TCP,MQTT,UDP,CoAP,...

    Dubbo基本原理机制

    Dubbo 的异步通信机制是基于 Apache MINA 框架的 Reactor 模型通信框架,使用单一长连接和 NIO 异步通讯,适合小数据量大并发的服务调用。 下面是 Dubbo 的基本原理机制的详细说明: 客户端调用远程接口 1. ...

    NIO trick and trap .pdf

    实现案例**:Mina、Netty、Cindy等都是基于Reactor模式实现的高性能网络框架。 #### 六、构建高性能NIO框架的关键 **1. 减少数据拷贝** - **ByteBuffer的选择**:根据应用场景选择合适的`ByteBuffer`类型,如使用...

    java写的IM

    5. **事件驱动编程**:Java的NIO(非阻塞I/O)和Reactor模式常用于提高服务器处理效率,尤其是在高并发场景下。 6. **数据库存储**:聊天记录通常需要持久化存储,因此数据库管理(如MySQL、MongoDB等)是必需的,...

Global site tag (gtag.js) - Google Analytics