关于Mina
mina是开源的NIO框架,其project地址:
http://mina.apache.org/mina-project/features.html
想快速了解mina就看user guide:
http://mina.apache.org/mina-project/userguide/user-guide-toc.html
mina给我的感觉:干净、利落的抽象,非常容易上手,使用mina你只需要写不需要超过10行code就可以搭建一个TCP服务器,就像mina自身带的例子:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.example.gettingstarted.timeserver.TimeServerHandler; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class MinaTimeServer { /** * @param args */ private static final int PORT = 9123; public static void main(String[] args) throws IOException { IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast( "logger", new LoggingFilter()); acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName( "UTF-8" )))); acceptor.setHandler(new TimeServerHandler()); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10); acceptor.bind(new InetSocketAddress(PORT) ); } }
Mina源码分析之bind
mina好用只是它把复杂很好的进行了分解和隐藏,要想真正掌握它,还是要深入了解源码,所以从bind方法开始,bind方法是interface IoAcceptor声明的方法,先看看它的javadoc,了解bind方法的职责:
/** * Binds to the default local address(es) and start to accept incoming * connections. * * @throws IOException if failed to bind */ void bind() throws IOException; /** * Binds to the specified local address and start to accept incoming * connections. * * @param localAddress The SocketAddress to bind to * * @throws IOException if failed to bind */ void bind(SocketAddress localAddress) throws IOException; /** * Binds to the specified local addresses and start to accept incoming * connections. If no address is given, bind on the default local address. * * @param firstLocalAddresses The first address to bind to * @param addresses The SocketAddresses to bind to * * @throws IOException if failed to bind */ void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException; /** * Binds to the specified local addresses and start to accept incoming * connections. If no address is given, bind on the default local address. * * @param addresses The SocketAddresses to bind to * * @throws IOException if failed to bind */ void bind(SocketAddress... addresses) throws IOException; /** * Binds to the specified local addresses and start to accept incoming * connections. * * @throws IOException if failed to bind */ void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;
bind方法职责
1.绑定到本地ip地址(如果没有指定就绑定到默认ip)
2.开始接受外面进来的请求。
通俗地说,一个Tcp的或则UDP的IO Server调用了bind方法后,就可以开始干活了,就可以接受请求了。顺便说一句,mina对IO的两端(Server,Client)都有很好的抽象和封装,IO Server由IoAcceptor代表,IO Client由IoConnector代表,而它们又都是IoService的直接子接口。关于mina的整体架构还是去看user guide,非常清晰。
明白了bind方法的职责,不妨先不要往下看,而是结合java NIO api 思考一下其大概的实现细节,然后再去翻code来验证你的推测。
对于一个基于NIO的server来说,要想能够开始接受外面的请求,有几件事必须做:
1.open一个SelectableChannel,SelectableChannel代表了可以支持非阻塞IO操作的channel
2.open一个Selector,Selector通过select方法来监控所有可以进行IO操作的SelectableChannel
3.完成SelectableChannel在Selector上的注册,这样Selector方可监控他。而类SelectionKey则代表一个成功的注册。
当然对于一个框架来说,肯定会做的更多,但以上3点是必须的,那接下来就去翻code吧
首先看bind方法所在的类层次:
从类图中可以看出,真正处理bind逻辑的是bindInternal方法,该方法在AbstractIoAcceptor抽象类中声明,在AbstractPollingIoAcceptor类中实现。正如javadoc的说明,AbstractPollingIoAcceptor是A base class for implementing transport using a polling strategy.
bindInternal方法
下面是bindInternal方法的code:
protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { // Create a bind request as a Future operation. When the selector // have handled the registration, it will signal this future. AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); // adds the Registration request to the queue for the Workers // to handle registerQueue.add(request); // creates the Acceptor instance and has the local // executor kick it off. startupAcceptor(); // As we just started the acceptor, we have to unblock the select() // in order to process the bind request we just have added to the // registerQueue. try { lock.acquire(); // Wait a bit to give a chance to the Acceptor thread to do the select() Thread.sleep(10); wakeup(); } finally { lock.release(); } // Now, we wait until this request is completed. request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } // Update the local addresses. // setLocalAddresses() shouldn't be called from the worker thread // because of deadlock. Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>(); for (H handle : boundHandles.values()) { newLocalAddresses.add(localAddress(handle)); } return newLocalAddresses; }
1.创建一个bind请求:
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
当前你可以把AcceptorOperationFuture简单理解为支持异步处理和事件通知机制的一个request,后续再详细分析它。
2.把请求加到队列中:
registerQueue.add(request);
registerQueue保存所有新注册的请求,这些请求在registerHandles方法中被消费掉。registerHandles方法又被Acceptor.run方法调用。新注册成功的连接都被保存在:
private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
3.创建Acceptor类,并且异步执行该类的run方法。
startupAcceptor();
Acceptor是一个内部类,实现了Runnable方法。它负责接收和处理从客户端进来的bind和unbind请求。下面会重点分析这个类。
4.等待10毫秒,然后调用wakeup方法来确保selector的select方法被唤醒。
try {
lock.acquire();
// Wait a bit to give a chance to the Acceptor thread to do the select()
Thread.sleep(10);
wakeup();
} finally {
lock.release();
}
lock是一个信号量:
private final Semaphore lock = new Semaphore(1);由于可用值是1,因此相当于排他锁。
这里要重点解释一下为什么要sleep这10毫秒:要搞清楚这点,首先要看Acceptor.run方法的实现(下面介绍),该方法调用了
int selected = select();
而select方法是abstract的:
/** * Check for acceptable connections, interrupt when at least a server is ready for accepting. * All the ready server socket descriptors need to be returned by {@link #selectedHandles()} * @return The number of sockets having got incoming client * @throws Exception any exception thrown by the underlying systems calls */ protected abstract int select() throws Exception;
大家能够猜到其子类:NioSocketAcceptor的实现就是调用Selector.select方法:
@Override protected int select() throws Exception { return selector.select(); }
而select方法在没有io事件也没有调用selector.wakeup方法时是阻塞的,反之就会被唤醒。所以才有了上面sleep10毫秒后调用wakeup方法(也是abstract方法,子类NioSocketAcceptor实现就是调用Selector.wakeUp),来唤醒select。但由于Accecptor是在独立线程运行的,因而sleep一会以保证那个线程已经启动,而不会错过唤醒select.
5.然后就等待请求被处理完毕,并判断是否有异常,没有异常就把绑定的本地地址返回。
Acceptor
Acceptor是一个内部类,实现了Runnable方法。它负责接收和处理从客户端进来的bind和unbind请求
接下来重点看Acceptor的run方法:
public void run() { assert (acceptorRef.get() == this); int nHandles = 0; // Release the lock lock.release(); while (selectable) { try { // Detect if we have some keys ready to be processed // The select() will be woke up if some new connection // have occurred, or if the selector has been explicitly // woke up int selected = select(); // this actually sets the selector to OP_ACCEPT, // and binds to the port on which this class will // listen on nHandles += registerHandles(); // 这里推出while循环的逻辑 省略。。。 if (selected > 0) { // We have some connection request, let's process // them here. processHandles(selectedHandles()); } // check to see if any cancellation request has been made. nHandles -= unregisterHandles(); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop break; } catch (Throwable e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } // Cleanup all the processors, and shutdown the acceptor. // ..... } }
run方法重点是while循环:
1.首先调用select方法来看是否有新io请求进来(由于Acceptor只被bind和unbind调用,因此此时的请求只可能是bind请求和unbind请求)
2.调用registerHandles处理bind请求,该方法返回新处理的bind请求数目,如果=0,则推出循环,Acceptor的处理结束。处理bind请求的大概过程是:
1)从registerQueue队列中拿出bind请求:AcceptorOperationFuture(之前放进去的)
2)从AcceptorOperationFuture中拿出要bind的SocketAddress(可能不止一个);
3)对于每一个SocketAddress,调用open方法来建立ServerSocketChannel通道。open方法是abstract,其子类NioSocketAcceptor给出了实现,没错正如我们之前设想的那样,NioSocketAcceptor.open主要就是完成ServerSocketChannel的注册,源码如下:
@Override protected ServerSocketChannel open(SocketAddress localAddress) throws Exception { // Creates the listening ServerSocket ServerSocketChannel channel = ServerSocketChannel.open(); boolean success = false; try { // This is a non blocking socket channel channel.configureBlocking(false); // Configure the server socket, ServerSocket socket = channel.socket(); // Set the reuseAddress flag accordingly with the setting socket.setReuseAddress(isReuseAddress()); // and bind. socket.bind(localAddress, getBacklog()); // Register the channel within the selector for ACCEPT event channel.register(selector, SelectionKey.OP_ACCEPT); success = true; } finally { if (!success) { close(channel); } } return channel; }
4)把第3)建立好的ServerSocketChannel放到boundHandles中。boundHandles是一个线程安全的map:
private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
5)返回新注册的size。
3. 如果select方法返回值>0,就调用processHandles(selectedHandles());来处理。处理过程大致如下:
1)从selector中拿出有io发生的ServerSocketChannel集合;这个过程是在子类NioSocketAcceptor .selectedHandles方法中发生的。
2)对于每一个ServerSocketChannel,调用accept方法来建立会话:IoSession,IoSession是mina中非常重要的概念,以后再分析。accept方法也是abstract的,其子类NioSocketAccepor给出了实现:
@Override protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { return null; } // accept the connection from the client SocketChannel ch = handle.accept(); if (ch == null) { return null; } return new NioSocketSession(this, processor, ch); }
3)初始化session:initSession(session, null, null);
4)保存session到IoProcessor
4.处理unbind的逻辑:
nHandles -= unregisterHandles();
前面说过Acceptor的用户是bind和unbind,所以run方法里自然会有处理unbind的逻辑,不了解这一点看到这段code可能会有些困惑。
unbind的逻辑就很简单了,大致过程如下:
1)从cancelQueue中取出unbind的请求:AcceptorOperationFuture
2)从AcceptorOperationFuture中取出SocketAddress对象(可能不止一个)
3)对于每一个SocketAddress,把它从boundHandles中删除
4)对于每一个SocketAddress,调用close方法来关闭它,close方法也是abstract方法,子类NioSocketAcceptor给出了实现:
@Override protected void close(ServerSocketChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); if (key != null) { key.cancel(); } handle.close(); }
5)调用AcceptorOperationFuture.setDone来做事件通知
AbstractPollingIoAcceptor.bindInternal
->startupAcceptor//启动Acceptor线程
-->Acceptor.run
--->registerHandles
主要逻辑:完成ServerSocketChannel在selector中的注册,感兴趣的事件是: SelectionKey.OP_ACCEPT,注册后ServerSocketChannel就可以在指定的ip和port上监听新连接。
1)从registerQueue中取出头元素,registerQueue保存待注册的请求
AcceptorOperationFuture future = registerQueue.poll();
1)创建ServerSocketChannel:ServerSocketChannel.open();
2)把ServerSocket绑定到指定的SocketAddress,SocketAddress来自
List<SocketAddress> localAddresses = future.getLocalAddresses();
3)把ServerSocketChannel注册到selector:channel.register(selector, SelectionKey.OP_ACCEPT);
4)返回channel并保存在boundHandles中
--->processHandles
主要逻辑:处理ServerSocketChannel上的新连接请求,把新连接请求构建新的IoSession并初始化,然后把IoSession实例交给IoProcessor来管理和处理io操作。
1)调用NioSocketAcceptor.accept方法返回NioSocketSession实例。构建NioSocketSession需要SocketChannel、IoProcessor等参数。
2)调用initSession方法
3)把当前session加到IoProcessor中:void add(S session)
--->unregisterHandles
主要逻辑:响应doUnbind方法,关闭ServerSocketChannel
相关推荐
### Mina2源码分析——核心模块解析 #### 概述 Mina2是一个高性能、可扩展的网络应用框架,支持多种传输协议如TCP、UDP等,并提供了丰富的API供开发者使用。本文旨在深入剖析Mina2的核心部分,帮助读者更好地理解和...
在这个“MINA源码分析,内涵类的讲解”中,我们将深入探讨MINA的核心组件和设计模式,以便更好地理解和利用这个强大的框架。 首先,我们需要了解MINA的基础架构。MINA的核心是`IoService`接口,它定义了服务端和...
在深入解析Mina源码的过程中,我们可以从以下几个关键知识点入手: 1. **Mina的过滤器机制实现**: Mina的核心设计理念之一是过滤器链(Filter Chain),它借鉴了Servlet的过滤器模型。每个过滤器都可以在数据传输...
通过阅读和分析`apache-mina-2.0.16`的源码,我们可以深入理解MINA的设计思想,学习如何构建高效的网络服务,并能根据自己的需求定制和扩展MINA的功能。对于想要从事网络编程或系统架构设计的开发者来说,研究MINA...
在MINA源码中,我们可以看到以下关键组件: 1. **Session**:这是MINA中的核心概念,代表了网络连接。Session负责管理连接状态,并提供读写数据的方法。 2. **Filter Chain**:MINA采用过滤器链模式来处理网络事件...
在分析"apache-mina-2.0.4"源码时,我们可以深入理解其设计理念、架构以及核心组件。 1. **MINA的设计理念** MINA的核心理念是异步非阻塞I/O,它利用Java NIO(New I/O)库来实现高效的网络通信。通过非阻塞I/O,...
### mina2.0源码svn地址解析与详细介绍 #### 一、Mina2.0简介 Mina(Multi-threaded IO Network Architecture)是Apache软件基金会的一个开源项目,旨在为开发人员提供一个易于使用的高性能网络应用程序框架。Mina...
Mina2源码分析 Mina2框架源码剖析是 Java 编程语言中一个流行的网络编程框架,用于构建高性能、可扩展的网络应用程序。该框架核心包包括 org.apache.mina.core.service, org.apache.mina.core.session, org.apache....
`bind()`和`unbind()`方法分别用于启动和停止连接接受。 `IoConnector`与`IoAcceptor`对应,是客户端连接发起者。它尝试连接到服务器,并在连接成功、失败或取消时触发事件。`connect()`方法启动连接尝试,返回`...
2. **分析源码**:通过阅读MINA的源码,了解其内部处理流程,如读写事件的处理、过滤器链的工作方式等。 3. **编写简单的应用**:从例子入手,创建一个简单的MINA服务器和客户端,理解它们之间的通信机制。 4. **...
### Mina2源码分析:深入理解IoService与IoProcessor 在探讨Mina框架的核心机制时,我们聚焦于`IoService`与`IoProcessor`,它们是Mina框架设计的灵魂,为实现高效、灵活的网络通信提供了坚实的基石。 #### ...
这个压缩包"mina学习资料与源码分析"包含了帮助你快速入门Mina、深入理解其工作原理和API的资源。下面,我们将深入探讨Mina的核心概念、功能以及如何利用它来开发网络应用。 Mina源于Java社区,其全称为“Minimal ...
6. **Nio**与**Aio**:Mina2支持两种不同的I/O模型——NIO(非阻塞I/O)和AIO(异步I/O)。NIO适用于高并发场景,而AIO在低延迟和高吞吐量的应用中表现出色。 通过深入研究Mina2的源码,我们可以了解到如何优化网络...
通过深入分析Mina2.0的源码,我们可以了解到其内部的事件驱动模型、网络通信机制以及如何通过过滤器和处理器来处理网络请求。这有助于我们更好地理解和利用Mina框架来开发高并发、高性能的网络应用。
通过分析源码,我们可以看到Mina如何优雅地将网络编程的复杂性封装起来,让开发者可以更专注于应用的业务层面。同时,通过心跳机制,我们能够实时检测网络状况,确保服务的稳定性和可靠性。 总的来说,Mina框架提供...
此外,源码分析还能帮助你掌握如何自定义Filter和Codec,以满足特定的应用场景需求。 总之,MINA 2.0.9源码为开发者提供了一个深入学习网络编程和异步I/O的好资源,通过研究和实践,开发者可以提升自己的网络应用...
### MINA源码走读与实例 #### 一、MINA概述 **MINA**(**M**ulti **I**nterface **N**etwork **A**pplication)是Apache组织下的一款开源网络通信框架,它主要针对TCP/IP、UDP/IP协议栈提供了高效的封装和扩展能力...
mina新手案例,mina新手教程源码 mina+springboot最简单的案例。用的IDEA * mina服务端 * 1、添加@Controller注解和 @PostConstruct注解,代表启动springboot项目时也调用该类下的该方法, * 启动springboot项目...
通过对Mina源码的学习,开发者不仅能理解其设计理念,还能掌握如何优化网络通信应用,提高系统的稳定性和性能。此外,学习Mina也能帮助开发者更好地理解Java NIO和网络编程的底层原理,对于提升专业技能非常有帮助。
为了帮助读者快速理解如何使用 Mina 2.0 开发简单的网络服务,下面提供了一个基于时间服务器的例子——`MinaTimeServer.java`。 ```java package com.vista; import java.io.IOException; import java.net....