`
aileqiang
  • 浏览: 7549 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

mina2.0 源码分析--- 基于nio的服务端socket监听过程

阅读更多

   整体来说 ,mina服务端采用基于nio的单线程,轮询机制。 使用selector  获取客户端的链接,并创建sesssion,session通过process来处理io操作。

    nio的典型模式如下所示:

 

private NioEchoServer() throws IOException {
       Selector  roller = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();  //
        serverChannel.socket().bind(new InetSocketAddress(port));
        serverChannel.configureBlocking(false);
        serverChannel.register(roller, SelectionKey.OP_ACCEPT);
    }


  public void start() throws IOException {
        int keyAdded = 0;
        while ((keyAdded = roller.select()) > 0) {
            Set<SelectionKey> keySets = roller.selectedKeys();
            Iterator iter = keySets.iterator();
            while (iter.hasNext()) {
                SelectionKey key = (SelectionKey) iter.next();
                iter.remove();
                actionHandler(key);
            }
        }
    }

   1 服务端绑定port

   2 创建Selector

   3 创建SeversocketChannel,面向流的侦听socket 通道。

   4  监听客户端的请求,

 

 

 

接下来就通过mina 创建服务端socket Accpeptor 使用单线程并监处理端口和客户端请求,学习一下mina的源码

IoAcceptor  acceptor  =  new  NioSoketAcceptor ();  //根据实现的类,调用不同的构造方法
 

上面代码, 创建一个socket Acceptor  ,mina内部实现如下 :

 

super(new DefaultSocketSessionConfig(), NioProcessor.class); //传入具体的ioprocess 的实现类。

 

最终调用的类和构造方法:

AbstractPollingIoAcceptor     
   protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,    //模板模式的使用,根据子类传入IoProcessor实现类构造SimpleIoprocessorPool 池
            Class<? extends IoProcessor<T>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
                true);
    }
 


  SimpleIoProcessorPool 创建线程池,提供给Iosession ,做具体的io处理工作。

public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {

        this(processorType, null, DEFAULT_SIZE);

   }
     public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType, Executor executor, int size) { //构造IOprocessorPool 中的IOprocessor ,并指定Executor;
    try {
          .....................
          .....................   
         //创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程
          this.executor = executor = Executors.newCachedThreadPool();   //return  new ThreadPoolExecutor ;
          ...............................
          for (int i = 0; i < pool.length; i ++)
                 .........................
                 ...........................
            //指定Executor .,线程池,提供构造线程。 使用构造函数 NioProcess(Executor execytor )构造processor ,打开selector 。     
                      processor = processorType.getConstructor(ExecutorService.class).newInstance(executor);
                .............................
                ..................................
                 pool[i] = processor;
                      .....................
                    } catch (NoSuchMethodException e) {
   
 

       
   应用代码中 调用acceptor .bind(),完成对特定端口的绑定,开始监听。
    nioScoket bind ()  调用  AbstarctIoProcessor .bind() ,调用 AbstractPollingIo.bindInternal()

完成对客户端的监听。

public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {
  {
      .......................
            .....................
      try {
                boundAddresses.addAll(bindInternal(localAddressesCopy));  //模板模式
            } catch (IOException e) {
                throw e;
            } catch (RuntimeException e) {
                throw e;
            } catch (Throwable e) {
                throw new RuntimeIoException(
                        "Failed to bind to: " + getLocalAddresses(), e);
            }
        }
                  ...................
        ....................
  }
 
AbstractPollingIoacceptor  :
  protected final Set<SocketAddress> bindInternal( List<? extends SocketAddress> localAddresses) throws Exception {

        AcceptorOperationFuture request = new AcceptorOperationFuture( localAddresses);

       registerQueue.add(request);


    //   acceptor 是一个接受用户请求的线程。It's a thread accepting incoming connections from clients.
    //  executor.execute(new NamePreservingRunnable(acceptor, actualThreadName));
     //启动接受用户请求。
        startupAcceptor();            

        wakeup();      //selector.wakeup(); 使尚未返回的第一个选择操作立即返回。

        request.awaitUninterruptibly();  ////堵塞直至监听成功 

        if (request.getException() != null) {

             throw request.getException();
        }
        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
        
        for (H handle:boundHandles.values()) {
            newLocalAddresses.add(localAddress(handle));   
            return newLocalAddresses;

  }
 

AbstractPollingIoAcceptor 的bindInternal   (。。。) 调用 startupAcceptor,执行 acceptor 线程 。

 

使用自己的私有类 ,private class Acceptor implements Runnable  ,来处理接收客户端请求工作

 

 private void startupAcceptor() {
        // If the acceptor is not ready, clear the queues
        // TODO : they should already be clean : do we have to do that ?
        if (!selectable) {
            registerQueue.clear();
            cancelQueue.clear();
        }

        // start the acceptor if not already started
        synchronized (lock) {
            if (acceptor == null) {
                acceptor = new Acceptor();
                executeWorker(acceptor);
            }
        }
    }
 

 

循环处理客户端请求。

acceptor  ,work线程代码

/**
     * This class is called by the startupAcceptor() method and is
     * placed into a NamePreservingRunnable class.
     * It's a thread accepting incoming connections from clients.
     * The loop is stopped when all the bound handlers are unbound.
     */
    private class Acceptor implements Runnable {
        public void run() {
            int nHandles = 0;

            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    // The select() will be woke up if some new connection
                    // have occurred, or if the selector has been explicitly
                    // woke up
                    int selected = select();

                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port on which this class will
                    // listen on
                    nHandles += registerHandles();

                    if (selected > 0) {
                        // We have some connection request, let's process 
                        // them here. 
                        processHandles(selectedHandles());
                    }

                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();

                    // Now, if the number of registred handles is 0, we can
                    // quit the loop: we don't have any socket listening
                    // for incoming connection.
                    if (nHandles == 0) {
                        synchronized (lock) {
                            if (registerQueue.isEmpty()
                                    && cancelQueue.isEmpty()) {
                                acceptor = null;
                                break;
                            }
                        }
                    }
                } catch (Throwable e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Cleanup all the processors, and shutdown the acceptor.
            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        disposalFuture.setDone();
                    }
                }
            }
        }
 

 

 

 

 

0
0
分享到:
评论

相关推荐

    Android Java Socket框架 Mina2.0

    1. **事件驱动模型**:Mina采用了非阻塞I/O(NIO)模式,基于Java的Selector API,使得单线程可以高效处理大量并发连接。 2. **协议无关性**:Mina支持多种网络协议,如TCP/IP、UDP、SSL/TLS等,并且可以通过插件化...

    网络编程(socket、NIO、mina)---demo

    在服务端,无论是Socket、NIO还是Mina,都需要创建监听套接字来接收客户端连接,并为每个连接创建一个新的套接字或通道。而在客户端,需要建立到服务器的连接,发送和接收数据。 总结来说,这个"网络编程(socket、...

    Mina2.0完全剖析,完全自学手册

    ### Mina2.0完全剖析,完全自学手册 #### Apache Mina 概述 Apache Mina(Multipurpose Infrastructure Networked Applications)是一个强大的网络应用框架,主要用于帮助开发者构建高性能且易于扩展的网络应用程序...

    Mina2.0中文参考手册(word版)

    在使用 Mina 2.0 开发时,开发者需要具备 Java IO、NIO、Java Socket、多线程以及并发库(如 `java.util.concurrent.*`)的基础知识。Mina 为服务器端和客户端通信提供了封装,确保应用程序与底层网络通信细节分离,...

    Apache_Mina_Server_2.0中文参考手册V1.0.pdf

    ### Apache Mina Server 2.0 中文参考手册 V1.0 #### 一、Apache Mina Server 简介 Apache Mina Server 是一款强大的网络通信应用框架,旨在简化基于 TCP/IP 和 UDP/IP 协议栈的网络通信程序开发。它支持多种通信...

    java客户端socket与mina服务端通信

    Java客户端Socket与Mina服务端通信是网络编程中常见的应用场景,尤其在开发分布式系统或实时数据传输时。这里我们将深入探讨这两个技术,并了解如何通过它们建立保持长连接的通信。 首先,Socket是Java中用于实现...

    Mina中文参考手册-API

    在Mina 2.0中,如果要实现TCPServer,可以使用NioSocketAcceptor,该实现是基于java.nio.channels.ServerSocketChannel的。 2. IoProcessor:用于检查通道上是否有数据读写,并拥有自己的Selector。这个接口与使用...

    apache mina-spring 服务端程序

    在Mina中,Socket通信是通过一个事件驱动模型实现的,该模型基于I/O多路复用技术,如Java的NIO(Non-blocking I/O)。这种模型可以处理大量的并发连接,且资源消耗较低。Spring的集成使得开发者可以通过声明式的方式...

    mina框架中socket使用,有服务端和客户端。

    mina框架是Apache组织开发的一个网络通信框架,它基于Java NIO(非阻塞I/O)构建,用于简化网络编程,尤其是TCP和UDP协议的应用开发。本项目提供了服务端和客户端的示例,使得开发者能够更好地理解和应用MINA框架。 ...

    Apache-Mina-Server-2.0中文参考手册V1.0.docx

    1. **初始化IoService**:首先,你需要创建一个IoAcceptor,它是服务端和客户端的抽象,对于TCP服务器,你可以使用NioSocketAcceptor,它基于Java NIO的ServerSocketChannel。在`main`方法中,你会配置IoAcceptor,...

    Apache Mina Server 2.0 中文参考手册

    Apache Mina Server 2.0 是一个强大的网络通信框架,主要基于Java NIO技术,用于构建高性能、可扩展的TCP和UDP应用程序,同时也支持串口通信。Mina的核心特性包括非阻塞的异步传输模式,事件驱动机制,批量数据处理...

    Mina+Socket通信

    在Mina与Socket通信的实现中,服务端通常使用Mina来创建一个Acceptor,监听特定端口,等待客户端的连接请求。一旦有连接建立,Mina会自动触发相应的事件处理器,开发者可以在其中处理数据读写。以下是一个基本的...

    Apache Mina Server 2.0 中文参考手册(带目录)

    1. 编写IoService:使用NioSocketAcceptor类创建一个服务端实例,它基于java.nio.channels.ServerSocketChannel,也可以使用Apache APR库中的AprSocketAcceptor来提高性能。 2. 配置会话:设置会话配置,如读缓冲区...

    mina服务端例子

    在这个“Mina服务端例子”中,我们主要探讨的是如何使用Mina框架来实现一个基于Socket的非阻塞I/O(NIO)服务端。 1. **Mina框架介绍**: Mina提供了事件驱动和异步处理模型,使得开发者可以专注于业务逻辑,而...

    mina 服务器socket客服端发消息

    以上就是使用Java Mina框架创建一个简单的Socket服务端和客户端的实现。在实际应用中,可能还需要考虑异常处理、心跳机制、多线程处理、消息序列化与反序列化等问题。Mina提供的API非常灵活,可以根据需求进行扩展和...

    mina传文件案例,客户端加服务端

    《minafileserver:基于Socket与Mina的文件传输实践》 在IT行业中,网络通信是不可或缺的一部分,尤其是在分布式系统和互联网应用中。Apache Mina是一个轻量级、高性能的网络应用框架,它简化了网络编程,尤其是TCP...

    mina及时推送客户端服务端实现

    4. **服务端实现**:服务端同样基于MINA,通过ServerBootstrap配置Acceptor,监听端口,接收到客户端连接请求后,创建Session。服务端可以注册监听器来处理客户端的连接、断开、数据到达等事件,实现推送服务的逻辑...

    mina-2.0.19官方相关jar包

    1. **异步I/O模型**:MINA基于NIO(Non-blocking I/O)实现,提供非阻塞的读写操作,使得处理大量并发连接变得高效。这种模型尤其适合高并发、低延迟的网络服务,如TCP和UDP服务器。 2. **协议无关性**:MINA抽象了...

Global site tag (gtag.js) - Google Analytics