- 浏览: 984997 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器):http://donald-draper.iteye.com/blog/2379152
引言:
前面一篇文章我们看了报文监听器NioDatagramAcceptor的内部变量,构造和IO处理器相关的功能,先来回顾一下:
报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。
现在我们来看NioDatagramAcceptor的IoAcceptor和Io服务相关功能的实现:先贴出报文监听器NioDatagramAcceptor的内部变量声明,以便理解后面的内容,
回到上一篇文章启动监听器线程片段startupAcceptor
下面来看一下Acceptor的定义:
监听器线程有一下几点要关注:
1.
2.
3.
4.
5.
6.
我们分别来以上几点:
1.
来看打开通道方法:
从上面来看,处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,
通知service监听,服务已开启,触发fireServiceActivated事件;
再来看第二点:
2.
这一点有两点要关注
2.a
来看报文读处理的数据接收和会话创建
2.a.1
2.a.2
来看创建会话这一点
//根据Io处理器,报文通道及远端socket地址创建会话
默认会话管理器sessionRecycler,见附;
2.b
从上面可以看出,处理报文通道就绪续事件,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列;
再来看发送刷新队列的会话写请求:
3.
从上面可以看出处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话
4.
从上可以看出处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;
5.
6.
来看剩余的方法操作,很简单,不详解:
在下面这篇文章中,我们讲过报文过滤链,可以集合本文,在回到看看下面这篇文章
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
我们贴出上面这篇文章的报文过滤链的定义:
报文过滤链发送会话写请求,即添加会话写请求队列,待报文监听器NioDatagramAcceptor(监听器线程Acceptor)调度刷新(通过会话关联的报文通道发送消息字节序列)。
总结:
监听器线程Acceptor,首先执行超时选择操作;处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,通知service监听,服务已开启,触发fireServiceActivated事件; 如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程; 如果选择操作后,有报文通道的读写事件就绪,则遍历读写操作事件就绪的报文通道,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列; 处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话;处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;通知service管理的会话空闲;如何Io处理器正在关闭,则销毁报文监听器。
附:
来看一下默认会话管理器ExpiringSessionRecycler:
//过期Map-ExpiringMap
//对象过期监听器ExpirationListener
引言:
前面一篇文章我们看了报文监听器NioDatagramAcceptor的内部变量,构造和IO处理器相关的功能,先来回顾一下:
报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。
现在我们来看NioDatagramAcceptor的IoAcceptor和Io服务相关功能的实现:先贴出报文监听器NioDatagramAcceptor的内部变量声明,以便理解后面的内容,
/** * {@link IoAcceptor} for datagram transport (UDP/IP). * * @author [url=http://mina.apache.org]Apache MINA Project[/url] * @org.apache.xbean.XBean */ public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> { /** * A session recycler that is used to retrieve an existing session, unless it's too old. 默认过期会话回收器 **/ private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler(); /** * A timeout used for the select, as we need to get out to deal with idle * sessions 选择超时时间 */ private static final long SELECT_TIMEOUT = 1000L; /** A lock used to protect the selector to be waked up before it's created */ private final Semaphore lock = new Semaphore(1); /** A queue used to store the list of pending Binds 地址绑定请求*/ private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>(); //地址解绑请求队列 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>(); //刷新会话队列,IO处理器刷新操作会用到,暂存刷新操作的会话 private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>(); // socket地址与报文通道映射Map,绑定操作使socket地址与报文通道关联起来 private final Map<SocketAddress, DatagramChannel> boundHandles = Collections .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>()); //会话管理器,监控连接Service的会话,如果会话过期,关闭过期的会话 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER; private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); private volatile boolean selectable; /** The thread responsible of accepting incoming requests */ private Acceptor acceptor;//监听器线程 private long lastIdleCheckTime;//上次空闲检查时间 /** The Selector used by this acceptor 选择器*/ private volatile Selector selector; }
回到上一篇文章启动监听器线程片段startupAcceptor
/** * Starts the inner Acceptor thread. */ private void startupAcceptor() throws InterruptedException { if (!selectable) { //如果选择器初始化失败,则清空注册队列,取消队列及刷新会话队列 registerQueue.clear(); cancelQueue.clear(); flushingSessions.clear(); } lock.acquire(); if (acceptor == null) { //创建Acceptor线程实例,并执行 acceptor = new Acceptor(); executeWorker(acceptor); } else { lock.release(); } }
下面来看一下Acceptor的定义:
/** * This private class is used to accept incoming connection from * clients. It's an infinite loop, which can be stopped when all * the registered handles have been removed (unbound). 接收客户端的连接。主操作是一个无限循环,当所有绑定的地址的报文通道解绑时, 循环退出 */ private class Acceptor implements Runnable { @Override public void run() { int nHandles = 0; lastIdleCheckTime = System.currentTimeMillis(); // Release the lock lock.release(); while (selectable) { try { //超时选择 int selected = select(SELECT_TIMEOUT); //处理地址绑定请求 nHandles += registerHandles(); if (nHandles == 0) { try { //如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程 lock.acquire(); if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { acceptor = null; break; } } finally { lock.release(); } } if (selected > 0) { //处理读写操作时间就绪的会话 processReadySessions(selectedHandles()); } long currentTime = System.currentTimeMillis(); //发送刷新队列中的写请求 flushSessions(currentTime); //处理报文通道地址解绑请求 nHandles -= unregisterHandles(); //通知会话空闲 notifyIdleSessions(currentTime); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { } } } //如何Io处理器正在关闭,则销毁报文监听器 if (selectable && isDisposing()) { selectable = false; try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } } } }
监听器线程有一下几点要关注:
1.
//处理地址绑定请求 nHandles += registerHandles();
2.
if (selected > 0) { //处理读写操作时间就绪的会话 processReadySessions(selectedHandles()); }
3.
//发送刷新队列中的写请求 flushSessions(currentTime);
4.
//处理报文通道地址解绑请求 nHandles -= unregisterHandles();
5.
//通知会话空闲 notifyIdleSessions(currentTime);
6.
//如何Io处理器正在关闭,则销毁报文监听器 if (selectable && isDisposing()) { selectable = false; try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } }
我们分别来以上几点:
1.
//处理地址绑定请求 nHandles += registerHandles();
private int registerHandles() { for (;;) { //从注册队列,poll地址绑定请求 AcceptorOperationFuture req = registerQueue.poll(); if (req == null) { break; } Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>(); List<SocketAddress> localAddresses = req.getLocalAddresses(); try { //遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道 for (SocketAddress socketAddress : localAddresses) DatagramChannel handle = open(socketAddress); //添加socket地址与报文通道映射到集合newHandles newHandles.put(localAddress(handle), handle); } 添加socket地址与报文通道映射到boundHandles boundHandles.putAll(newHandles); //通知service监听,服务已开启,及触发fireServiceActivated事件 getListeners().fireServiceActivated(); //地址绑定结束 req.setDone(); return newHandles.size(); } catch (Exception e) { req.setException(e); } finally { // Roll back if failed to bind all addresses. //如果异常,则关闭报文通道 if (req.getException() != null) { for (DatagramChannel handle : newHandles.values()) { try { close(handle); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } wakeup(); } } } return 0; }
来看打开通道方法:
protected DatagramChannel open(SocketAddress localAddress) throws Exception { //打开一个报文通道 final DatagramChannel ch = DatagramChannel.open(); boolean success = false; try { //配置通道会话及阻塞模式 new NioDatagramSessionConfig(ch).setAll(getSessionConfig()); ch.configureBlocking(false); try { //绑定地址 ch.socket().bind(localAddress); } catch (IOException ioe) { // Add some info regarding the address we try to bind to the // message String newMessage = "Error while binding on " + localAddress + "\n" + "original message : " + ioe.getMessage(); Exception e = new IOException(newMessage); e.initCause(ioe.getCause()); // And close the channel ch.close(); throw e; } //注册报文通道读操作事件OP_READ到选择器selector ch.register(selector, SelectionKey.OP_READ); success = true; } finally { if (!success) { close(ch); } } return ch; }
从上面来看,处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,
通知service监听,服务已开启,触发fireServiceActivated事件;
再来看第二点:
2.
if (selected > 0) { //处理读写操作时间就绪的会话 processReadySessions(selectedHandles()); }
private void processReadySessions(Set<SelectionKey> handles) { Iterator<SelectionKey> iterator = handles.iterator(); //遍历读写操作事件就绪的报文通道 while (iterator.hasNext()) { //获取选择key,及报文通道 SelectionKey key = iterator.next(); DatagramChannel handle = (DatagramChannel) key.channel(); iterator.remove(); try { //执行读操作 if (key.isValid() && key.isReadable()) { readHandle(handle); } //执行写操作 if (key.isValid() && key.isWritable()) { for (IoSession session : getManagedSessions().values()) { scheduleFlush((NioSession) session); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } }
这一点有两点要关注
2.a
//执行读操作 if (key.isValid() && key.isReadable()) { readHandle(handle); }
private void readHandle(DatagramChannel handle) throws Exception { IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize()); //接收数据 SocketAddress remoteAddress = receive(handle, readBuf); if (remoteAddress != null) { //创建会话 IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle)); readBuf.flip(); //触发会话过滤链的消息接收事件fireMessageReceived session.getFilterChain().fireMessageReceived(readBuf); } }
来看报文读处理的数据接收和会话创建
2.a.1
//接收数据 protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception { return handle.receive(buffer.buf()); }
2.a.2
//创建会话 private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { //获取远端socket地址关联的报文通道 DatagramChannel handle = boundHandles.get(localAddress); if (handle == null) { throw new IllegalArgumentException("Unknown local address: " + localAddress); } IoSession session; synchronized (sessionRecycler) { //从会话管理器,获取远端socket地址会话,以便重用 session = sessionRecycler.recycle(remoteAddress); if (session != null) { return session; } // If a new session needs to be created. //创建会话 NioSession newSession = newSession(this, handle, remoteAddress); //将会话添加会话管理器,监控会话 getSessionRecycler().put(newSession); session = newSession; } //初始化会话 initSession(session, null, null); try { //构建会话过滤链 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); //通知Service监听器发生会话创建事件fireSessionCreated getListeners().fireSessionCreated(session); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } return session; }
来看创建会话这一点
//创建会话 NioSession newSession = newSession(this, handle, remoteAddress);
//根据Io处理器,报文通道及远端socket地址创建会话
protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle, SocketAddress remoteAddress) { //获取报文通道注册到选择器的选择key SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid())) { return null; } //创建报文会话 NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress); //设置会话选择key newSession.setSelectionKey(key); return newSession; }
默认会话管理器sessionRecycler,见附;
2.b
//执行写操作 if (key.isValid() && key.isWritable()) { //调度Service管理的会话 for (IoSession session : getManagedSessions().values()) { scheduleFlush((NioSession) session); } }
从上面可以看出,处理报文通道就绪续事件,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列;
再来看发送刷新队列的会话写请求:
3.
//发送刷新队列中的会话写请求 flushSessions(currentTime);
private void flushSessions(long currentTime) { for (;;) { //从刷新队列获取会话 NioSession session = flushingSessions.poll(); if (session == null) { break; } // Reset the Schedule for flush flag for this session, // as we are flushing it now //设置会话为未调度 session.unscheduledForFlush(); try { //刷新会话 boolean flushedAll = flush(session, currentTime); if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) { //如果刷新成功,但会话写请求队列不为空,且未调度,则重新调度会话 scheduleFlush(session); } } catch (Exception e) { session.getFilterChain().fireExceptionCaught(e); } } }
//发送会话写请求 private boolean flush(NioSession session, long currentTime) throws Exception { //获取会话写请求队列,会话最大读buffersize final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() + (session.getConfig().getMaxReadBufferSize() >>> 1); int writtenBytes = 0; try { for (;;) { //获取会话当前写请求 WriteRequest req = session.getCurrentWriteRequest(); if (req == null) { //从写请求队列poll一个写请求 req = writeRequestQueue.poll(session); if (req == null) { //设置会话不在关注写事件 setInterestedInWrite(session, false); break; } //设置会话当前写请求 session.setCurrentWriteRequest(req); } //获取写请求消息 IoBuffer buf = (IoBuffer) req.getMessage(); if (buf.remaining() == 0) { // Clear and fire event //置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent session.setCurrentWriteRequest(null); buf.reset(); session.getFilterChain().fireMessageSent(req); continue; } //获取写请求远端地址 SocketAddress destination = req.getDestination(); //如果写请求远端地址为null,则获取会话远端地址 if (destination == null) { destination = session.getRemoteAddress(); } //发送会话写请求字节序列 int localWrittenBytes = send(session, buf, destination); if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) { // Kernel buffer is full or wrote too much //如果数据太多或发送数据失败,设置会话关注写操作事件 setInterestedInWrite(session, true); return false; } else { //数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent setInterestedInWrite(session, false); // Clear and fire event session.setCurrentWriteRequest(null); writtenBytes += localWrittenBytes; buf.reset(); session.getFilterChain().fireMessageSent(req); } } } finally { //更新会话写字节计数器 session.increaseWrittenBytes(writtenBytes, currentTime); } return true; }
//委托会话关联的报文通道发送会话消息字节序列 protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception { return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress); }
从上面可以看出处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话
4.
//处理报文通道地址解绑请求 nHandles -= unregisterHandles();
private int unregisterHandles() { int nHandles = 0; for (;;) { //从取消队列,poll地址解绑请求 AcceptorOperationFuture request = cancelQueue.poll(); if (request == null) { break; } // close the channels //遍历地址解绑请求socket地址集合 for (SocketAddress socketAddress : request.getLocalAddresses()) { //从socket与报文通道映射集boundHandles移除socket地址 DatagramChannel handle = boundHandles.remove(socketAddress); if (handle == null) { continue; } try { //关闭报文通道 close(handle); //唤醒选择操作 wakeup(); // wake up again to trigger thread death } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { nHandles++; } } //解绑成功 request.setDone(); } return nHandles; }
//关闭通道 protected void close(DatagramChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); //取消选择key if (key != null) { key.cancel(); } //关闭连接及通道 handle.disconnect(); handle.close(); }
从上可以看出处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;
5.
//通知会话空闲 notifyIdleSessions(currentTime);
private void notifyIdleSessions(long currentTime) { // process idle sessions if (currentTime - lastIdleCheckTime >= 1000) { lastIdleCheckTime = currentTime; //通知service管理的会话空闲 AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime); } }
6.
//如何Io处理器正在关闭,则销毁报文监听器 if (selectable && isDisposing()) { selectable = false; try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } }
//关闭选择器 protected void destroy() throws Exception { if (selector != null) { selector.close(); } }
来看剩余的方法操作,很简单,不详解:
/** * {@inheritDoc} 创建会话 */ @Override public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { if (isDisposing()) { throw new IllegalStateException("The Acceptor is being disposed."); } if (remoteAddress == null) { throw new IllegalArgumentException("remoteAddress"); } synchronized (bindLock) { if (!isActive()) { throw new IllegalStateException("Can't create a session from a unbound service."); } try { //创建报文会话 return newSessionWithoutLock(remoteAddress, localAddress); } catch (RuntimeException | Error e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to create a session.", e); } } } /** * {@inheritDoc} 解绑地址 */ @Override protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception { AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); //添加地址解绑请求到取消队列 cancelQueue.add(request); startupAcceptor();//启动监听器线程 wakeup();//唤醒选择器 //等待解绑成功 request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } } /** * {@inheritDoc} 关闭IO处理器相关的资源 */ @Override protected void dispose0() throws Exception { unbind();//解绑地址 startupAcceptor();//启动监听器线程 wakeup(); } //选择操作 protected int select() throws Exception { return selector.select(); } protected int select(long timeout) throws Exception { return selector.select(timeout); } //上一次选择后,存在就绪事件的选择key protected Set<SelectionKey> selectedHandles() { return selector.selectedKeys(); } @Override public InetSocketAddress getDefaultLocalAddress() { return (InetSocketAddress) super.getDefaultLocalAddress(); } @Override public InetSocketAddress getLocalAddress() { return (InetSocketAddress) super.getLocalAddress(); } /** * {@inheritDoc} */ @Override public DatagramSessionConfig getSessionConfig() { return (DatagramSessionConfig) sessionConfig; } @Override public final IoSessionRecycler getSessionRecycler() { return sessionRecycler; } @Override public TransportMetadata getTransportMetadata() { return NioDatagramSession.METADATA; } protected boolean isReadable(DatagramChannel handle) { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid())) { return false; } return key.isReadable(); } protected boolean isWritable(DatagramChannel handle) { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid())) { return false; } return key.isWritable(); } @Override public void setDefaultLocalAddress(InetSocketAddress localAddress) { setDefaultLocalAddress((SocketAddress) localAddress); } @Override public final void setSessionRecycler(IoSessionRecycler sessionRecycler) { synchronized (bindLock) { if (isActive()) { throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound."); } if (sessionRecycler == null) { sessionRecycler = DEFAULT_RECYCLER; } this.sessionRecycler = sessionRecycler; } }
在下面这篇文章中,我们讲过报文过滤链,可以集合本文,在回到看看下面这篇文章
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
我们贴出上面这篇文章的报文过滤链的定义:
class DatagramFilterChain extends AbstractIoFilterChain { DatagramFilterChain(IoSession parent) { super(parent); } //会话发送写请求,及添加会话写请求队列,待报文监听器调度刷新,即通过会话关联的报文通道 //发送消息字节序列 protected void doWrite(IoSession session, WriteRequest writeRequest) { DatagramSessionImpl s = (DatagramSessionImpl) session; //获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲 Queue writeRequestQueue = s.getWriteRequestQueue(); // SocketIoProcessor.doFlush() will reset it after write is finished // because the buffer will be passed with messageSent event. //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件, //待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置 ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage(); buffer.mark(); int remaining = buffer.remaining(); if (remaining == 0) { //BaseIoSession // private final AtomicInteger scheduledWriteRequests = new AtomicInteger(); //更新调度请求计数器+1 s.increaseScheduledWriteRequests(); } else { //BaseIoSession //private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); //更新调度写字节计数器+buffer.remaining() s.increaseScheduledWriteBytes(buffer.remaining()); s.increaseScheduledWriteBytes(buffer.remaining()); } synchronized (writeRequestQueue) { //将写请求添加到session写请求队列中 writeRequestQueue.push(writeRequest); } if (session.getTrafficMask().isWritable()) { //DatagramSessionImpl //private final DatagramService managerDelegate; //如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作, //这个在以后在具体详说 s.getManagerDelegate().flushSession(s); } } protected void doClose(IoSession session) { DatagramSessionImpl s = (DatagramSessionImpl) session; DatagramService manager = s.getManagerDelegate(); ////委托给session关联的managerDelegate(DatagramService)关闭会话 if (manager instanceof DatagramConnectorDelegate) { //如果是DatagramConnectorDelegate者直接关闭会话,则在后面具体再看 ((DatagramConnectorDelegate) manager).closeSession(s); } else { //通知DatagramAcceptorDelegate的监听器会话已关闭 ((DatagramAcceptorDelegate) manager).getListeners() .fireSessionDestroyed(session); //设置会话CloseFuture为已关闭状态 session.getCloseFuture().setClosed(); } } }
报文过滤链发送会话写请求,即添加会话写请求队列,待报文监听器NioDatagramAcceptor(监听器线程Acceptor)调度刷新(通过会话关联的报文通道发送消息字节序列)。
总结:
监听器线程Acceptor,首先执行超时选择操作;处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,通知service监听,服务已开启,触发fireServiceActivated事件; 如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程; 如果选择操作后,有报文通道的读写事件就绪,则遍历读写操作事件就绪的报文通道,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列; 处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话;处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;通知service管理的会话空闲;如何Io处理器正在关闭,则销毁报文监听器。
附:
来看一下默认会话管理器ExpiringSessionRecycler:
/** * An {@link IoSessionRecycler} with sessions that time out on inactivity. * * @author [url=http://mina.apache.org]Apache MINA Project[/url] * @org.apache.xbean.XBean */ public class ExpiringSessionRecycler implements IoSessionRecycler { /** A map used to store the session 存储会话*/ private ExpiringMap<SocketAddress, IoSession> sessionMap; /** A map used to keep a track of the expiration ,监控会话是否过期线程*/ private ExpiringMap<SocketAddress, IoSession>.Expirer mapExpirer; /** * Create a new ExpiringSessionRecycler instance */ public ExpiringSessionRecycler() { this(ExpiringMap.DEFAULT_TIME_TO_LIVE); } /** * Create a new ExpiringSessionRecycler instance * * @param timeToLive The delay after which the session is going to be recycled */ public ExpiringSessionRecycler(int timeToLive) { this(timeToLive, ExpiringMap.DEFAULT_EXPIRATION_INTERVAL); } /** * Create a new ExpiringSessionRecycler instance * * @param timeToLive The delay after which the session is going to be recycled * @param expirationInterval The delay after which the expiration occurs */ public ExpiringSessionRecycler(int timeToLive, int expirationInterval) { sessionMap = new ExpiringMap<>(timeToLive, expirationInterval); mapExpirer = sessionMap.getExpirer(); //添加会话过期监听器 sessionMap.addExpirationListener(new DefaultExpirationListener()); } /** * {@inheritDoc} 添加会话 */ @Override public void put(IoSession session) { //如果检查线程没启动,启动检查线程,监控会话是否过期 mapExpirer.startExpiringIfNotStarted(); SocketAddress key = session.getRemoteAddress(); if (!sessionMap.containsKey(key)) { sessionMap.put(key, session); } } /** * {@inheritDoc} 获取远端socket地址对应的会话 */ @Override public IoSession recycle(SocketAddress remoteAddress) { return sessionMap.get(remoteAddress); } /** * {@inheritDoc} 移除会话 */ @Override public void remove(IoSession session) { sessionMap.remove(session.getRemoteAddress()); } /** * Stop the thread from monitoring the map 停止过期检查线程 */ public void stopExpiring() { mapExpirer.stopExpiring(); } //配置获取对象生存时间 /** * Update the value for the time-to-live * * @param timeToLive The time-to-live (seconds) */ public void setTimeToLive(int timeToLive) { sessionMap.setTimeToLive(timeToLive); } /** * @return The session time-to-live in second */ public int getTimeToLive() { return sessionMap.getTimeToLive(); } //配置获取过期检查间隔 /** * Set the interval in which a session will live in the map before it is removed. * * @param expirationInterval The session expiration time in seconds */ public void setExpirationInterval(int expirationInterval) { sessionMap.setExpirationInterval(expirationInterval); } /** * @return The session expiration time in second */ public int getExpirationInterval() { return sessionMap.getExpirationInterval(); } //默认过期监听器,即关闭会话 private class DefaultExpirationListener implements ExpirationListener<IoSession> { @Override public void expired(IoSession expiredSession) { expiredSession.closeNow(); } } }
//过期Map-ExpiringMap
** * A map with expiration. This class contains a worker thread that will * periodically check this class in order to determine if any objects * should be removed based on the provided time-to-live value. * 过期map包含一个线程,将间歇地检查监控集合delegate中的过期对象ExpiringObject的生存时间是否 大于timeToLive,大于则从监控集合delegate中移除过期元素对象ExpiringObject。 * @param <K> The key type * @param <V> The value type * * @author [url=http://mina.apache.org]Apache MINA Project[/url] */ public class ExpiringMap<K, V> implements Map<K, V> { /** The default value, 60 seconds */ public static final int DEFAULT_TIME_TO_LIVE = 60;//对象生存时间,默认60s /** The default value, 1 second */ public static final int DEFAULT_EXPIRATION_INTERVAL = 1;//默认检查间隔1s private static volatile int expirerCount = 1; private final ConcurrentHashMap<K, ExpiringObject> delegate;//检查线程expirer,监控的Map private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners; private final Expirer expirer;//过期Map元素检查线程 /** * Creates a new instance of ExpiringMap using the default values * DEFAULT_TIME_TO_LIVE and DEFAULT_EXPIRATION_INTERVAL * */ public ExpiringMap() { this(DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL); } /** * Creates a new instance of ExpiringMap using the supplied * time-to-live value and the default value for DEFAULT_EXPIRATION_INTERVAL * * @param timeToLive The time-to-live value (seconds) */ public ExpiringMap(int timeToLive) { this(timeToLive, DEFAULT_EXPIRATION_INTERVAL); } /** * Creates a new instance of ExpiringMap using the supplied values and * a {@link ConcurrentHashMap} for the internal data structure. * * @param timeToLive The time-to-live value (seconds) * @param expirationInterval The time between checks to see if a value should be removed (seconds) */ public ExpiringMap(int timeToLive, int expirationInterval) { this(new ConcurrentHashMap<K, ExpiringObject>(), new CopyOnWriteArrayList<ExpirationListener<V>>(), timeToLive, expirationInterval); } private ExpiringMap(ConcurrentHashMap<K, ExpiringObject> delegate, CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners, int timeToLive, int expirationInterval) { this.delegate = delegate;//需要过期检查的对象集合(报文会话) this.expirationListeners = expirationListeners;//过期监听器 this.expirer = new Expirer();//过期检查线程 expirer.setTimeToLive(timeToLive);//设置对象存活时间 expirer.setExpirationInterval(expirationInterval);//设置检查线程检查过期元素间隔 } //此处省略一些方法,主要是put,get,contain,remove等操作 ... //过期map元素 private class ExpiringObject { private K key; private V value; private long lastAccessTime;//上次访问时间 //可重入读写锁,保护lastAccessTime的读写操作 private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock(); ExpiringObject(K key, V value, long lastAccessTime) { if (value == null) { throw new IllegalArgumentException("An expiring object cannot be null."); } this.key = key; this.value = value; this.lastAccessTime = lastAccessTime; } public long getLastAccessTime() { lastAccessTimeLock.readLock().lock(); try { return lastAccessTime; } finally { lastAccessTimeLock.readLock().unlock(); } } public void setLastAccessTime(long lastAccessTime) { lastAccessTimeLock.writeLock().lock(); try { this.lastAccessTime = lastAccessTime; } finally { lastAccessTimeLock.writeLock().unlock(); } } public K getKey() { return key; } public V getValue() { return value; } @Override public boolean equals(Object obj) { return value.equals(obj); } @Override public int hashCode() { return value.hashCode(); } } /** * A Thread that monitors an {@link ExpiringMap} and will remove * elements that have passed the threshold. * */ public class Expirer implements Runnable { //状态锁 private final ReadWriteLock stateLock = new ReentrantReadWriteLock(); private long timeToLiveMillis;//保活时间 private long expirationIntervalMillis;//过期检查间隔时间 private boolean running = false; private final Thread expirerThread; /** * Creates a new instance of Expirer. * */ public Expirer() { expirerThread = new Thread(this, "ExpiringMapExpirer-" + expirerCount++); expirerThread.setDaemon(true); } /** * {@inheritDoc} */ @Override public void run() { while (running) { processExpires(); try { Thread.sleep(expirationIntervalMillis); } catch (InterruptedException e) { // Do nothing } } } private void processExpires() { long timeNow = System.currentTimeMillis(); //遍历代理Map中的过期元素ExpiringObject for (ExpiringObject o : delegate.values()) { if (timeToLiveMillis <= 0) { continue; } long timeIdle = timeNow - o.getLastAccessTime(); if (timeIdle >= timeToLiveMillis) { //如果过期,则从代理Map中移除对象 delegate.remove(o.getKey()); for (ExpirationListener<V> listener : expirationListeners) { //通知过期监听器,过期对象已移除 listener.expired(o.getValue()); } } } } /** * Kick off this thread which will look for old objects and remove them. *启动过期检查线程 */ public void startExpiring() { stateLock.writeLock().lock(); try { if (!running) { running = true; expirerThread.start(); } } finally { stateLock.writeLock().unlock(); } } /** * If this thread has not started, then start it. * Otherwise just return; 如果过期检查线程没有启动,则启动 */ public void startExpiringIfNotStarted() { stateLock.readLock().lock(); try { if (running) { return; } } finally { stateLock.readLock().unlock(); } stateLock.writeLock().lock(); try { if (!running) { running = true; expirerThread.start(); } } finally { stateLock.writeLock().unlock(); } } /** * Stop the thread from monitoring the map. 中断过期检查线程,监控过期Map */ public void stopExpiring() { stateLock.writeLock().lock(); try { if (running) { running = false; expirerThread.interrupt(); } } finally { stateLock.writeLock().unlock(); } } /** * Checks to see if the thread is running * * @return * If the thread is running, true. Otherwise false. */ public boolean isRunning() { stateLock.readLock().lock(); try { return running; } finally { stateLock.readLock().unlock(); } } //配置获取对象生存时间 /** * @return the Time-to-live value in seconds. */ public int getTimeToLive() { stateLock.readLock().lock(); try { return (int) timeToLiveMillis / 1000; } finally { stateLock.readLock().unlock(); } } /** * Update the value for the time-to-live * * @param timeToLive * The time-to-live (seconds) */ public void setTimeToLive(long timeToLive) { stateLock.writeLock().lock(); try { this.timeToLiveMillis = timeToLive * 1000; } finally { stateLock.writeLock().unlock(); } } //配置获取过期检查间隔 /** * Get the interval in which an object will live in the map before * it is removed. * * @return * The time in seconds. */ public int getExpirationInterval() { stateLock.readLock().lock(); try { return (int) expirationIntervalMillis / 1000; } finally { stateLock.readLock().unlock(); } } /** * Set the interval in which an object will live in the map before * it is removed. * * @param expirationInterval * The time in seconds */ public void setExpirationInterval(long expirationInterval) { stateLock.writeLock().lock(); try { this.expirationIntervalMillis = expirationInterval * 1000; } finally { stateLock.writeLock().unlock(); } } } }
//对象过期监听器ExpirationListener
public interface ExpirationListener { public abstract void expired(Object obj); }
发表评论
-
Mina 报文连接器(NioDatagramConnector)
2017-06-14 08:46 1431Mina 抽象Polling连接器(A ... -
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器)
2017-06-13 09:51 2589Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina 报文通信简单示例
2017-06-12 09:01 2601MINA TCP简单通信实例:http://donald-dr ... -
Mina socket连接器(NioSocketConnector)
2017-06-12 08:37 4793Mina 抽象Polling连接器(AbstractPolli ... -
Mina 抽象Polling连接器(AbstractPollingIoConnector)
2017-06-11 21:29 1019Mina 连接器接口定义及抽象实现(IoConnector ) ... -
Mina 连接器接口定义及抽象实现(IoConnector )
2017-06-11 13:46 1841Mina IoService接口定义及抽象实现:http:// ... -
Mina socket监听器(NioSocketAcceptor)
2017-06-09 08:44 3439Mina IoService接口定义及抽象实现:http:// ... -
Mina 抽象polling监听器
2017-06-08 22:32 791Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina Io监听器接口定义及抽象实现
2017-06-07 13:02 1362Mina IoService接口定义及抽象实现:http:// ... -
Mina IoService接口定义及抽象实现
2017-06-06 23:44 1206Mina IoHandler接口定义:http://donal ... -
Mina Nio会话(Socket,DataGram)
2017-06-06 12:53 1222Mina Socket会话配置:http://donald-d ... -
Mina 抽象Io会话
2017-06-05 22:45 1024Mina Io会话接口定义:http://donald-dra ... -
Mina Io会话接口定义
2017-06-04 23:15 1180Mina Nio处理器:http://donald-drape ... -
Mina Nio处理器
2017-06-04 22:19 751Mina Io处理器抽象实现:http://donald-dr ... -
Mina Io处理器抽象实现
2017-06-03 23:52 1158Mina 过滤链抽象实现:http://donald-drap ... -
Mina IoHandler接口定义
2017-06-01 21:30 1741Mina 过滤链抽象实现:http://donald-drap ... -
MINA 多路复用协议编解码器工厂二(多路复用协议解码器)
2017-06-01 12:52 2289MINA 多路复用协议编解码器工厂一(多路复用协议编码器): ... -
MINA 多路复用协议编解码器工厂一(多路复用协议编码器)
2017-05-31 22:22 1877MINA 多路分离解码器实例:http://donald-dr ... -
Mina 累计协议解码器
2017-05-31 00:09 1237MINA 编解码器实例:http://donald-drape ... -
Mina 协议编解码过滤器三(会话write与消息接收过滤)
2017-05-28 07:22 1761Mina 协议编解码过滤器一(协议编解码工厂、协议编码器): ...
相关推荐
在本文中,我们将深入探讨Mina的高级使用,特别是在文件图片传送、文件发送、XML和JSON报文处理方面的实践。 1. **Mina的高级使用** Mina的核心在于其异步事件驱动的模型,这使得它在处理大量并发连接时表现出色。...
在"mina学习基础-入门实例-传输定长报文(三)"这个主题中,我们将深入探讨如何使用Mina实现定长报文的传输,并且利用Mina内置的SSL过滤器进行报文加密。 首先,让我们了解什么是定长报文。在通信协议中,定长报文是...
总之,自定义Mina编码器以实现会话累积功能涉及到了网络通信的优化,需要对Mina框架有深入的理解,以及对序列化和网络通信协议的熟悉。通过这样的定制,我们可以更高效地利用网络资源,提升系统的整体性能。
Java Mina2是一个高度可扩展且高性能的网络通信框架,主要用在开发基于TCP、UDP等协议的服务端应用。它提供了简单而强大的API,使得开发者能够轻松构建网络应用程序,如服务器端的聊天室、游戏服务器或者任何需要...
基于Mina的网络通讯,分为服务端和客户端。 研究selector NIO实现时,发现了这个架构。 Mina的底层实现实际就是selector和SocketChannel。所以如果对Mina源码感兴趣的可以先去看下selector相关的例子。
本文将深入探讨如何使用Mina来实现一个服务器以及对应的Socket客户端,从而进行消息传递。 **Mina 框架介绍** Mina提供了一个高级抽象层,允许开发者用类似处理Java IO的方式处理NIO(非阻塞I/O)。它简化了网络...
- **添加编码器和解码器**:通过`FilterChainBuilder`创建过滤器链,添加自定义的编码器和解码器,确保数据在发送和接收时正确处理。 3. **创建Mina客户端** - **配置Connector**:使用`TcpClient`或`...
3. **过滤器链**:MINA的核心设计之一就是过滤器链,它允许开发者在数据发送和接收的过程中插入自定义的处理逻辑。在实现加密通信时,我们需要创建一个包含SSLFilter的过滤器链,这个过滤器负责处理SSL/TLS相关的...
2. **事件驱动**:MINA基于事件驱动的设计,通过监听网络事件(如连接建立、数据到达等)来触发相应的处理逻辑,简化了复杂网络应用的编程。 3. **异步通信**:MINA的异步通信模式意味着发送请求后无需等待响应,...
- IoSession:表示客户端和服务端之间的会话,包含了会话状态、缓冲区、过滤器链等信息。 - IoFilter:类似于Servlet中的Filter,处理数据的读写,提供诸如加密、压缩、协议解析等功能。 - IoHandler:处理网络事件...
### Apache MINA 2.0 用户指南:基础知识 #### 基础概念介绍 Apache MINA 2.0 是一款高性能且易于使用的网络...在未来的学习过程中,我们将更深入地探索 MINA 的高级特性,如会话管理、错误处理、安全性和扩展性等。
文件"MinaSocket"可能包含了实现上述功能的详细代码,包括服务端的Acceptor配置、过滤器设置、事件处理器编写,以及客户端的Socket连接、数据发送和接收等。通过阅读和理解这些代码,你可以更好地掌握Mina与Socket...
5. **事件监听**:在IoSession上设置事件监听器,以便在特定事件发生时执行相应操作,例如在按钮点击事件中触发消息发送。 6. **发送数据**:通过IoSession的write()方法发送数据。在这个例子中,可能是点击按钮后...
Mina提供了一种事件驱动的模型,通过IoSession接口来管理连接,包括读写数据、添加监听器、关闭连接等操作。IoSession是连接状态的容器,包含了会话中的所有信息,如远程地址、本地地址、缓冲区大小、已发送和接收的...
客户端同样使用了MINA的编解码器来编码要发送的数据,并在接收服务器响应时解码返回的数据。在这个过程中,编解码器的正确配置和使用是确保数据完整性和正确性的关键。 在实际开发中,我们可能会自定义编解码器来...
- 它基于IoSession接口,用于管理客户端与服务器之间的连接,包括读写操作、会话属性等。 2. **编解码器概念** - 编解码器是mina中的核心组件,分为编码器(Encoder)和解码器(Decoder)两部分,分别处理数据的...
在消息推送场景下,Mina通常扮演服务器端的角色,接收客户端的连接请求,建立长连接,然后通过这些连接向客户端发送实时消息。例如,在一个聊天应用中,服务器需要将新的消息即时推送给在线用户。Mina的Session对象...
`IoService` 的API包括添加和移除监听器、广播消息到所有会话、关闭服务、检查服务状态以及设置和获取过滤器链等。这些方法使得开发者能够灵活地控制和监控网络通信的过程。 总结来说,Mina通过其分层架构和丰富的...
这通常涉及到对CMPP协议报文结构的深入理解,包括报文头、消息ID、业务类型等字段。 4. **消息处理**:定义消息处理器,处理接收到的各种CMPP消息,如CMPP_SUBMIT(短信提交)、CMPP_DELIVER(短信接收)等,根据...
在本项目中,Mina将被用来创建一个服务器端,监听客户端的连接并处理接收到的数据。 整合Spring MVC和Mina的关键步骤如下: 1. **配置Spring MVC**:你需要创建一个Spring MVC配置文件(如`web.xml`和`spring-...