`

回归java,学习下mina,后续慢慢完善哈。

 
阅读更多

 

 一、怎么看mina的开源代码

1、首先要明白,一个代码、程序、或者框架,是用来解决用户实在的问题。而mina是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架
2、所有,针对mina,关键就是高性能、高伸缩、网络应用。所有,看mina源码就带着几个问题去看:
        a、什么是网络编程,这个就需要了解socket、io、nio的知识。
        b、高性能,在高并发的情况下高吞吐量,针对这种并发编程,mina用到了并发编程框架,所有要了解concurrent相关知识。
        c、高伸缩性,就是框架在设计上,在架构上,在模式的运用上设计合理,这里就是要学习怎么样才能架构出高伸缩性的架构。
 
二、mina 入门《mina,hello world》和整体通信过程
这里默认你已经准备好了Eclipse和java等必要的开发环境。
新建一个project工程,导入mina必要包:mina-core等。
,最懒的方式就是讲lib包和dist包下的jar全部依赖在工程中。
代码参考:http://www.cnblogs.com/xuekyo/archive/2013/03/06/2945826.html
创建服务端MinaTimeServer
  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.charset.Charset;
  4. import org.apache.mina.core.service.IoAcceptor;
  5. import org.apache.mina.core.session.IdleStatus;
  6. import org.apache.mina.filter.codec.ProtocolCodecFilter;
  7. import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
  8. import org.apache.mina.filter.logging.LoggingFilter;
  9. import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
  10. publicclassMinaTimeServer{
  11. // 定义监听端口
  12. privatestaticfinalint PORT =6488;
  13. publicstaticvoid main(String[] args)throwsIOException{
  14. // 创建服务端监控线程
  15. IoAcceptor acceptor =newNioSocketAcceptor();
  16. acceptor.getSessionConfig().setReadBufferSize(2048);
  17. acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,10);
  18. // 设置日志记录器
  19. acceptor.getFilterChain().addLast("logger",newLoggingFilter());
  20. // 设置编码过滤器
  21. acceptor.getFilterChain().addLast(
  22. "codec",
  23. newProtocolCodecFilter(newTextLineCodecFactory(Charset.forName("UTF-8"))));
  24. // 指定业务逻辑处理器
  25. acceptor.setHandler(newTimeServerHandler());
  26. // 设置端口号
  27. acceptor.bind(newInetSocketAddress(PORT));
  28. // 启动监听线程
  29. acceptor.bind();
  30. }
  31. }
编写服务端handler
  1. import org.apache.mina.core.service.IoHandlerAdapter;
  2. import org.apache.mina.core.session.IdleStatus;
  3. import org.apache.mina.core.session.IoSession;
  4. /**
  5. * 服务器端业务逻辑
  6. */
  7. publicclassTimeServerHandlerextendsIoHandlerAdapter{
  8. /**
  9. * 连接创建事件
  10. */
  11. @Override
  12. publicvoid sessionCreated(IoSession session){
  13. // 显示客户端的ip和端口
  14. System.out.println(session.getRemoteAddress().toString());
  15. }
  16. @Override
  17. publicvoid exceptionCaught(IoSession session,Throwable cause)throwsException{
  18. cause.printStackTrace();
  19. }
  20. /**
  21. * 消息接收事件
  22. */
  23. @Override
  24. publicvoid messageReceived(IoSession session,Object message)throwsException{
  25. String strMsg = message.toString();
  26. if(strMsg.trim().equalsIgnoreCase("quit")){
  27. session.close(true);
  28. return;
  29. }
  30. // 返回消息字符串
  31. session.write("Hi Client!");
  32. // 打印客户端传来的消息内容
  33. System.out.println("Message written : "+ strMsg);
  34. }
  35. @Override
  36. publicvoid sessionIdle(IoSession session,IdleStatus status)throwsException{
  37. System.out.println("IDLE"+ session.getIdleCount(status));
  38. }
  39. }
构建客户端
  1. import java.net.InetSocketAddress;
  2. import java.nio.charset.Charset;
  3. import org.apache.mina.core.future.ConnectFuture;
  4. import org.apache.mina.filter.codec.ProtocolCodecFilter;
  5. import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
  6. import org.apache.mina.filter.logging.LoggingFilter;
  7. import org.apache.mina.transport.socket.nio.NioSocketConnector;
  8. publicclassMinaTimeClient{
  9. publicstaticvoid main(String[] args){
  10. // 创建客户端连接器.
  11. NioSocketConnector connector =newNioSocketConnector();
  12. connector.getFilterChain().addLast("logger",newLoggingFilter());
  13. connector.getFilterChain().addLast("codec",
  14. newProtocolCodecFilter(newTextLineCodecFactory(Charset.forName("UTF-8"))));
  15. // 设置连接超时检查时间
  16. connector.setConnectTimeoutCheckInterval(30);
  17. connector.setHandler(newTimeClientHandler());
  18. // 建立连接
  19. ConnectFuture cf = connector.connect(newInetSocketAddress("192.168.2.109",6488));
  20. // 等待连接创建完成
  21. cf.awaitUninterruptibly();
  22. cf.getSession().write("Hi Server!");
  23. cf.getSession().write("quit");
  24. // 等待连接断开
  25. cf.getSession().getCloseFuture().awaitUninterruptibly();
  26. // 释放连接
  27. connector.dispose();
  28. }
  29. }
 
编写客户端handler
  1. import org.apache.mina.core.service.IoHandlerAdapter;
  2. import org.apache.mina.core.session.IoSession;
  3. publicclassTimeClientHandlerextendsIoHandlerAdapter{
  4. publicvoid messageReceived(IoSession session,Object message)throwsException{
  5. String content = message.toString();
  6. System.out.println("client receive a message is : "+ content);
  7. }
  8. publicvoid messageSent(IoSession session,Object message)throwsException{
  9. System.out.println("messageSent -> :"+ message);
  10. }
  11. }
 
主要接口体现


 
 
先说明下主要接口和包结构

Mina包的简介:

org.apache.mina.core.buffer 用于缓冲区的IoBuffer
org.apache.mina.core.service
org.apache.mina.transport.*
用于提供连接的service
org.apache.mina.core.session 用于提供两端状态的session
org.apache.mina.core.filterchain
org.apache.mina.filter.*
用于拦截所有IO事件和请求的filter chain和各类拦截器(在IoService和IoHandler之间)
org.apache.mina.handler.* 用于处理IO事件的handler
org.apache.mina.core.future 用于实现异步IO操作的 future
org.apache.mina.core.polling 用于实现IO轮询的的polling
org.apache.mina.proxy.* 用于实现代理的proxy

先介绍Mina几个重要接口:

  • IoServiece :这个接口在一个线程上负责套接字的建立,拥有自己的 Selector,监听是否有连接被建立。
  • IoProcessor :这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是说它也拥有自己的 Selector,这是与我们使用 JAVA NIO 编码时的一个不同之处,通常在 JAVA NIO 编码中,我们都是使用一个 Selector,也就是不区分 IoService与 IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在 IoService 上的过滤器,并在过滤器链之后调用 IoHandler。  
  • IoAccepter :相当于网络应用程序中的服务器端
  • IoConnector :相当于客户端
  • IoSession :当前客户端到服务器端的一个连接实例
  • IoHandler :这个接口负责编写业务逻辑,也就是接收、发送数据的地方。这也是实际开发过程中需要用户自己编写的部分代码。
  • IoFilter :过滤器用于悬接通讯层接口与业务层接口,这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的 encode与 decode是最为重要的、也是你在使用 Mina时最主要关注的地方。
 
 
这里需要了解的就是mina如何建立客户端连接请求和与客户端的数据交互 来了解各个接口的作用。
1、建立客户端请求:
我们看到,我们的MinaTimeServer中,new 了一个NioSocketAcceptor,我们来看看,这里发生了什么,先抛出,NioSocketAcceptor的相关继承关系如下:
 所有,我们来先看看,new NioSocketAcceptor都做了什么事情
1、调用NioSocketAcceptor的构造函数,其实是调用了AbstractPollingIoAcceptor
  1. publicNioSocketAcceptor(){
  2. super(newDefaultSocketSessionConfig(),NioProcessor.class);
  3. ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
  4. }
2、那我看下,AbstractPollingIoAcceptor又调用了super(),就是AbstartIoAcceptor,其中这个sessionConfig是默认实现,executor是null
  1. privateAbstractPollingIoAcceptor(IoSessionConfig sessionConfig,Executor executor,IoProcessor<S> processor,
  2. boolean createdProcessor,SelectorProvider selectorProvider){
  3. super(sessionConfig, executor);
  4. if(processor ==null){
  5. thrownewIllegalArgumentException("processor");
  6. }
  7. this.processor = processor;
  8. this.createdProcessor = createdProcessor;
  9. try{
  10. // Initialize the selector
  11. init(selectorProvider);//初始化selector,也就是selector.open();
  12. // The selector is now ready, we can switch the
  13. // flag to true so that incoming connection can be accepted
  14. selectable =true;
  15. }catch(RuntimeException e){
  16. throw e;
  17. }catch(Exception e){
  18. thrownewRuntimeIoException("Failed to initialize.", e);
  19. }finally{
  20. if(!selectable){
  21. try{
  22. destroy();
  23. }catch(Exception e){
  24. ExceptionMonitor.getInstance().exceptionCaught(e);
  25. }
  26. }
  27. }
  28. }
2.1、这里new的时候,生产了一个SimpleProcessor的实例,并告知已经创建了processor了。
3、我们在继续看,这个AbstartIoAcceptor继续调用了AbstactIoService(),主要的工作就是初始化下listener,生成excutor为newCachedThreadPool模式,此对象还包括了filterChainBuilder和sessionDataStructureFactory
  1. protectedAbstractIoService(IoSessionConfig sessionConfig,Executor executor){
  2. if(sessionConfig ==null){
  3. thrownewIllegalArgumentException("sessionConfig");
  4. }
  5. if(getTransportMetadata()==null){
  6. thrownewIllegalArgumentException("TransportMetadata");
  7. }
  8. if(!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())){
  9. thrownewIllegalArgumentException("sessionConfig type: "+ sessionConfig.getClass()+" (expected: "
  10. + getTransportMetadata().getSessionConfigType()+")");
  11. }
  12. // Create the listeners, and add a first listener : a activation listener
  13. // for this service, which will give information on the service state.
  14. listeners =newIoServiceListenerSupport(this);
  15. listeners.add(serviceActivationListener);
  16. // Stores the given session configuration
  17. this.sessionConfig = sessionConfig;
  18. // Make JVM load the exception monitor before some transports
  19. // change the thread context class loader.
  20. ExceptionMonitor.getInstance();
  21. if(executor ==null){
  22. this.executor =Executors.newCachedThreadPool();
  23. createdExecutor =true;
  24. }else{
  25. this.executor = executor;
  26. createdExecutor =false;
  27. }
  28. threadName = getClass().getSimpleName()+'-'+ id.incrementAndGet();
  29. }
至此,new NioSocketAcceptor实例过程完成,看起来只是初始化了下如下必要元素
selector,processor,listenList,IofilterChainBuilder,和线程相关的线程池execute
那我们继续看,我们的MinaTimeServer 设置了iofilter和业务处理handler。接下来调用了bind方法。其实是调用了AbstractIoAcceptor的bind,方法里面调用了AbstactPollingIoAcceptor中的bindInternal.
  1. publicfinalvoid bind(Iterable<?extendsSocketAddress> localAddresses)throwsIOException{
  2. if(isDisposing()){
  3. thrownewIllegalStateException("The Accpetor disposed is being disposed.");
  4. }
  5. if(localAddresses ==null){
  6. thrownewIllegalArgumentException("localAddresses");
  7. }
  8. List<SocketAddress> localAddressesCopy =newArrayList<SocketAddress>();
  9. for(SocketAddress a : localAddresses){
  10. checkAddressType(a);
  11. localAddressesCopy.add(a);
  12. }
  13. if(localAddressesCopy.isEmpty()){
  14. thrownewIllegalArgumentException("localAddresses is empty.");
  15. }
  16. boolean activate =false;
  17. synchronized(bindLock){
  18. synchronized(boundAddresses){
  19. if(boundAddresses.isEmpty()){
  20. activate =true;
  21. }
  22. }
  23. if(getHandler()==null){
  24. thrownewIllegalStateException("handler is not set.");
  25. }
  26. try{
  27. Set<SocketAddress> addresses = bindInternal(localAddressesCopy);//
  28. synchronized(boundAddresses){
  29. boundAddresses.addAll(addresses);
  30. }
  31. }catch(IOException e){
  32. throw e;
  33. }catch(RuntimeException e){
  34. throw e;
  35. }catch(Exception e){
  36. thrownewRuntimeIoException("Failed to bind to: "+ getLocalAddresses(), e);
  37. }
  38. }
  39. if(activate){
  40. getListeners().fireServiceActivated();
  41. }
  42. }
bindInternal,这里研究下future,这个current包中的模式。。
  1. @Override
  2. protectedfinalSet<SocketAddress> bindInternal(List<?extendsSocketAddress> localAddresses)throwsException{
  3. //创建一个future并放在注册队列中,当selector处理完后,将通知这个future做其他事情
  4. AcceptorOperationFuture request =newAcceptorOperationFuture(localAddresses);
  5. registerQueue.add(request);
  6. // 创建一个Acceptor(内部类),并放在execute中启动监听线程。
  7. startupAcceptor();
  8. // 为了处理刚才这个绑定请求,我们暂且不阻塞selector方法
  9. //As we just started the acceptor, we have to unblock the select()
  10. // in order to process the bind request we just have added to the
  11. // registerQueue.
  12. try{
  13. lock.acquire();
  14. // Wait a bit to give a chance to the Acceptor thread to do the select()
  15. Thread.sleep(10);
  16. wakeup();
  17. }finally{
  18. lock.release();
  19. }
  20. // Now, we wait until this request is completed.
  21. request.awaitUninterruptibly();
  22. if(request.getException()!=null){
  23. throw request.getException();
  24. }
  25. // Update the local addresses.
  26. // setLocalAddresses() shouldn't be called from the worker thread
  27. // because of deadlock.
  28. Set<SocketAddress> newLocalAddresses =newHashSet<SocketAddress>();
  29. for(H handle : boundHandles.values()){
  30. newLocalAddresses.add(localAddress(handle));
  31. }
  32. return newLocalAddresses;
  33. }
startupAcceptor
  1. privatevoid startupAcceptor()throwsInterruptedException{
  2. // If the acceptor is not ready, clear the queues
  3. // TODO : they should already be clean : do we have to do that ?
  4. if(!selectable){
  5. registerQueue.clear();
  6. cancelQueue.clear();
  7. }
  8. // start the acceptor if not already started
  9. Acceptor acceptor = acceptorRef.get();//获取线程中是否有Acceptor了。
  10. if(acceptor ==null){//没有,则实例化一个,并在exeutrWorker中启动,调用他的run方法
  11. lock.acquire();
  12. acceptor =newAcceptor();
  13. if(acceptorRef.compareAndSet(null, acceptor)){
  14. executeWorker(acceptor);
  15. }else{
  16. lock.release();
  17. }
  18. }
  19. }
我们继续看看Acceptor的run方法,其实就是开始执行select()阻塞方法了。
  1. publicvoid run(){
  2. assert(acceptorRef.get()==this);
  3. int nHandles =0;
  4. // Release the lock
  5. lock.release();
  6. while(selectable){
  7. try{
  8. // Detect if we have some keys ready to be processed
  9. // The select() will be woke up if some new connection
  10. // have occurred, or if the selector has been explicitly
  11. // woke up
  12. int selected = select();
  13. // this actually sets the selector to OP_ACCEPT,
  14. // and binds to the port on which this class will
  15. // listen on
  16. nHandles += registerHandles();
  17. // Now, if the number of registred handles is 0, we can
  18. // quit the loop: we don't have any socket listening
  19. // for incoming connection.
  20. if(nHandles ==0){
  21. acceptorRef.set(null);
  22. if(registerQueue.isEmpty()&& cancelQueue.isEmpty()){
  23. assert(acceptorRef.get()!=this);
  24. break;
  25. }
  26. if(!acceptorRef.compareAndSet(null,this)){
  27. assert(acceptorRef.get()!=this);
  28. break;
  29. }
  30. assert(acceptorRef.get()==this);
  31. }
  32. if(selected >0){
  33. // We have some connection request, let's process
  34. // them here.
  35. processHandles(selectedHandles());
  36. }
  37. // check to see if any cancellation request has been made.
  38. nHandles -= unregisterHandles();
  39. }catch(ClosedSelectorException cse){
  40. // If the selector has been closed, we can exit the loop
  41. ExceptionMonitor.getInstance().exceptionCaught(cse);
  42. break;
  43. }catch(Exception e){
  44. ExceptionMonitor.getInstance().exceptionCaught(e);
  45. try{
  46. Thread.sleep(1000);
  47. }catch(InterruptedException e1){
  48. ExceptionMonitor.getInstance().exceptionCaught(e1);
  49. }
  50. }
  51. }
  52. // Cleanup all the processors, and shutdown the acceptor.
  53. if(selectable && isDisposing()){
  54. selectable =false;
  55. try{
  56. if(createdProcessor){
  57. processor.dispose();
  58. }
  59. }finally{
  60. try{
  61. synchronized(disposalLock){
  62. if(isDisposing()){
  63. destroy();
  64. }
  65. }
  66. }catch(Exception e){
  67. ExceptionMonitor.getInstance().exceptionCaught(e);
  68. }finally{
  69. disposalFuture.setDone();
  70. }
  71. }
  72. }
  73. }
其中registerHandler就是实力ServerSocket的端口监听,能看到我们熟悉的NIO代码端口bind和 channel.register(selector, SelectionKey.OP_ACCEPT);
,在registerHandler调用了NioSocketAcceptor中的open
  1. @Override
  2. protectedServerSocketChannel open(SocketAddress localAddress)throwsException{
  3. // Creates the listening ServerSocket
  4. ServerSocketChannel channel =null;
  5. if(selectorProvider !=null){
  6. channel = selectorProvider.openServerSocketChannel();
  7. }else{
  8. channel =ServerSocketChannel.open();
  9. }
  10. boolean success =false;
  11. try{
  12. // This is a non blocking socket channel
  13. channel.configureBlocking(false);
  14. // Configure the server socket,
  15. ServerSocket socket = channel.socket();
  16. // Set the reuseAddress flag accordingly with the setting
  17. socket.setReuseAddress(isReuseAddress());
  18. // and bind.
  19. try{
  20. socket.bind(localAddress, getBacklog());
  21. }catch(IOException ioe){
  22. // Add some info regarding the address we try to bind to the
  23. // message
  24. String newMessage ="Error while binding on "+ localAddress +"\n"+"original message : "
  25. + ioe.getMessage();
  26. Exception e =newIOException(newMessage);
  27. e.initCause(ioe.getCause());
  28. // And close the channel
  29. channel.close();
  30. throw e;
  31. }
  32. // Register the channel within the selector for ACCEPT event
  33. channel.register(selector,SelectionKey.OP_ACCEPT);
  34. success =true;
  35. }finally{
  36. if(!success){
  37. close(channel);
  38. }
  39. }
  40. return channel;
  41. }
注册后,那我们就等待客户端的连接吧,如果客户端执行了socket.connect(iPaddr),那我们这里的selector.select()会>0.此时会处理这个客户端的请求Acceptor.processHandles(new ServerSocketChannelIterator(selector.selectedKeys()));是不是感觉这个代码在自己编写的NIO代码中很类似?这里,accept()方法,将processor和这个ServerSocketChannel作处理,其实是生产一个session,并将socketChannel,processor等做关联。并初始化和准备一个客户端和服务端连接和交互过程整个生命周期的session了。最后session.getProcessor.add(session)很隐晦的开始了processor的流程。。
  1. privatevoid processHandles(Iterator<H> handles)throwsException{
  2. while(handles.hasNext()){
  3. H handle = handles.next();
  4. handles.remove();
  5. // Associates a new created connection to a processor,
  6. // and get back a session
  7. S session = accept(processor, handle);
  8. if(session ==null){
  9. continue;
  10. }
  11. initSession(session,null,null);
  12. // add the session to the SocketIoProcessor
  13. session.getProcessor().add(session);
  14. }
  15. }
我们来看看NioSocketAcceptor.accept()
  1. protectedNioSession accept(IoProcessor<NioSession> processor,ServerSocketChannel handle)throwsException{
  2. SelectionKey key =null;
  3. if(handle !=null){
  4. key = handle.keyFor(selector);
  5. }
  6. if((key ==null)||(!key.isValid())||(!key.isAcceptable())){
  7. returnnull;
  8. }
  9. // accept the connection from the client
  10. SocketChannel ch = handle.accept();
  11. if(ch ==null){
  12. returnnull;
  13. }
  14. returnnewNioSocketSession(this, processor, ch);
  15. }
刚我们说到很隐晦的session.getProcessor.add(session)。为什么说很隐晦,应该这里正是开始了processor处理这个session后续请求的。上面在实例化的时候,实例的是SimpleProcessor首先看看SimpleProcessor.add(){getProcessor.add()}//整个就是多个processor的体现
  1. @SuppressWarnings("unchecked")
  2. privateIoProcessor<S> getProcessor(S session){
  3. IoProcessor<S> processor =(IoProcessor<S>) session.getAttribute(PROCESSOR);
  4. if(processor ==null){
  5. if(disposed || disposing){
  6. thrownewIllegalStateException("A disposed processor cannot be accessed.");
  7. }
  8. processor = pool[Math.abs((int) session.getId())% pool.length];
  9. if(processor ==null){
  10. thrownewIllegalStateException("A disposed processor cannot be accessed.");
  11. }
  12. session.setAttributeIfAbsent(PROCESSOR, processor);
  13. }
  14. return processor;
  15. }
 
 
 
AbstractPollingIoProcessor.add()
  1. publicfinalvoid add(S session){
  2. if(disposed || disposing){
  3. thrownewIllegalStateException("Already disposed.");
  4. }
  5. // Adds the session to the newSession queue and starts the worker
  6. newSessions.add(session);
  7. startupProcessor();
  8. }
  1. privatevoid startupProcessor(){
  2. Processor processor = processorRef.get();
  3. if(processor ==null){
  4. processor =newProcessor();
  5. if(processorRef.compareAndSet(null, processor)){
  6. executor.execute(newNamePreservingRunnable(processor, threadName));
  7. }
  8. }
  9. // 注意这个wakeup。
  10. wakeup();
  11. }
  1. publicvoid run(){
  2. assert(processorRef.get()==this);
  3. int nSessions =0;
  4. lastIdleCheckTime =System.currentTimeMillis();
  5. for(;;){
  6. try{
  7. // This select has a timeout so that we can manage
  8. // idle session when we get out of the select every
  9. // second. (note : this is a hack to avoid creating
  10. // a dedicated thread).
  11. long t0 =System.currentTimeMillis();
  12. int selected = select(SELECT_TIMEOUT);
  13. long t1 =System.currentTimeMillis();
  14. long delta =(t1 - t0);
  15. if(!wakeupCalled.getAndSet(false)&&(selected ==0)&&(delta <100)){
  16. // Last chance : the select() may have been
  17. // interrupted because we have had an closed channel.
  18. if(isBrokenConnection()){
  19. LOG.warn("Broken connection");
  20. }else{
  21. LOG.warn("Create a new selector. Selected is 0, delta = "+(t1 - t0));
  22. // 这里搞不太懂为什么要注册过新的。
  23. registerNewSelector();
  24. }
  25. }
  26. // 先处理这个心的sessoin。其实就是做了socket在processor线程上的selector的注册
  27. nSessions += handleNewSessions();
  28. updateTrafficMask();
  29. // Now, if we have had some incoming or outgoing events,
  30. // deal with them
  31. if(selected >0){
  32. // LOG.debug("Processing ..."); // This log hurts one of
  33. // the MDCFilter test...
  34. process();
  35. }
  36. // Write the pending requests
  37. long currentTime =System.currentTimeMillis();
  38. flush(currentTime);
  39. // And manage removed sessions
  40. nSessions -= removeSessions();
  41. // Last, not least, send Idle events to the idle sessions
  42. notifyIdleSessions(currentTime);
  43. // Get a chance to exit the infinite loop if there are no
  44. // more sessions on this Processor
  45. if(nSessions ==0){
  46. processorRef.set(null);
  47. if(newSessions.isEmpty()&& isSelectorEmpty()){
  48. // newSessions.add() precedes startupProcessor
  49. assert(processorRef.get()!=this);
  50. break;
  51. }
  52. assert(processorRef.get()!=this);
  53. if(!processorRef.compareAndSet(null,this)){
  54. // startupProcessor won race, so must exit processor
  55. assert(processorRef.get()!=this);
  56. break;
  57. }
  58. assert(processorRef.get()==this);
  59. }
  60. // Disconnect all sessions immediately if disposal has been
  61. // requested so that we exit this loop eventually.
  62. if(isDisposing()){
  63. boolean hasKeys =false;
  64. for(Iterator<S> i = allSessions(); i.hasNext();){
  65. IoSession session = i.next();
  66. if(session.isActive()){
  67. scheduleRemove((S)session);
  68. hasKeys =true;
  69. }
  70. }
  71. if(hasKeys){
  72. wakeup();
  73. }
  74. }
  75. }catch(ClosedSelectorException cse){
  76. // If the selector has been closed, we can exit the loop
  77. // But first, dump a stack trace
  78. ExceptionMonitor.getInstance().exceptionCaught(cse);
  79. break;
  80. }catch(Exception e){
  81. ExceptionMonitor.getInstance().exceptionCaught(e);
  82. try{
  83. Thread.sleep(1000);
  84. }catch(InterruptedException e1){
  85. ExceptionMonitor.getInstance().exceptionCaught(e1);
  86. }
  87. }
  88. }
  89. try{
  90. synchronized(disposalLock){
  91. if(disposing){
  92. doDispose();
  93. }
  94. }
  95. }catch(Exception e){
  96. ExceptionMonitor.getInstance().exceptionCaught(e);
  97. }finally{
  98. disposalFuture.setValue(true);
  99. }
  100. }
handlerNewSession做了几件事情,
1、将这个socketChannel注册在processor线程上的selector上,具体看NioProcessor的init().
2、构建iofilter的chain,并发出session被创建的事件。
 
很快,我们就能看到,一个请求过来后,如何读写的了。这里就是靠processor的中的process(),很熟悉。哈哈
  1. privatevoid process(S session){
  2. // Process Reads
  3. if(isReadable(session)&&!session.isReadSuspended()){
  4. read(session);
  5. }
  6. // Process writes
  7. if(isWritable(session)&&!session.isWriteSuspended()){
  8. // add the session to the queue, if it's not already there
  9. if(session.setScheduledForFlush(true)){
  10. flushingSessions.add(session);
  11. }
  12. }
  13. }
那我们最后看看read()
  1. privatevoid read(S session){
  2. IoSessionConfig config = session.getConfig();
  3. int bufferSize = config.getReadBufferSize();
  4. IoBuffer buf =IoBuffer.allocate(bufferSize);
  5. finalboolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
  6. try{
  7. int readBytes =0;
  8. int ret;
  9. try{
  10. if(hasFragmentation){
  11. while((ret = read(session, buf))>0){
  12. readBytes += ret;
  13. if(!buf.hasRemaining()){
  14. break;
  15. }
  16. }
  17. }else{
  18. ret = read(session, buf);
  19. if(ret >0){
  20. readBytes = ret;
  21. }
  22. }
  23. }finally{
  24. buf.flip();
  25. }
  26. if(readBytes >0){
  27. IoFilterChain filterChain = session.getFilterChain();
  28. filterChain.fireMessageReceived(buf);
  29. buf =null;
  30. if(hasFragmentation){
  31. if(readBytes <<1< config.getReadBufferSize()){
  32. session.decreaseReadBufferSize();
  33. }elseif(readBytes == config.getReadBufferSize()){
  34. session.increaseReadBufferSize();
  35. }
  36. }
  37. }
  38. if(ret <0){
  39. // scheduleRemove(session);
  40. IoFilterChain filterChain = session.getFilterChain();
  41. filterChain.fireInputClosed();
  42. }
  43. }catch(Exception e){
  44. if(e instanceofIOException){
  45. if(!(e instanceofPortUnreachableException)
  46. ||!AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
  47. ||((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()){
  48. scheduleRemove(session);
  49. }
  50. }
  51. IoFilterChain filterChain = session.getFilterChain();
  52. filterChain.fireExceptionCaught(e);
  53. }
  54. }
除了读写Iobuffer外,filterChain.fireMessageReceived(buf);这里就到了我们一序列的iofitler,并在这个chain中,有一个tailFilter,这个TailFilter就是调用了我们的自己编写的Handler了。
整个流程的代码就这么。很多细节后续再补充,如:IoBuffer读写;穿插在整个系统中的事件。
 
 
 

二、mina 如何使用nio
    1、nio要素:selector,buffer,channel,这里推荐一个文章,链接为:
 
三、mina 如何使用concurrent
    这个也是我们学习mina中能对concurrent中的一些应用做 个了解的。
四、mina 主要UML图和
 
五、mina 总结
  主要接口:IoService,包括了IoAcceptor,IoProcessor和在客户端使用的Ioconnector
                    IoFilter,很好第体现了面向对象中的开闭原则(软件实体应当对扩展开放,对修改关闭),对于整个处理流程的修改是关闭,在对于个性化的处理流程中用户需要关心的业务相关的拓展进行了开放。
                    IOSession,讲一个连接封装成session,后续用到的请求数据,处理器,相关配置等关联到这里。理解了session的整个生命周期,就明白了Acceptor,processor的是怎么分别处理客户端连接请求和数据处理过程了。
                    IoHandler,业务处理的中心,用户根据自己的业务需求,在这里编写业务代码
                    IO
                    
  • 大小: 37.9 KB
分享到:
评论

相关推荐

    在Java中运用mina来实现TCP通信

    这是一个有关Mina在Java通信中运用的简单的入门实例,MIna自带一种触发机制,无需再开线程等待收发数据。这个实例中有客户端和服务端,与软件TCPUDPDbg进行文字通话测试。用的时候注意添加Mina包,此实例中用到的是...

    java-mina通信框架详解.docx

    Mina的核心特性是其事件驱动、异步(基于Java NIO)的编程模型,使得处理网络通信变得更加高效。 Mina分为1.x和2.x两个主要分支,推荐使用最新的2.0版本。框架中包含了Server和Client的封装,简化了网络通信结构。...

    websocket+java服务器(mina)

    Mina(Java Multithreaded Network Application Framework)是一个用Java编写的网络应用框架,它提供了高度可扩展性和性能,适用于多种网络协议,包括TCP和UDP。Mina为开发者提供了一种抽象层,简化了网络编程的复杂...

    高性能Java网络框架 MINA

    6. **版本稳定**:MINA的版本号为2.0.5,这意味着它经过了多次迭代和优化,稳定性较好,同时也拥有较为完善的API文档和社区支持。 7. **扩展性**:MINA设计灵活,允许开发者轻松地扩展框架以适应特定需求。例如,...

    java 实现的mina server client完全能用的

    Java Mina是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“java 实现的mina server client完全能用的”项目可能包含了一个完整的Mina服务器和客户端实现,使得...

    Java springboot 整合mina 框架,nio通讯基础教程,mina框架基础教程.zip

    Java SpringBoot 整合Mina框架,涉及到的核心技术主要包括Java NIO(非阻塞I/O)、Mina框架以及SpringBoot的集成应用。本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java ...

    java mina组合包

    在“java mina组合包”的“libs”目录下,通常会包含Mina的JAR文件和其他依赖库,如Apache Commons Logging、Netty等。开发者可以直接将这些库导入到项目中,以快速搭建基于Mina的网络应用。 总结来说,Java Mina是...

    Java学习之IO总结及mina和netty

    在本文中,我们将深入探讨Java IO,并结合MINA和Netty这两个流行的网络编程框架进行分析。 首先,Java IO提供了丰富的类库,允许程序进行数据的读写操作,包括文件操作、流处理、序列化等。它基于流的概念,分为...

    TestMINA.zip_DEMO_Mina框架_java mina_mina_mina java

    Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高度可扩展且高性能的...通过深入学习和实践MINA,开发者可以构建出更高效、更稳定的网络服务,尤其是在需要处理大量并发连接的场景下。

    高性能Java网络框架 MINA.7z

    MINA由Apache软件基金会开发,并且是其顶级项目之一,它主要应用于Java平台上,但也可以通过JNA(Java Native Access)在其他平台上运行。 MINA的核心特性包括: 1. **异步I/O模型**:MINA基于NIO(Non-blocking I...

    Android Java Socket框架 Mina2.0

    根目录下的《Mina2.0学习笔记》应该包含了详细的教程和示例,涵盖了Mina的基本概念、配置、过滤器使用、协议处理等方面,是学习和理解Mina框架的重要参考资料。 总之,Android Java Socket框架Mina2.0提供了一个...

    java mina框架全套

    学习和使用Mina框架,可以帮助开发者快速构建稳定、高效的网络应用,降低网络编程的复杂度,同时充分利用Java NIO的优势。在实际项目中,结合Mina与其他开源库如Spring、Hibernate等,可以构建出更加强大的企业级...

    websocket+java服务器(mina)

    Mina(Java Network Application Platform)是一个网络通信框架,它简化了开发高性能、高可用性的网络应用程序的过程。Mina提供了异步的事件驱动模型,可以处理大量的并发连接,并且支持多种协议,包括TCP、UDP以及...

    JAVA mina 框架源码

    JAVA Mina框架是一款高度可扩展、高性能的网络应用开发框架,专为Java平台设计。它提供了丰富的网络通信API,使得开发者能够轻松地构建基于TCP/IP、UDP/IP以及其他协议的服务器和客户端应用程序。Mina框架的核心设计...

    java mina 通讯框架

    java mina 通讯框架

    Java mina2源码

    Java Mina2是一个高度可扩展且高性能的网络通信框架,主要用在开发基于TCP、UDP等协议的服务端应用。它提供了简单而强大的API,使得开发者能够轻松构建网络应用程序,如服务器端的聊天室、游戏服务器或者任何需要...

    mina学习资料--很实用

    Apache Mina是一个开源框架,主要用于构建高性能、高可用性的网络应用程序。它主要关注网络通信的I/O层...通过深入学习和实践这些知识,你将能够利用Mina框架构建出高效、稳定的网络服务,应对高并发场景下的通信挑战。

    JAVA 通信框架MINA(包含心跳)

    Apache MINA 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 可以作为开发网络应用...

Global site tag (gtag.js) - Google Analytics