`
uniseraph
  • 浏览: 83407 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

【mina指南】mina中的reactor模式(二)

阅读更多
NioProcessor是mina中的另一个核心部分,与NioSocketAcceptor类似,NioProcessor三个主要功能是:
1、接受一个NioSession
2、出来NioSession上的read、write等事件
3、关闭一个NioSession

与NioSocketAcceptor类似,NioProcessor的实现采用了template模式,以上功能整体流程在NioProcessor的父类AbstractPollingIoProcessor中基本完成了,NioSocketAcceptor只是针对Nio的情况完成实现。

创建NioProcessor

如上图,NioSocketAcceptor创建了SimpleIoProcessorPool,SimpleIoProcessorPool中默认存在cpu数+1个NioProcessor,并且这些NioProcessor的工作者线程共享一个线程池。

接受一个NioSession
与NioSocketAcceptor新增一个端口绑定类似,NioProcessor.addSession只是将NioSocketAcceptor新建的NioSession放入一个消息队列中,由工作者线程负责初始化该NioSession,在selector为该session注册OP_READ事件。

关闭一个NioSession
与接受一个NioSession类似,不再描述

NioSession的数据处理
为NioProcessor继承自AbstractPollingIoProcessor的工作者线程中完成主要功能
1. 在循环中,selector监听所有端口,注意在NioProcessor中的select超时时间为1秒,这意味着最多一秒钟的时候,NioProcessor.Worker线程唤醒一次。而在NioSocketAcceptor.Work.run中select是没有超时时间的。下面Worker线程两次唤醒之间简称为一个周期,易知一个周期的长度小于等于一秒。
        public void run() {
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    boolean selected = select(1000);

                    nSessions += add();
                    updateTrafficMask();

                    if (selected) {
                        process();
                    }

                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    nSessions -= remove();
                    notifyIdleSessions(currentTime);

                    if (nSessions == 0) {
                        synchronized (lock) {
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                worker = null;
                                break;
                            }
                        }
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<T> i = allSessions(); i.hasNext(); ) {
                            scheduleRemove(i.next());
                        }
                        wakeup();
                    }
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (isDisposing()) {
                        dispose0();
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            } finally {
                disposalFuture.setValue(true);
            }
        }

2. 在addSession中从新增session队列newSeesion获取一个新增NioSession,并开始监控之。
    private int add() {
        int addedSessions = 0;
        
        // Loop on the new sessions blocking queue, to count
        // the number of sessions who has been created
        for (;;) {
            T session = newSessions.poll();

            if (session == null) {
                // We don't have anymore new sessions
                break;
            }


            if (addNow(session)) {
                // The new session has been added to the 
                addedSessions ++;
            }
        }

        return addedSessions;
    }

2.1 在addNow(T session)中完成了单个NioSession的初始化
  private boolean addNow(T session) {

        boolean registered = false;
        boolean notified = false;
        try {
            init(session);
            registered = true;

            // Build the filter chain of this session.
            session.getService().getFilterChainBuilder().buildFilterChain(
                    session.getFilterChain());

            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
            // in AbstractIoFilterChain.fireSessionOpened().
            ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
            notified = true;
        } catch (Throwable e) {
            if (notified) {
                // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
                // and call ConnectFuture.setException().
                scheduleRemove(session);
                session.getFilterChain().fireExceptionCaught(e);
                wakeup();
            } else {
                ExceptionMonitor.getInstance().exceptionCaught(e);
                try {
                    destroy(session);
                } catch (Exception e1) {
                    ExceptionMonitor.getInstance().exceptionCaught(e1);
                } finally {
                    registered = false;
                }
            }
        }
        return registered;
    }


2.1.1 初始化session,这里是Template Method的又一个体现,因为不同的类型的session初始化实现不同, 在NioProcessor中包括:设置非堵塞模式.为该session注册OP_READ事件。
 @Override
    protected void init(NioSession session) throws Exception {
        SelectableChannel ch = (SelectableChannel) session.getChannel();
        ch.configureBlocking(false);
        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
    }


2.1.2 构建IoFilterChain,具体请参考http://uniseraph.iteye.com/blog/228194

2.1.3 触发NioSession上的相关事件,依次为sessionCreated->sessionOpened ->
IoServiceListener的sessionCreated

3 修改sessio的traffic参数
从trafficControllingSessions获取需要修改session的traffic参数,具体与新增NioSession类似,不再详细描述。

4 接受处理socket数据并应答
关键内容来了,如果有NioSession的channel处理于OP_READ状态,则处理之
     if (selected) {
                        process();
                    }


process方法对于所有发生了OP_READ或OP_WRITE的NioSession依次进行处理,注意虽然在NioSession初始化的时候只注册了OP_READ事件,但是在上一周期调用session.write方法的时候,上一周期的flush方法将会注册OP_WRITE方法。本周期发送的数据都是上一周期确定的。

    private void process(T session) {

        if (isReadable(session) && session.getTrafficMask().isReadable()) {
            read(session);
        }

        if (isWritable(session) && session.getTrafficMask().isWritable()) {
            scheduleFlush(session);
        }
    }


在read方法中读取socket上的数据,调用发IoFilter,逐层传递到IoHander(具体参考http://uniseraph.iteye.com/blog/228194);如果读到-1,则增加一个关闭连接消息到队列中;如果发生异常,则异常调用IoFilter和IoHandler的fireExceptionCaught方法
    private void read(T session) {
        IoSessionConfig config = session.getConfig();
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());

        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                session.getFilterChain().fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            session.getFilterChain().fireExceptionCaught(e);
        }
    }


发送数据


6 关闭连接及其他
如果没有消息积累,也没有新创建的连接,则关闭线程池

  • 大小: 9 KB
5
0
分享到:
评论
2 楼 wangwenjunHi 2010-01-13  
Aaronlee 写道
想问下,对于客户端传入的不同的MESSAGE ,在服务器端怎么解码?

至于怎么解码,就要看你如何进行解码器的实现了,通常来说服务端接收到的来自不同客户端的之间的通信协议都会是一致的,所以就按照协议的格式进行解码了
1 楼 Aaronlee 2009-06-15  
想问下,对于客户端传入的不同的MESSAGE ,在服务器端怎么解码?

相关推荐

    MINA2与Netty4比较分析

    Mina2与Netty4都使用了Reactor模式的线程模型,但具体实现细节上有所不同。 Mina2使用了一个IoAcceptor线程来监听客户端连接,对于每个监听的端口,它都会创建一个线程。一旦有新的连接,IoAcceptor会创建一个新的...

    MINA 2.0.7 官方示例

    8. **NIOEventModelExamples**:NIO事件模型示例,涵盖了MINA的两种主要事件模型——“Proactor”(预读取器)和“Reactor”(反应器)模式。 9. **AsyncWriteExample**:异步写入示例,展示MINA如何处理大文件或大...

    ApacheMina典型例子分析参考.pdf

    Mina采用Java NIO的Reactor实现,模拟Proactor模式。在某些系统中,如Windows,Proactor可以直接利用原生的异步I/O机制,如IO Completion Ports(IOCP),但在其他不支持原生异步I/O的系统上,可以通过Reactor模拟...

    Mina2源码分析.docx

    IoProcessor 是真正代表会话的实际 I/O 操作的接口,它对现有的 Reactor 模式架构的 Java NIO 框架继续做了一层封装。它的泛型参数指明了它能处理的会话类型。接口中最重要的几个方法,add 用于将指定会话加入到此 ...

    Mina2.0框架源码剖析

    `IoProcessor`是实际执行I/O操作的接口,它封装了Java NIO的Reactor模式。`add()`方法将会话添加到处理器,`flush()`强制刷新写请求队列,`remove()`则用于关闭和移出会话,`updateTrafficMask()`控制会话的读写行为...

    Mina2源码分析.doc

    `IoProcessor`是实际执行I/O操作的接口,它封装了Java NIO的Reactor模式。`add()`方法添加会话以进行处理,`flush()`强制清空写请求队列,`remove()`用于关闭和移除会话,`updateTrafficMask()`控制会话的读写权限。...

    基于Java NIO反应器模式设计与实现

    反应器设计模式(Reactor Pattern)是事件驱动架构中的一种反应式编程模式,通常用于高并发的场景中。它主要用于处理多个事件的异步分发。在反应器模式中,有一个或多个输入源(例如,连接、数据流)和相应的事件...

    基于Java8,SpringBoot,WebFlux,Netty,Vert.x,Reactor等开发, 一个全响应式的物联网平台

    JetLinks 基于Java8,Spring Boot 2.x ,WebFlux,Netty,Vert.x,Reactor等开发, 是一个全响应式的企业级物联网平台。支持统一物模型管理,多种设备,多种厂家,统一管理。统一设备连接管理,多协议适配(TCP,MQTT,UDP,CoAP,...

    NIO trick and trap .pdf

    实现案例**:Mina、Netty、Cindy等都是基于Reactor模式实现的高性能网络框架。 #### 六、构建高性能NIO框架的关键 **1. 减少数据拷贝** - **ByteBuffer的选择**:根据应用场景选择合适的`ByteBuffer`类型,如使用...

    java写的IM

    5. **事件驱动编程**:Java的NIO(非阻塞I/O)和Reactor模式常用于提高服务器处理效率,尤其是在高并发场景下。 6. **数据库存储**:聊天记录通常需要持久化存储,因此数据库管理(如MySQL、MongoDB等)是必需的,...

    Dubbo基本原理机制

    Dubbo 的异步通信机制是基于 Apache MINA 框架的 Reactor 模型通信框架,使用单一长连接和 NIO 异步通讯,适合小数据量大并发的服务调用。 下面是 Dubbo 的基本原理机制的详细说明: 客户端调用远程接口 1. ...

Global site tag (gtag.js) - Google Analytics