最近阅读了mina的源代码,这里给大家做个分享:
一、mina server请求处理模型
下面的几张图对大家后面的理解会有一定的帮助,先贴出来:
1.acceptor模型:
2.filterchain和processor处理模型
二、mina server核心组件剖析
1.acceptor:
首先附上一张acceptor的uml图,方面大家了阅读源码时了解到我们讲到的一些细节:
mina框架中负责监听连接的类,使用单个线程运行。使用nio的selector实现。将serversocketchannel的accept注册到selector中,使用单个线程轮训选择器的方式不断检测是否有新的连接进入。在Acceptor类中你会看到如下代码:
while (selectable) { try { int selected = select(); nHandles += registerHandles(); if (nHandles == 0) { acceptorRef.set(null); if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { assert (acceptorRef.get() != this); break; } if (!acceptorRef.compareAndSet(null, this)) { assert (acceptorRef.get() != this); break; } assert (acceptorRef.get() == this); } if (selected > 0) { processHandles(selectedHandles()); } }
我们从registerHandles方法开始读起,这方法中会在acceptor自己的选择器中注册serversocketchannel,并将accept标记为感兴趣的事件。选择器是java nio中异步的基础,这里我做下简单的介绍。首先有几个概念需要提一下,1.
SelectableChannel 2.selector 3.selectkey。java nio中所有继承至SelectableChannel的类都是可以做异步io的,那什么是异步io呢?这是相对于同步io来说的,如果你使用的是同步io,当你需要从本机socket中读数据,但是socket的buffer中又没有数据的情况下,当前线程是要被block住的。而使用异步io,即使socket的buffer中没有数据,当前线程可以不被阻塞掉,而可以去做其他事情。selector是java能做异步io的重要组件,其最重要的功能就是就绪选择,这是java socket异步通信的基础。每个SelectableChannel都可以将自己感兴趣的操作注册到selector中,当自己感兴趣的操作被选择器确定为已就绪时,SelectableChannel就可以进行相应的处理。那你肯定要问SelectableChannel和selector是通过什么联系起来的?答案就是selectkey,每一个注册进selector的SelectableChannel都会生成一个selectkey,这个selectkey标示了当前SelectableChannel的状态。selector中维护了三个selectkey集合:1.已选择的selectkey集合。2.已注册的selectkey集合。3.已取消的selectkey集合。每次调用selector的select方法时,都会更新一次选择键的集合,这里我们比较关心的是已选择的selectkey集合。mian中的acceptor和nioprocessor都是使用selector实现的,这就让单个线程可以处理很多请求,你可能不会想像到,mina维护socket连接的线程只有一个(acceptor),你在稍后的代码中就可以看到。先看下serversocketchannle是如何在selector中注册自身的:
private int registerHandles() { for (;;) { // The register queue contains the list of services to manage // in this acceptor. AcceptorOperationFuture future = registerQueue.poll(); if (future == null) { return 0; } // We create a temporary map to store the bound handles, // as we may have to remove them all if there is an exception // during the sockets opening. Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>(); List<SocketAddress> localAddresses = future.getLocalAddresses(); try { // Process all the addresses for (SocketAddress a : localAddresses) { H handle = open(a); newHandles.put(localAddress(handle), handle); } ..............
再次进入到open方法:
// Creates the listening ServerSocket ServerSocketChannel channel = ServerSocketChannel.open(); boolean success = false; try { // This is a non blocking socket channel channel.configureBlocking(false); // Configure the server socket, ServerSocket socket = channel.socket(); // Set the reuseAddress flag accordingly with the setting socket.setReuseAddress(isReuseAddress()); // and bind. socket.bind(localAddress, getBacklog()); // Register the channel within the selector for ACCEPT event channel.register(selector, SelectionKey.OP_ACCEPT); success = true; } finally { if (!success) { close(channel); } } return channel;
如上所示,serversocketchannle将自身注册到了acceptor的selector当中,并将accept事件标记为感兴趣的事件,每当有新的socket连接建立时,accept事件都会被触发,当调用selector的select方法时,selector中的已选择的选择键集合就会被更新,然后通过选择键可以获取到serversocketchannel,调用serversocketchannel的accept方法就可以拿到新的socket连接的socketchannel。这个在稍后负责建立连接的acceptor线程中可以看到。我们继续阅读第一段代码,你会看到:
if (selected > 0) { processHandles(selectedHandles()); }
selected是selector.select()的结果,当有通道被选择器选择时,selected的值就会>0。selectedHadnles方法会返回一个可遍历的已选择的选择键集合,我们直接进入processHandles方法:
private void processHandles(Iterator<H> handles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); // Associates a new created connection to a processor, // and get back a session S session = accept(processor, handle); if (session == null) { continue; } initSession(session, null, null); // add the session to the SocketIoProcessor session.getProcessor().add(session); } } }
以上代码对以选择的选择键进行了遍历,accept选择键遍历出,然后从中获取新的连接对应的serversocketchannel并将其放入到NioSession中,mina会将每一个连接对应的socketchannel都放入到一个新的niosession中,你可以将niosession理解为对一个连接会话的抽象。稍后我会着重将下niosession的实现,里面有一个filterchain是mina可扩展业务模型的核心。现在我们进入accept方法中看下:
@Override protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { return null; } // accept the connection from the client SocketChannel ch = handle.accept(); if (ch == null) { return null; } return new NioSocketSession(this, processor, ch); }
正如我刚才说的这里会拿出新连接的socketchannel并将他放入到niosesison中。NioSocketSession对象的构造函数中的processor是一个processor对象池,这是在acceptor初始化的时候建立的,接下来我们就进入niosession的源码阅读。
2.niosession
和上面一样,首先附上一张niosession的uml图:
niosocketSession中一个比较核心的东西就是filterchain,看到filter就觉得很熟悉吧?会把他和用户自定义的filter进行联系?没错这个filterchain就包含了用户自定义的filter以及用户自定义的handler。看到filter我的第一感觉就是mina应该会用职责链的设计模式来将filterchain做成可扩展的设计。我们进入NioSession看看filterchain是如何被初始化的:
protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) { super(service); this.channel = channel; this.processor = processor; filterChain = new DefaultIoFilterChain(this); }
接着进入DefaultIoFilterChain的构造函数:
public DefaultIoFilterChain(AbstractIoSession session) { if (session == null) { throw new IllegalArgumentException("session"); } this.session = session; head = new EntryImpl(null, null, "head", new HeadFilter()); tail = new EntryImpl(head, null, "tail", new TailFilter()); head.nextEntry = tail; }
这是比较关键的地方,filterchain在初始化的时候会构建两个默认的filter:headfilter和tailfiter,看他们的名字应该能想到他们是处理链的头和尾。这两个filter分别被封装进两个entry中,并组成了entry链,进入EntryImpl的构造方法:
private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry, String name, IoFilter filter) { if (filter == null) { throw new IllegalArgumentException("filter"); } if (name == null) { throw new IllegalArgumentException("name"); } this.prevEntry = prevEntry; this.nextEntry = nextEntry; this.name = name; this.filter = filter; this.nextFilter = new NextFilter() { public void sessionCreated(IoSession session) { Entry nextEntry = EntryImpl.this.nextEntry; callNextSessionCreated(nextEntry, session); } public void sessionOpened(IoSession session) { Entry nextEntry = EntryImpl.this.nextEntry; callNextSessionOpened(nextEntry, session); } public void sessionClosed(IoSession session) { Entry nextEntry = EntryImpl.this.nextEntry; callNextSessionClosed(nextEntry, session); } public void sessionIdle(IoSession session, IdleStatus status) { Entry nextEntry = EntryImpl.this.nextEntry; callNextSessionIdle(nextEntry, session, status); } public void exceptionCaught(IoSession session, Throwable cause) { Entry nextEntry = EntryImpl.this.nextEntry; callNextExceptionCaught(nextEntry, session, cause); } public void messageReceived(IoSession session, Object message) { Entry nextEntry = EntryImpl.this.nextEntry; callNextMessageReceived(nextEntry, session, message); } public void messageSent(IoSession session, WriteRequest writeRequest) { Entry nextEntry = EntryImpl.this.nextEntry; callNextMessageSent(nextEntry, session, writeRequest); } public void filterWrite(IoSession session, WriteRequest writeRequest) { Entry nextEntry = EntryImpl.this.prevEntry; callPreviousFilterWrite(nextEntry, session, writeRequest); } public void filterClose(IoSession session) { Entry nextEntry = EntryImpl.this.prevEntry; callPreviousFilterClose(nextEntry, session); } public String toString() { return EntryImpl.this.nextEntry.name; } }; }
从上面的代码可以看到,每个entryimpl都包含preentry、nextentry、filter、nextfilter这几个关键的属性。你现在能想像到mina的职责链是如何设计的吗?我们随机挑选一个call**方法就很清楚了:
private void callNextSessionCreated(Entry entry, IoSession session) { try { IoFilter filter = entry.getFilter(); NextFilter nextFilter = entry.getNextFilter(); filter.sessionCreated(nextFilter, session); } catch (Throwable e) { fireExceptionCaught(e); } }
哈哈,真相就在这里,注意方法中entry入參,这里的entry对象传入的是当前entry的nextentry。这里执行了filterchain中当前链节点下一个链节点的xxx方法,而nexfilterchain中的方法又对下下个链节点中的filter进行了xxx方法的调用。可以想到,如果要在处理链中加入自己的filter,你只需要在filter中实现自己的业务逻辑,在最后调用下xxx方法,并把nextfilter做成如參传入。关于用户自定义的链是如何被加入到filterchain中的以及handler是如何被扩展、如何被处理的,我在稍后会提到。大概过完niosession的filterchain,现在我们继续进入下一个环节,nioprocessor.
3.processor
我们回到刚才的主线代码继续阅读:
进入acceptor的processHandles方法:
private void processHandles(Iterator<H> handles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); // Associates a new created connection to a processor, // and get back a session S session = accept(processor, handle); if (session == null) { continue; } initSession(session, null, null); // add the session to the SocketIoProcessor session.getProcessor().add(session); } }可以看到,创建完session对象后,就将session对象放入到nioprocessor中。我们进入SimpleIoProcessorPool的add方法中,这个方法会从processorpool中选择出一个processor,并将session放入到processor的队列中,代码如下:
private IoProcessor<S> getProcessor(S session) { IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR); if (processor == null) { if (disposed || disposing) { throw new IllegalStateException("A disposed processor cannot be accessed."); } processor = pool[Math.abs((int) session.getId()) % pool.length]; if (processor == null) { throw new IllegalStateException("A disposed processor cannot be accessed."); } session.setAttributeIfAbsent(PROCESSOR, processor); } return processor; }通过一个取模算法,随机从processorpool中取出一个processor,我们继续进入nioprocessor继承的抽象类AbstractPollingIoProcessor的add方法中:
public final void add(S session) { if (disposed || disposing) { throw new IllegalStateException("Already disposed."); } // Adds the session to the newSession queue and starts the worker newSessions.add(session); startupProcessor(); }可以看到这个方法将session加入到了自己的队列中。读到这里可以猜想到,后面肯定会有地方不断的pool newSessions队列中的数据去处理,我们接着往下走,进入startupProcessor方法:
private void startupProcessor() { Processor processor = processorRef.get(); if (processor == null) { processor = new Processor(); if (processorRef.compareAndSet(null, processor)) { executor.execute(new NamePreservingRunnable(processor, threadName)); } } // Just stop the select() and start it again, so that the processor // can be activated immediately. wakeup(); }正如上所示,如果Processor对象不为空,则立即唤醒选择器,你一定会问选择器在什么时候会被阻塞?这个问题问得好,当注册到选择器上所有通道的服务都不可用时,这个时候选择器会被阻塞掉。如果你调用的选择器时使用的是不带任何入參的select方法时,这个时候选择器会永远阻塞下去,mina processor的选择器是使用的带参数的select方法。所以要使用wakeup方法去唤醒阻塞的选择器,让它重新选择,当选择器没有阻塞时调用wakeup不会产生什么影响。我们接着进入Processor线程,看它的内部实现:
// Manage newly created session first nSessions += handleNewSessions(); updateTrafficMask(); // Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test... process(); } // Write the pending requests long currentTime = System.currentTimeMillis(); flush(currentTime); // And manage removed sessions nSessions -= removeSessions();进入handleNewSessions:
private int handleNewSessions() { int addedSessions = 0; for (S session = newSessions.poll(); session != null; session = newSessions.poll()) { if (addNow(session)) { // A new session has been created addedSessions++; } } return addedSessions; }可以看到,这里是在不停的将newSession中的session对象poll出,然后调用addNow对session中的socketchannel进行read操作的注册,同时将用户自定义的filter插入到session的filterchain当中。进入addNow方法:
相关推荐
Mina2.0 快速入门与源码剖析 Mina2.0 是一个基于 Java 的网络应用框架,提供了一个高效、可扩展的网络通信解决方案。下面是 Mina2.0 快速入门与源码剖析的知识点总结: 一、Mina2.0 快速入门 Mina2.0 的快速入门...
Mina2.0快速入门与源码剖析 Mina2.0是一个基于Java的开源网络应用框架,主要用于构建高性能、可扩展的网络应用程序。本文将对Mina2.0进行快速入门和源码剖析,帮助读者快速了解Mina2.0的基本概念和使用方法。 Mina...
《Mina2.0框架源码剖析》 Apache Mina是一个高性能、轻量级的网络通信框架,常用于构建基于TCP/IP和UDP/IP协议的应用,如服务器端的开发。Mina2.0作为其一个重要版本,引入了许多优化和改进,为开发者提供了更强大...
Mina2.0框架源码剖析 Mina2.0是一个基于Java的网络应用框架,提供了一个简洁、灵活的API,帮助开发者快速构建高性能的网络应用程序。下面是Mina2.0框架源码剖析的相关知识点: 一、Mina2.0框架概述 Mina2.0是一个...
### Mina2.0框架源码剖析 #### 引言 Mina是Apache组织下的一个高性能网络通信框架,主要用于帮助开发者构建可靠且高效的网络应用程序。它通过提供一套抽象的、事件驱动的异步API,使得Java NIO在处理TCP/IP、UDP/...
本文旨在深入剖析Mina2的核心部分,帮助读者更好地理解和掌握Mina2的工作原理及其在实际开发中的应用。 #### 核心包介绍 Mina2的核心组成部分主要包括以下四个包: 1. **org.apache.mina.core.service**:包含服务...
《Mina2.0框架源码剖析(六)》这篇文档主要关注的是Mina框架中的ExpiringMap、IoSession及其相关概念,这些内容对于理解Mina框架如何处理数据过期、会话管理和读写操作至关重要。 ExpiringMap是一个实现自动过期功能...
#### Mina 2.0 框架源码剖析 接下来,我们将逐步深入分析 Mina 2.0 的核心组件及其实现原理。 **2.1 IoAcceptor** `IoAcceptor` 接口是 Mina 2.0 中的核心接口之一,它代表了一个网络服务端点。通过该接口可以...
自己整理的一些mina学习资料,内含MINA官方教程(中文版).docx,MINA-2.0.0-M4.chm(英文版),Apache_Mina_Server_2.0中文参考手册V1.0.pdf, 还有mina的包
Mina2框架源码剖析是 Java 编程语言中一个流行的网络编程框架,用于构建高性能、可扩展的网络应用程序。该框架核心包包括 org.apache.mina.core.service, org.apache.mina.core.session, org.apache.mina.core....
3. **Mina2源码分析**(Mina2源码分析.doc):源码分析文档通常由经验丰富的开发者编写,通过深入剖析MINA的源代码,揭示其内部工作原理,帮助开发者理解MINA如何实现非阻塞I/O,以及如何高效地处理网络连接和数据...
资源包括: MINA笔记.docx ...Mina2.0快速入门与源码剖析.pdf MINA网络框架和RMI的对比研究.pdf 基于3G网络的移动流媒体服务器的设计与实现.pdf 高性能通信框架及智能主站技术研究.nh MINA类图.doc 等
最近一直在看 Mina 的源码,用了 Mina 这么长时间,说实话,现在才开始对 Mina 有了一 些 深刻的理解,关于 Mina 的基本知识的介绍,这里就不多说了,网上已经有很多不错的文 章 都对 Mina 做了较深刻的剖析,现在...
“源码剖析”这部分可能会深入MINA的内部实现,讲解核心组件的工作原理,这对于想要进行性能优化或定制化开发的开发者来说是非常宝贵的。源码分析可以帮助开发者理解MINA如何处理网络事件,如何实现异步I/O,以及...