- 浏览: 980015 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Mina Io监听器接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378315
引言:
IoAcceptor与IoService不同的是,添加了监听连接请求和地址绑定功能。抽象Io监听器AbstractIoAcceptor绑定地址首先要检查绑定的socket地址与传输元数据的地址类型是否相同,相同则通过bindInternal完成实际的绑定,然后通知Service监听器,Service已激活fireServiceActivated。解绑地址方法,主要是委托unbind0方法完成实际解绑工作,清空绑定地址集合boundAddresses,触发Service监听器无效事件fireServiceDeactivated。
来看AbstractPollingIoAcceptor的构造:
从上面来看,构造AbstractPollingIoAcceptor,主要是初始化会话配置,Io处理器类型,IO异步事件执行器为空的话默认为CachedThreadPool,然后初始化选择器。
再来看其他方法:
再来绑定socket地址:
地址绑定有一点要关注:
我们再来看监听器Acceptor的定义:
从Acceptor的源码可以看出,如果监听器AbstractPollingIoAcceptor已经初始化,首先
根据地址绑定队列中的绑定请求,打开一个ServerSocketChannle,注册接收事件OP_ACCEPT到选择器,并将绑定地址与ServerSocketChannle映射管理添加到地址绑定映射集合;执行选择操作,如果实际绑定地址为空,则置空acceptorRef;如果接收连接事件发生,则处理连接请求,遍历接收事件就绪的ServerSocketChannel,ServerSocketChannel创建一个关联processor的会话,初始化会话,添加会话到会话关联io处理器;检查是否有地址解绑请求,如果有解绑请求CancellationRequest,从绑定socket地址与ServerSocketChannle映射map中,移除绑定的socket地址,关闭ServerSocketChannle;最后检查监听器是否正在关闭,如果acceptor正在关闭,则关闭关联processor。
回到地址绑定方法:
从上面来看,地址绑定过程为,创建绑定操作结果,注册绑定请求到注册地址绑定请求队列,
创建监听器Acceptor实例并执行。
再来看地址解绑等操作:
总结:
AbstractPollingIoAcceptor主要变量为Io处理器processor,地址绑定请求队列registerQueue,地址解绑请求队列cancelQueue,监听器绑定的socket地址,与ServerSocketChannel映射关系boundHandles-Map,监听工作线程Acceptor引用acceptorRef。
构造AbstractPollingIoAcceptor,主要是初始化会话配置,Io处理器类型,IO异步事件执行器为空的话默认为CachedThreadPool,然后初始化选择器。地址绑定过程为,创建绑定操作结果,注册绑定请求到注册地址绑定请求队列,创建监听器Acceptor实例并执行。Acceptor主要功能为,地址绑定,监听连接请求,解绑地址,实际工作逻辑为:如果监听器AbstractPollingIoAcceptor已经初始化,首先根据地址绑定队列中的绑定请求,打开一个ServerSocketChannle,注册接收事件OP_ACCEPT到选择器,并将绑定地址与ServerSocketChannle映射管理添加到地址绑定映射集合;执行选择操作,如果实际绑定地址为空,则置空acceptorRef;如果接收连接事件发生,则处理连接请求,遍历接收事件就绪的ServerSocketChannel,ServerSocketChannel创建一个关联processor的会话,初始化会话,添加会话到会话关联io处理器;检查是否有地址解绑请求,如果有解绑请求CancellationRequest,从绑定socket地址与ServerSocketChannle映射map中,移除绑定的socket地址,关闭ServerSocketChannle;最后检查监听器是否正在关闭,如果acceptor正在关闭,则关闭关联processor。Acceptor和AbstractPollingIoAcceptor的关系,与AbstractPollingIoProcessor和Processor的关系很像。地址解绑过程,首先根据解绑地址创建AcceptorOperationFuture,添加到解绑队列,启动Acceptor线程,完成实际解绑工作。
AbstractPollingIoAcceptor所有的工作(地址绑定,接收连接,创建会话,添加会话到IO处理器,解绑地址,释放监听器资源)都是在Acceptor线程里完成。
附:
//SimpleIoProcessorPool,简单的IO处理器线程池,这一步,可以做个了解。
简单的IO处理器线程池SimpleIoProcessorPool,将一个会话的相关事件在多个Io处理器执行。当前的transport内部实现用SimpleIoProcessorPool,在多处理器环境下,具有良好的性能,因此不需要直接使用SimpleIoProcessorPool,除非在同个虚拟机中运行多个IoService。
Io处理器线程池,主要是创建size个IO处理器线程处理会话和过滤器相关事件。如果在同一虚拟机中运行多个Io服务,你需要共享处理器线程池。为了达到这个效果,你需要构造
SimpleIoProcessorPool实例,在创建IO服务时,作为参数传入。会话与IO处理器线程池,主要通过会话的处理器属性关联,获取会话关联的处理器,如果会话属性没有存储关联处理器,则从处理器线程池中获取一个,添加会话属性中。SimpleIoProcessorPool的其他方法发送写请求,刷新会话,添加移除会话等操作都是通过会话关联的处理器线程池中的处理器。简单的说,SimpleIoProcessorPool使用与再同一个虚拟机下启动多个IO服务的情况,简单处理器线程池,就是一个处理器集合,添加会话时,从会话属性中,获会话关联的处理器,如果会话属性没有存储关联处理器,则从处理器线程池中获取一个,添加会话属性中。
引言:
IoAcceptor与IoService不同的是,添加了监听连接请求和地址绑定功能。抽象Io监听器AbstractIoAcceptor绑定地址首先要检查绑定的socket地址与传输元数据的地址类型是否相同,相同则通过bindInternal完成实际的绑定,然后通知Service监听器,Service已激活fireServiceActivated。解绑地址方法,主要是委托unbind0方法完成实际解绑工作,清空绑定地址集合boundAddresses,触发Service监听器无效事件fireServiceDeactivated。
/** * A base class for implementing transport using a polling strategy. The * underlying sockets will be checked in an active loop and woke up when an * socket needed to be processed. This class handle the logic behind binding, * accepting and disposing the server sockets. An {@link Executor} will be used * for running client accepting and an {@link AbstractPollingIoProcessor} will * be used for processing client I/O operations like reading, writing and * closing. * AbstractPollingIoAcceptor为传输层拉取策略的基本实现。底层socket将在一个循环中检查,连接请求事件,当一个以个socket连接请求时,唤醒监听器线程,处理连接请求。此类处理地址绑定,接收连接请求,释放server socket。一个执行器将会运行接收客户端连接,AbstractPollingIoProcessor将用于处理客户端的IO操作事件。 * All the low level methods for binding, accepting, closing need to be provided * by the subclassing implementation. * * @see NioSocketAcceptor for a example of implementation * @param <H> The type of IoHandler * @param <S> The type of IoSession * * @author [url=http://mina.apache.org]Apache MINA Project[/url] */ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor { /** A lock used to protect the selector to be waked up before it's created */ //信号量用于在选择器被创建前,保护选择器被唤醒 private final Semaphore lock = new Semaphore(1); private final IoProcessor<S> processor;//Io处理器 private final boolean createdProcessor;//Io处理器是否创建 //地址绑定请求队列,地址绑定时,添加绑定请求AcceptorOperationFuture到队列 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>(); //解绑队列,解绑地址时,添加地址解绑请求AcceptorOperationFuture到队列 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>(); //监听器绑定的socket地址,与ServerSocketChannel映射关系 //绑定地址后,添加地址与ServerSocketChannel映射关系到boundHandles private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>()); private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();//Service关闭结果 /** A flag set when the acceptor has been created and initialized */ private volatile boolean selectable;//当acceptor创建和初始化设置 /** The thread responsible of accepting incoming requests 监听连接请求线程Acceptor*/ private AtomicReference<Acceptor> acceptorRef = new AtomicReference<>(); protected boolean reuseAddress = false;//地址是否重用 /** * Define the number of socket that can wait to be accepted. Default * to 50 (as in the SocketServer default). SocketServer默认可以接收的连接数。 */ protected int backlog = 50; }
来看AbstractPollingIoAcceptor的构造:
/** * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default * session configuration, a class of {@link IoProcessor} which will be instantiated in a * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default * pool size will be used. * 构造AbstractPollingIoAcceptor,需要提供默认的会话配置,在IO简单处理器线程池当中,执行的Io处理器类型。 默认的线程池大小将会被使用 * @see SimpleIoProcessorPool * * @param sessionConfig * the default configuration for the managed {@link IoSession} * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} * type. */ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) { this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null); } /** * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default * session configuration, a class of {@link IoProcessor} which will be instantiated in a * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor * systems. * 与上面的方法不同的时,指定Io处理器线程池大小,即Io处理器的个数。 * @see SimpleIoProcessorPool * * @param sessionConfig * the default configuration for the managed {@link IoSession} * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} * type. * @param processorCount the amount of processor to instantiate for the pool */ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass, int processorCount) { this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null); } /** * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default * session configuration, a class of {@link IoProcessor} which will be instantiated in a * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor * systems. *与上面不同的时,多个一个选择器提供者参数 * @see SimpleIoProcessorPool * * @param sessionConfig * the default configuration for the managed {@link IoSession} * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} * type. * @param processorCount the amount of processor to instantiate for the pool * @param selectorProvider The SelectorProvider to use */ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass, int processorCount, SelectorProvider selectorProvider ) { this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider); } /** * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default * session configuration, a default {@link Executor} will be created using * {@link Executors#newCachedThreadPool()}. * 此方法的Io处理器执行器,为 Executors#newCachedThreadPool * @see AbstractIoService * * @param sessionConfig * the default configuration for the managed {@link IoSession} * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} */ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) { this(sessionConfig, null, processor, false, null); } /** * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a * default session configuration and an {@link Executor} for handling I/O * events. If a null {@link Executor} is provided, a default one will be * created using {@link Executors#newCachedThreadPool()}. * 如果线程池为空,默认为Executors#newCachedThreadPool * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor) * * @param sessionConfig * the default configuration for the managed {@link IoSession} * @param executor * the {@link Executor} used for handling asynchronous execution * of I/O events. Can be <code>null</code>. * @param processor * the {@link IoProcessor} for processing the {@link IoSession} * of this transport, triggering events to the bound * {@link IoHandler} and processing the chains of * {@link IoFilter} */ protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) { this(sessionConfig, executor, processor, false, null); } /** * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a * default session configuration and an {@link Executor} for handling I/O * events. If a null {@link Executor} is provided, a default one will be * created using {@link Executors#newCachedThreadPool()}. * 这个就是最终的构造方法了。 * @see #AbstractIoService(IoSessionConfig, Executor) * * @param sessionConfig 默认会话配置 * the default configuration for the managed {@link IoSession} * @param executor 异步IO事件执行器 * the {@link Executor} used for handling asynchronous execution * of I/O events. Can be <code>null</code>. * @param processor 会话处理器器,触发IoHander和过滤器的相关事件 * the {@link IoProcessor} for processing the {@link IoSession} * of this transport, triggering events to the bound * {@link IoHandler} and processing the chains of * {@link IoFilter} * @param createdProcessor * tagging the processor as automatically created, so it will be * automatically disposed */ private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor, boolean createdProcessor, SelectorProvider selectorProvider) { super(sessionConfig, executor); if (processor == null) { throw new IllegalArgumentException("processor"); } this.processor = processor; this.createdProcessor = createdProcessor; try { // Initialize the selector init(selectorProvider);初始化选择器 // The selector is now ready, we can switch the // flag to true so that incoming connection can be accepted selectable = true; } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to initialize.", e); } finally { if (!selectable) { try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } }
/** * Initialize the polling system, will be called at construction time. * * @param selectorProvider The Selector Provider that will be used by this polling acceptor * @throws Exception any exception thrown by the underlying system calls */ protected abstract void init(SelectorProvider selectorProvider) throws Exception;
从上面来看,构造AbstractPollingIoAcceptor,主要是初始化会话配置,Io处理器类型,IO异步事件执行器为空的话默认为CachedThreadPool,然后初始化选择器。
再来看其他方法:
/** * Initialize the polling system, will be called at construction time. * @throws Exception any exception thrown by the underlying system calls 初始化拉取系统,在构造是调用 */ protected abstract void init() throws Exception; /** * Destroy the polling system, will be called when this {@link IoAcceptor} * implementation will be disposed. 销毁拉取系统,在IO监听器销毁时,调用 * @throws Exception any exception thrown by the underlying systems calls */ protected abstract void destroy() throws Exception; /** * Check for acceptable connections, interrupt when at least a server is ready for accepting. * All the ready server socket descriptors need to be returned by {@link #selectedHandles()} 检查已接收的连接,当至少有一个准备好接收时,中断。所有的就绪serversocket描述符需要通过 #selectedHandles方法返回 * @return The number of sockets having got incoming client 接收的客户端连接数 * @throws Exception any exception thrown by the underlying systems calls */ protected abstract int select() throws Exception; /** * Interrupt the {@link #select()} method. Used when the poll set need to be modified. 唤醒,选择操作。当poll及修改时 */ protected abstract void wakeup(); /** * {@link Iterator} for the set of server sockets found with acceptable incoming connections * during the last {@link #select()} call. 返回在上一次选择操作中,服务端接收连接数 * @return the list of server handles ready */ protected abstract Iterator<H> selectedHandles(); /** * Open a server socket for a given local address. 打开一个给定socket地址的serversocket * @param localAddress the associated local address * @return the opened server socket * @throws Exception any exception thrown by the underlying systems calls */ protected abstract H open(SocketAddress localAddress) throws Exception; /** * Get the local address associated with a given server socket 获取给定serversocket的本地地址 * @param handle the server socket * @return the local {@link SocketAddress} associated with this handle * @throws Exception any exception thrown by the underlying systems calls */ protected abstract SocketAddress localAddress(H handle) throws Exception; /** * Accept a client connection for a server socket and return a new {@link IoSession} * associated with the given {@link IoProcessor} serversocket接收一个客户端连接请求,返回一个关联指定IO处理器的会话。 * @param processor the {@link IoProcessor} to associate with the {@link IoSession} * @param handle the server handle * @return the created {@link IoSession} * @throws Exception any exception thrown by the underlying systems calls */ protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception; /** * Close a server socket. 关闭一个serversocket * @param handle the server socket * @throws Exception any exception thrown by the underlying systems calls */ protected abstract void close(H handle) throws Exception;
再来绑定socket地址:
/** * {@inheritDoc} */ @Override protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { // Create a bind request as a Future operation. When the selector // have handled the registration, it will signal this future. //创建绑定操作结果。当选择器处理注册时,将会通知操作结果。 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); // adds the Registration request to the queue for the Workers // to handle 注册请求到注册队列,以便工作线程处理 registerQueue.add(request); // creates the Acceptor instance and has the local // executor kick it off. //创建监听器实例,本地执行器将会执行实例 startupAcceptor(); // As we just started the acceptor, we have to unblock the select() // in order to process the bind request we just have added to the // registerQueue. //由于我们刚启动监听器,不得不unblock选择操作,处理刚刚添加到注册队列的绑定请求 try { lock.acquire(); wakeup(); } finally { lock.release(); } // Now, we wait until this request is completed. request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } // Update the local addresses. // setLocalAddresses() shouldn't be called from the worker thread // because of deadlock. Set<SocketAddress> newLocalAddresses = new HashSet<>(); for (H handle : boundHandles.values()) { newLocalAddresses.add(localAddress(handle)); } return newLocalAddresses; }
地址绑定有一点要关注:
// creates the Acceptor instance and has the local // executor kick it off. //创建监听器实例,本地执行器将会执行实例 startupAcceptor();
/** * This method is called by the doBind() and doUnbind() * methods. If the acceptor is null, the acceptor object will * be created and kicked off by the executor. If the acceptor * object is null, probably already created and this class * is now working, then nothing will happen and the method * will just return. */ private void startupAcceptor() throws InterruptedException { // If the acceptor is not ready, clear the queues // TODO : they should already be clean : do we have to do that ? if (!selectable) { //如果acceptor没有准备好,则清空注册队列和取消队列 registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started, //获取当前正在运行的acceptor,没有这创建一个,有执行器执行。 Acceptor acceptor = acceptorRef.get(); if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }
我们再来看监听器Acceptor的定义:
/** * This class is called by the startupAcceptor() method and is * placed into a NamePreservingRunnable class. * It's a thread accepting incoming connections from clients. * The loop is stopped when all the bound handlers are unbound. Acceptor监听器,在startupAcceptor方法,创建被包装成NamePreservingRunnable线程 执行。Acceptor是一个线程,接收来自客户端的连接。当所有绑定的Handler解绑时,循环停止。 */ private class Acceptor implements Runnable { /** * {@inheritDoc} */ @Override public void run() { assert acceptorRef.get() == this; int nHandles = 0; // Release the lock lock.release(); //当监听器已经准备好 while (selectable) { try { // Process the bound sockets to this acceptor. // this actually sets the selector to OP_ACCEPT, // and binds to the port on which this class will // listen on. We do that before the select because // the registerQueue containing the new handler is // already updated at this point. //根据地址绑定队列中的绑定请求,打开一个ServerSocketChannle, //注册接收事件OP_ACCEPT到选择器 nHandles += registerHandles(); // Detect if we have some keys ready to be processed // The select() will be woke up if some new connection // have occurred, or if the selector has been explicitly // woke up //探测是否存在选择key就绪待处理,及一个新的连接请求发生,或者 //选择操作被显示唤醒,执行选择操作 int selected = select(); // Now, if the number of registred handles is 0, we can // quit the loop: we don't have any socket listening // for incoming connection. if (nHandles == 0) { //如果绑定地址为空,则置空acceptorRef acceptorRef.set(null); if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { assert acceptorRef.get() != this; break; } if (!acceptorRef.compareAndSet(null, this)) { assert acceptorRef.get() != this; break; } assert acceptorRef.get() == this; } if (selected > 0) { // We have some connection request, let's process // them here. //有接收连接事件发生,则处理连接请求 processHandles(selectedHandles()); } // check to see if any cancellation request has been made. //unbind解绑队列的解绑地址请求 nHandles -= unregisterHandles(); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } // Cleanup all the processors, and shutdown the acceptor. if (selectable && isDisposing()) { //如果acceptor正在关闭,则关闭关联processor selectable = false; try { if (createdProcessor) { processor.dispose(); } } finally { try { synchronized (disposalLock) { if (isDisposing()) { destroy(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setDone(); } } } } /** * This method will process new sessions for the Worker class. All * keys that have had their status updates as per the Selector.selectedKeys() * method will be processed here. Only keys that are ready to accept * connections are handled here. * <p/> * Session objects are created by making new instances of SocketSessionImpl * and passing the session object to the SocketIoProcessor class. */ @SuppressWarnings("unchecked") private void processHandles(Iterator<H> handles) throws Exception { //遍历接收事件就绪的ServerSocketChannel while (handles.hasNext()) { H handle = handles.next(); handles.remove(); // Associates a new created connection to a processor, // and get back a session //ServerSocketChannel创建一个关联processor的会话 S session = accept(processor, handle); if (session == null) { continue; } //初始化会话 initSession(session, null, null); // add the session to the SocketIoProcessor //添加会话到会话关联io处理器 session.getProcessor().add(session); } } /** * Sets up the socket communications. Sets items such as: 绑定socket地址,打开一个ServerSocketChannle,注册接收事件OP_ACCEPT到选择器 * <p/> * Blocking * Reuse address * Receive buffer size * Bind to listen port * Registers OP_ACCEPT for selector */ private int registerHandles() { for (;;) { // The register queue contains the list of services to manage // in this acceptor. //从socket地址绑定请求队列,poll一个地址绑定请求 AcceptorOperationFuture future = registerQueue.poll(); if (future == null) { return 0; } // We create a temporary map to store the bound handles, // as we may have to remove them all if there is an exception // during the sockets opening. Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>(); //获取需要绑定的地址 List<SocketAddress> localAddresses = future.getLocalAddresses(); try { // Process all the addresses for (SocketAddress a : localAddresses) { H handle = open(a);//根据Socket地址,打开一个ServerSocketChannle newHandles.put(localAddress(handle), handle); } // Everything went ok, we can now update the map storing // all the bound sockets. //将绑定地址与ServerSocketChannle映射管理添加到地址绑定映射集合 boundHandles.putAll(newHandles); // and notify. future.setDone(); return newHandles.size(); } catch (Exception e) { // We store the exception in the future future.setException(e); } finally { // Roll back if failed to bind all addresses. if (future.getException() != null) { //如果绑定地址有异常,则关闭打开的ServerSocketChannle for (H handle : newHandles.values()) { try { close(handle); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } // Wake up the selector to be sure we will process the newly bound handle // and not block forever in the select() //唤醒选择器处理新创建的ServerSocketChannle,不在阻塞在选择操作 wakeup(); } } } } /** * This method just checks to see if anything has been placed into the * cancellation queue. The only thing that should be in the cancelQueue * is CancellationRequest objects and the only place this happens is in * the doUnbind() method. unregisterHandles方法,检查是否有地址解绑请求。如果有解绑请求CancellationRequest, 解绑请求在doUnbind方法中产生 */ private int unregisterHandles() { int cancelledHandles = 0; for (;;) { //从解绑队列中poll解绑请求 AcceptorOperationFuture future = cancelQueue.poll(); if (future == null) { break; } // close the channels //获取ServerSocketChannle绑定的地址 for (SocketAddress a : future.getLocalAddresses()) { //从绑定socket地址与ServerSocketChannle映射map中,移除绑定的socket地址 H handle = boundHandles.remove(a); if (handle == null) { continue; } try { //关闭ServerSocketChannle close(handle); //唤醒,触发ServerSocketChannle线程death wakeup(); // wake up again to trigger thread death } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { cancelledHandles++; } } //解绑成功 future.setDone(); } return cancelledHandles; } }
从Acceptor的源码可以看出,如果监听器AbstractPollingIoAcceptor已经初始化,首先
根据地址绑定队列中的绑定请求,打开一个ServerSocketChannle,注册接收事件OP_ACCEPT到选择器,并将绑定地址与ServerSocketChannle映射管理添加到地址绑定映射集合;执行选择操作,如果实际绑定地址为空,则置空acceptorRef;如果接收连接事件发生,则处理连接请求,遍历接收事件就绪的ServerSocketChannel,ServerSocketChannel创建一个关联processor的会话,初始化会话,添加会话到会话关联io处理器;检查是否有地址解绑请求,如果有解绑请求CancellationRequest,从绑定socket地址与ServerSocketChannle映射map中,移除绑定的socket地址,关闭ServerSocketChannle;最后检查监听器是否正在关闭,如果acceptor正在关闭,则关闭关联processor。
回到地址绑定方法:
protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { // Create a bind request as a Future operation. When the selector // have handled the registration, it will signal this future. //创建绑定操作结果。当选择器处理注册时,将会通知操作结果。 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); // adds the Registration request to the queue for the Workers // to handle 注册请求到注册队列,以便工作线程处理 registerQueue.add(request); // creates the Acceptor instance and has the local // executor kick it off. //创建监听器实例,本地执行器将会执行实例 startupAcceptor(); // As we just started the acceptor, we have to unblock the select() // in order to process the bind request we just have added to the // registerQueue. //由于我们刚启动监听器,不得不unblock选择操作,处理刚刚添加到注册队列的绑定请求 try { lock.acquire(); wakeup(); } finally { lock.release(); } // Now, we wait until this request is completed. request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } // Update the local addresses. // setLocalAddresses() shouldn't be called from the worker thread // because of deadlock. Set<SocketAddress> newLocalAddresses = new HashSet<>(); for (H handle : boundHandles.values()) { newLocalAddresses.add(localAddress(handle)); } return newLocalAddresses; }
/** * This method is called by the doBind() and doUnbind() * methods. If the acceptor is null, the acceptor object will * be created and kicked off by the executor. If the acceptor * object is null, probably already created and this class * is now working, then nothing will happen and the method * will just return. */ private void startupAcceptor() throws InterruptedException { // If the acceptor is not ready, clear the queues // TODO : they should already be clean : do we have to do that ? if (!selectable) { //如果acceptor没有准备好,则清空注册队列和取消队列 registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started, //获取当前正在运行的acceptor,没有这创建一个,有执行器执行。 Acceptor acceptor = acceptorRef.get(); if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }
从上面来看,地址绑定过程为,创建绑定操作结果,注册绑定请求到注册地址绑定请求队列,
创建监听器Acceptor实例并执行。
再来看地址解绑等操作:
/** * {@inheritDoc} */ @Override protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception { //根据解绑地址创建AcceptorOperationFuture,添加到解绑队列,启动Acceptor线程,完成实际解绑工作。 AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses); cancelQueue.add(future); startupAcceptor(); wakeup(); future.awaitUninterruptibly(); if (future.getException() != null) { throw future.getException(); } } /** * {@inheritDoc} */ @Override protected void dispose0() throws Exception { unbind();//解绑地址 startupAcceptor();//启动Acceptor线程,完成实际清理工作。 wakeup(); } /** * {@inheritDoc} 默认不支持根据远端地址和本地地址创建会话 */ @Override public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { throw new UnsupportedOperationException(); } /** * @return the backLog */ public int getBacklog() { return backlog; } /** * Sets the Backlog parameter * * @param backlog * the backlog variable */ public void setBacklog(int backlog) { synchronized (bindLock) { if (isActive()) { throw new IllegalStateException("backlog can't be set while the acceptor is bound."); } this.backlog = backlog; } } /** * @return the flag that sets the reuseAddress information */ public boolean isReuseAddress() { return reuseAddress; } /** * Set the Reuse Address flag * * @param reuseAddress * The flag to set */ public void setReuseAddress(boolean reuseAddress) { synchronized (bindLock) { if (isActive()) { throw new IllegalStateException("backlog can't be set while the acceptor is bound."); } this.reuseAddress = reuseAddress; } } /** * {@inheritDoc} */ @Override public SocketSessionConfig getSessionConfig() { return (SocketSessionConfig)sessionConfig; }
总结:
AbstractPollingIoAcceptor主要变量为Io处理器processor,地址绑定请求队列registerQueue,地址解绑请求队列cancelQueue,监听器绑定的socket地址,与ServerSocketChannel映射关系boundHandles-Map,监听工作线程Acceptor引用acceptorRef。
构造AbstractPollingIoAcceptor,主要是初始化会话配置,Io处理器类型,IO异步事件执行器为空的话默认为CachedThreadPool,然后初始化选择器。地址绑定过程为,创建绑定操作结果,注册绑定请求到注册地址绑定请求队列,创建监听器Acceptor实例并执行。Acceptor主要功能为,地址绑定,监听连接请求,解绑地址,实际工作逻辑为:如果监听器AbstractPollingIoAcceptor已经初始化,首先根据地址绑定队列中的绑定请求,打开一个ServerSocketChannle,注册接收事件OP_ACCEPT到选择器,并将绑定地址与ServerSocketChannle映射管理添加到地址绑定映射集合;执行选择操作,如果实际绑定地址为空,则置空acceptorRef;如果接收连接事件发生,则处理连接请求,遍历接收事件就绪的ServerSocketChannel,ServerSocketChannel创建一个关联processor的会话,初始化会话,添加会话到会话关联io处理器;检查是否有地址解绑请求,如果有解绑请求CancellationRequest,从绑定socket地址与ServerSocketChannle映射map中,移除绑定的socket地址,关闭ServerSocketChannle;最后检查监听器是否正在关闭,如果acceptor正在关闭,则关闭关联processor。Acceptor和AbstractPollingIoAcceptor的关系,与AbstractPollingIoProcessor和Processor的关系很像。地址解绑过程,首先根据解绑地址创建AcceptorOperationFuture,添加到解绑队列,启动Acceptor线程,完成实际解绑工作。
AbstractPollingIoAcceptor所有的工作(地址绑定,接收连接,创建会话,添加会话到IO处理器,解绑地址,释放监听器资源)都是在Acceptor线程里完成。
附:
//SimpleIoProcessorPool,简单的IO处理器线程池,这一步,可以做个了解。
简单的IO处理器线程池SimpleIoProcessorPool,将一个会话的相关事件在多个Io处理器执行。当前的transport内部实现用SimpleIoProcessorPool,在多处理器环境下,具有良好的性能,因此不需要直接使用SimpleIoProcessorPool,除非在同个虚拟机中运行多个IoService。
Io处理器线程池,主要是创建size个IO处理器线程处理会话和过滤器相关事件。如果在同一虚拟机中运行多个Io服务,你需要共享处理器线程池。为了达到这个效果,你需要构造
SimpleIoProcessorPool实例,在创建IO服务时,作为参数传入。会话与IO处理器线程池,主要通过会话的处理器属性关联,获取会话关联的处理器,如果会话属性没有存储关联处理器,则从处理器线程池中获取一个,添加会话属性中。SimpleIoProcessorPool的其他方法发送写请求,刷新会话,添加移除会话等操作都是通过会话关联的处理器线程池中的处理器。简单的说,SimpleIoProcessorPool使用与再同一个虚拟机下启动多个IO服务的情况,简单处理器线程池,就是一个处理器集合,添加会话时,从会话属性中,获会话关联的处理器,如果会话属性没有存储关联处理器,则从处理器线程池中获取一个,添加会话属性中。
/** * An {@link IoProcessor} pool that distributes {@link IoSession}s into one or more * {@link IoProcessor}s. Most current transport implementations use this pool internally * to perform better in a multi-core environment, and therefore, you won't need to * use this pool directly unless you are running multiple {@link IoService}s in the * same JVM. 简单的IO处理器线程池SimpleIoProcessorPool,将一个会话的相关事件在多个Io处理器执行。 当前的transport内部实现用SimpleIoProcessorPool,在多处理器环境下,具有良好的性能, 因此不需要直接使用SimpleIoProcessorPool,除非在同个虚拟机中运行多个IoService。 * <p> * If you are running multiple {@link IoService}s, you could want to share the pool * among all services. To do so, you can create a new {@link SimpleIoProcessorPool} * instance by yourself and provide the pool as a constructor parameter when you * create the services. 如果在同一虚拟机中运行多个Io服务,你需要共享处理器线程池。为了达到这个效果,你需要构造 SimpleIoProcessorPool实例,在创建IO服务时,作为参数传入。 * <p> * This pool uses Java reflection API to create multiple {@link IoProcessor} instances. * It tries to instantiate the processor in the following order: Io处理器线程池默认通过反射创建io处理器实例,有一下三中形式 * [list=1] * [*]A public constructor with one {@link ExecutorService} parameter. * [*]A public constructor with one {@link Executor} parameter. * [*]A public default constructor * [/list] * The following is an example for the NIO socket transport: * <pre><code> 下面是创建TCP服务端的一个实例 * // Create a shared pool. * SimpleIoProcessorPool<NioSession> pool = * new SimpleIoProcessorPool<NioSession>(NioProcessor.class, 16); * * // Create two services that share the same pool. * SocketAcceptor acceptor = new NioSocketAcceptor(pool); * SocketConnector connector = new NioSocketConnector(pool); * * ... * * // Release related resources. * connector.dispose(); * acceptor.dispose(); * pool.dispose(); * </code></pre> * * @author [url=http://mina.apache.org]Apache MINA Project[/url] * * @param <S> the type of the {@link IoSession} to be managed by the specified * {@link IoProcessor}. */ public class SimpleIoProcessorPool<S extends AbstractIoSession> implements IoProcessor<S> { /** A logger for this class */ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleIoProcessorPool.class); /** The default pool size, when no size is provided.默认处理器实例数为运行时环境可以利用处理器数量+1 */ private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1; /** A key used to store the processor pool in the session's Attributes */ private static final AttributeKey PROCESSOR = new AttributeKey(SimpleIoProcessorPool.class, "processor"); /** The pool table */ private final IoProcessor<S>[] pool;//io处理器集会话的相关事件。 /** The contained which is passed to the IoProcessor when they are created */ private final Executor executor;//io异步事件执行器 /** A flag set to true if we had to create an executor */ private final boolean createdExecutor; /** A lock to protect the disposal against concurrent calls */ private final Object disposalLock = new Object(); /** A flg set to true if the IoProcessor in the pool are being disposed */ private volatile boolean disposing; /** A flag set to true if all the IoProcessor contained in the pool have been disposed */ private volatile boolean disposed; /** * Creates a new instance of SimpleIoProcessorPool with a default * size of NbCPUs +1. * * @param processorType The type of IoProcessor to use */ public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) { this(processorType, null, DEFAULT_SIZE, null); } /** * Creates a new instance of SimpleIoProcessorPool with a defined * number of IoProcessors in the pool * * @param processorType The type of IoProcessor to use * @param size The number of IoProcessor in the pool */ public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size) { this(processorType, null, size, null); } /** * Creates a new instance of SimpleIoProcessorPool with a defined * number of IoProcessors in the pool * * @param processorType The type of IoProcessor to use * @param size The number of IoProcessor in the pool * @param selectorProvider The SelectorProvider to use */ public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, int size, SelectorProvider selectorProvider) { this(processorType, null, size, selectorProvider); } /** * Creates a new instance of SimpleIoProcessorPool with an executor * * @param processorType The type of IoProcessor to use * @param executor The {@link Executor} */ public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor) { this(processorType, executor, DEFAULT_SIZE, null); } /** * Creates a new instance of SimpleIoProcessorPool with an executor * * @param processorType The type of IoProcessor to use * @param executor The {@link Executor} * @param size The number of IoProcessor in the pool * @param selectorProvider The SelectorProvider to used */ @SuppressWarnings("unchecked") public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, SelectorProvider selectorProvider) { if (processorType == null) { throw new IllegalArgumentException("processorType"); } if (size <= 0) { throw new IllegalArgumentException("size: " + size + " (expected: positive integer)"); } // Create the executor if none is provided createdExecutor = executor == null; if (createdExecutor) { this.executor = Executors.newCachedThreadPool(); // Set a default reject handler ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } else { this.executor = executor; } pool = new IoProcessor[size];//初始化IO处理器集 boolean success = false; Constructor<? extends IoProcessor<S>> processorConstructor = null; boolean usesExecutorArg = true; try { // We create at least one processor try { try { //创建处理器 processorConstructor = processorType.getConstructor(ExecutorService.class); pool[0] = processorConstructor.newInstance(this.executor); } catch (NoSuchMethodException e1) { // To the next step... try { //如果处理器没有相关构造方法,重新构造Io处理器 if(selectorProvider==null) { processorConstructor = processorType.getConstructor(Executor.class); pool[0] = processorConstructor.newInstance(this.executor); } else { processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class); pool[0] = processorConstructor.newInstance(this.executor,selectorProvider); } } catch (NoSuchMethodException e2) { // To the next step... try { processorConstructor = processorType.getConstructor(); usesExecutorArg = false; pool[0] = processorConstructor.newInstance(); } catch (NoSuchMethodException e3) { // To the next step... } } } } catch (RuntimeException re) { LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage()); throw re; } catch (Exception e) { String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage(); LOGGER.error(msg, e); throw new RuntimeIoException(msg, e); } if (processorConstructor == null) { // Raise an exception if no proper constructor is found. String msg = String.valueOf(processorType) + " must have a public constructor with one " + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one " + Executor.class.getSimpleName() + " parameter or a public default constructor."; LOGGER.error(msg); throw new IllegalArgumentException(msg); } // Constructor found now use it for all subsequent instantiations for (int i = 1; i < pool.length; i++) { try { if (usesExecutorArg) { if(selectorProvider==null) { pool[i] = processorConstructor.newInstance(this.executor); } else { pool[i] = processorConstructor.newInstance(this.executor, selectorProvider); } } else { pool[i] = processorConstructor.newInstance(); } } catch (Exception e) { // Won't happen because it has been done previously } } success = true; } finally { if (!success) { dispose(); } } } //从上来看,构造Io处理器线程池,主要是创建size个IO处理器线程处理会话和过滤器相关事件 /** * {@inheritDoc} */ @Override public final void add(S session) { getProcessor(session).add(session); } /** * Find the processor associated to a session. If it hasen't be stored into * the session's attributes, pick a new processor and stores it. 获取会话关联的处理器,如果会话属性没有存储关联处理器,则从处理器线程池中获取一个,添加 会话属性中。 */ @SuppressWarnings("unchecked") private IoProcessor<S> getProcessor(S session) { IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR); if (processor == null) { if (disposed || disposing) { throw new IllegalStateException("A disposed processor cannot be accessed."); } processor = pool[Math.abs((int) session.getId()) % pool.length]; if (processor == null) { throw new IllegalStateException("A disposed processor cannot be accessed."); } session.setAttributeIfAbsent(PROCESSOR, processor); } return processor; } /** * {@inheritDoc} */ @Override public final void flush(S session) { getProcessor(session).flush(session); } /** * {@inheritDoc} */ @Override public final void write(S session, WriteRequest writeRequest) { getProcessor(session).write(session, writeRequest); } /** * {@inheritDoc} */ @Override public final void remove(S session) { getProcessor(session).remove(session); } /** * {@inheritDoc} */ @Override public final void updateTrafficControl(S session) { getProcessor(session).updateTrafficControl(session); } /** * {@inheritDoc} */ @Override public boolean isDisposed() { return disposed; } /** * {@inheritDoc} */ @Override public boolean isDisposing() { return disposing; } /** * {@inheritDoc} */ @Override public final void dispose() { if (disposed) { return; } synchronized (disposalLock) { if (!disposing) { disposing = true; //遍历处理器集,释放处理器资源 for (IoProcessor<S> ioProcessor : pool) { if (ioProcessor == null) { // Special case if the pool has not been initialized properly continue; } if (ioProcessor.isDisposing()) { continue; } try { ioProcessor.dispose(); } catch (Exception e) { LOGGER.warn("Failed to dispose the {} IoProcessor.", ioProcessor.getClass().getSimpleName(), e); } } if (createdExecutor) { ((ExecutorService) executor).shutdown(); } } Arrays.fill(pool, null); disposed = true; } } }
发表评论
-
Mina 报文连接器(NioDatagramConnector)
2017-06-14 08:46 1417Mina 抽象Polling连接器(A ... -
Mina 报文监听器NioDatagramAcceptor二(发送会话消息等)
2017-06-13 16:01 1541Mina 报文监听器NioDatagramAcceptor一( ... -
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器)
2017-06-13 09:51 2575Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina 报文通信简单示例
2017-06-12 09:01 2585MINA TCP简单通信实例:http://donald-dr ... -
Mina socket连接器(NioSocketConnector)
2017-06-12 08:37 4772Mina 抽象Polling连接器(AbstractPolli ... -
Mina 抽象Polling连接器(AbstractPollingIoConnector)
2017-06-11 21:29 1008Mina 连接器接口定义及抽象实现(IoConnector ) ... -
Mina 连接器接口定义及抽象实现(IoConnector )
2017-06-11 13:46 1831Mina IoService接口定义及抽象实现:http:// ... -
Mina socket监听器(NioSocketAcceptor)
2017-06-09 08:44 3418Mina IoService接口定义及抽象实现:http:// ... -
Mina Io监听器接口定义及抽象实现
2017-06-07 13:02 1349Mina IoService接口定义及抽象实现:http:// ... -
Mina IoService接口定义及抽象实现
2017-06-06 23:44 1195Mina IoHandler接口定义:http://donal ... -
Mina Nio会话(Socket,DataGram)
2017-06-06 12:53 1209Mina Socket会话配置:http://donald-d ... -
Mina 抽象Io会话
2017-06-05 22:45 1014Mina Io会话接口定义:http://donald-dra ... -
Mina Io会话接口定义
2017-06-04 23:15 1166Mina Nio处理器:http://donald-drape ... -
Mina Nio处理器
2017-06-04 22:19 742Mina Io处理器抽象实现:http://donald-dr ... -
Mina Io处理器抽象实现
2017-06-03 23:52 1148Mina 过滤链抽象实现:http://donald-drap ... -
Mina IoHandler接口定义
2017-06-01 21:30 1734Mina 过滤链抽象实现:http://donald-drap ... -
MINA 多路复用协议编解码器工厂二(多路复用协议解码器)
2017-06-01 12:52 2273MINA 多路复用协议编解码器工厂一(多路复用协议编码器): ... -
MINA 多路复用协议编解码器工厂一(多路复用协议编码器)
2017-05-31 22:22 1868MINA 多路分离解码器实例:http://donald-dr ... -
Mina 累计协议解码器
2017-05-31 00:09 1229MINA 编解码器实例:http://donald-drape ... -
Mina 协议编解码过滤器三(会话write与消息接收过滤)
2017-05-28 07:22 1750Mina 协议编解码过滤器一(协议编解码工厂、协议编码器): ...
相关推荐
6. **Transport Layer**:Mina支持多种传输层实现,如TCP、UDP等,这些都抽象为IoAcceptor和IoConnector接口,方便开发者使用。 深入研究源码,你可能会关注以下方面: - **mina-core**模块:这是Mina的核心库,...
首先,我们需要启动`mina_server`,这通常是通过运行包含服务器端逻辑的主类实现的,服务器端会监听指定的端口,等待客户端连接。服务器启动后,它会使用预定义的编解码器来处理接收到的数据。 接着,我们启动`mina...
在Mina与Socket通信的实现中,服务端通常使用Mina来创建一个Acceptor,监听特定端口,等待客户端的连接请求。一旦有连接建立,Mina会自动触发相应的事件处理器,开发者可以在其中处理数据读写。以下是一个基本的...
Mina提供了一种事件驱动的模型,通过IoSession接口来管理连接,包括读写数据、添加监听器、关闭连接等操作。IoSession是连接状态的容器,包含了会话中的所有信息,如远程地址、本地地址、缓冲区大小、已发送和接收的...
Apache Mina是一个开源的网络通信应用框架,主要应用于Java平台,它为高性能、高可用性的网络应用程序提供了基础架构。在本文中,我们将深入探讨Mina的高级使用,特别是在文件图片传送、文件发送、XML和JSON报文处理...
5. **事件处理**:Mina通过监听器接口(如IoAdapter、IoHandler等)来处理网络事件,如连接建立、数据接收、连接关闭等。你可以实现这些接口并覆盖相关方法来响应特定事件。 6. **服务启动和停止**:在Mina中,服务...
2. **事件驱动**:MINA采用事件驱动的设计模式,通过监听器接口,开发者可以方便地处理各种网络事件,如连接建立、数据读写、连接关闭等。 3. **协议无关性**:MINA不局限于特定的网络协议,而是提供了一套抽象层,...
5. **事件监听**:在IoSession上设置事件监听器,以便在特定事件发生时执行相应操作,例如在按钮点击事件中触发消息发送。 6. **发送数据**:通过IoSession的write()方法发送数据。在这个例子中,可能是点击按钮后...
学习MINA API,开发者可以掌握异步网络编程的核心技巧,理解MINA如何通过事件驱动和过滤器链机制来提高网络应用的效率。同时,MINA支持多种协议,如TCP、UDP、SSL/TLS等,使得它在各种网络通信场景下都具有广泛的...
MINA 提供了一种抽象层,使得开发者可以专注于应用逻辑,而不是底层网络编程的复杂性。在这个“mina 实现简单通讯”的项目中,我们看到了一个基于MINA的基本通信实现,涵盖了服务端和客户端的交互。 首先,MINA的...
3. **事件驱动**:MINA采用事件驱动的设计,通过事件监听器机制来处理网络事件,如连接建立、数据接收、断开连接等,使得代码更加简洁和易于维护。 4. **过滤器链**:MINA的过滤器链机制允许开发者插入自定义的过滤...
当启动应用时,Mina服务器将监听指定的端口,等待客户端的连接,并通过自定义解码器解析接收到的消息。这样,你就可以在SpringBoot的环境中轻松构建网络通信服务了。 在实际开发中,你可能还需要考虑异常处理、心跳...
此外,它可能还包含了支持不同协议(如TCP,UDP)的处理器,以及各种I/O事件的监听器。 在压缩包子文件的文件名称列表中,只有一个"mina",这可能是压缩文件内的目录名,或者是压缩文件本身的名称。如果它是目录名...
2. **事件驱动**:Mina通过事件驱动模型处理网络通信,当有新的连接、数据读取或写入时,会触发相应的事件,开发者可以注册监听器来响应这些事件。 3. **协议独立**:Mina提供了抽象层,使得开发者可以专注于业务...
MINA 提供了一种抽象层,允许开发者远离底层的Socket编程,专注于业务逻辑,从而简化了网络编程。在这个“mina源代码学习”资料中,你将有机会深入理解MINA的核心原理和实现机制。 MINA的核心特性包括: 1. **非...
2. **事件驱动**:MINA基于事件驱动的设计,通过监听器接口实现对网络事件的响应,如连接建立、数据读写、连接关闭等,使得代码更简洁,易于维护。 3. **协议无关**:MINA提供了一层抽象,使得开发者可以方便地支持...
当这些事件发生时,MINA会触发预先注册的处理器或监听器,开发者可以在这些处理器中编写相应逻辑。 2. **非阻塞I/O**:非阻塞I/O允许一个线程处理多个连接,因为它不会因为等待数据而被阻塞。这提高了服务器的并发...
2. **事件驱动**:MINA基于事件驱动的设计,通过监听网络事件(如连接建立、数据到达等)来触发相应的处理逻辑,简化了复杂网络应用的编程。 3. **异步通信**:MINA的异步通信模式意味着发送请求后无需等待响应,...
Mina提供了高度抽象的API,使得开发者可以专注于业务逻辑,而无需关注底层I/O细节。 2. **服务端与客户端** 在这个demo中,服务端和客户端都是通过控制台交互的。服务端负责监听特定端口,接收客户端的连接请求,...
这个监听器应具备处理两种不同数据类型的能力,可能需要使用Mina的Session对象和Filter链。 6. **协议设计**:设计一个自定义协议来区分文件和文本数据。这可能包括特定的起始和结束标记,或者预定义的命令结构。 ...