NioProcessor是mina中的另一个核心部分,与NioSocketAcceptor类似,NioProcessor三个主要功能是:
1、接受一个NioSession
2、出来NioSession上的read、write等事件
3、关闭一个NioSession
与NioSocketAcceptor类似,NioProcessor的实现采用了template模式,以上功能整体流程在NioProcessor的父类AbstractPollingIoProcessor中基本完成了,NioSocketAcceptor只是针对Nio的情况完成实现。
创建NioProcessor
如上图,NioSocketAcceptor创建了SimpleIoProcessorPool,SimpleIoProcessorPool中默认存在cpu数+1个NioProcessor,并且这些NioProcessor的工作者线程共享一个线程池。
接受一个NioSession
与NioSocketAcceptor新增一个端口绑定类似,NioProcessor.addSession只是将NioSocketAcceptor新建的NioSession放入一个消息队列中,由工作者线程负责初始化该NioSession,在selector为该session注册OP_READ事件。
关闭一个NioSession
与接受一个NioSession类似,不再描述
NioSession的数据处理
为NioProcessor继承自AbstractPollingIoProcessor的工作者线程中完成主要功能
1. 在循环中,selector监听所有端口,注意在NioProcessor中的select超时时间为1秒,这意味着最多一秒钟的时候,NioProcessor.Worker线程唤醒一次。而在NioSocketAcceptor.Work.run中select是没有超时时间的。下面Worker线程两次唤醒之间简称为一个周期,易知一个周期的长度小于等于一秒。
public void run() {
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
for (;;) {
try {
boolean selected = select(1000);
nSessions += add();
updateTrafficMask();
if (selected) {
process();
}
long currentTime = System.currentTimeMillis();
flush(currentTime);
nSessions -= remove();
notifyIdleSessions(currentTime);
if (nSessions == 0) {
synchronized (lock) {
if (newSessions.isEmpty() && isSelectorEmpty()) {
worker = null;
break;
}
}
}
// Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
if (isDisposing()) {
for (Iterator<T> i = allSessions(); i.hasNext(); ) {
scheduleRemove(i.next());
}
wakeup();
}
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
try {
synchronized (disposalLock) {
if (isDisposing()) {
dispose0();
}
}
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
} finally {
disposalFuture.setValue(true);
}
}
2. 在addSession中从新增session队列newSeesion获取一个新增NioSession,并开始监控之。
private int add() {
int addedSessions = 0;
// Loop on the new sessions blocking queue, to count
// the number of sessions who has been created
for (;;) {
T session = newSessions.poll();
if (session == null) {
// We don't have anymore new sessions
break;
}
if (addNow(session)) {
// The new session has been added to the
addedSessions ++;
}
}
return addedSessions;
}
2.1 在addNow(T session)中完成了单个NioSession的初始化
private boolean addNow(T session) {
boolean registered = false;
boolean notified = false;
try {
init(session);
registered = true;
// Build the filter chain of this session.
session.getService().getFilterChainBuilder().buildFilterChain(
session.getFilterChain());
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
notified = true;
} catch (Throwable e) {
if (notified) {
// Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
scheduleRemove(session);
session.getFilterChain().fireExceptionCaught(e);
wakeup();
} else {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
destroy(session);
} catch (Exception e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
} finally {
registered = false;
}
}
}
return registered;
}
2.1.1 初始化session,这里是Template Method的又一个体现,因为不同的类型的session初始化实现不同, 在NioProcessor中包括:设置非堵塞模式.为该session注册OP_READ事件。
@Override
protected void init(NioSession session) throws Exception {
SelectableChannel ch = (SelectableChannel) session.getChannel();
ch.configureBlocking(false);
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
}
2.1.2 构建IoFilterChain,具体请参考
http://uniseraph.iteye.com/blog/228194
2.1.3 触发NioSession上的相关事件,依次为sessionCreated->sessionOpened ->
IoServiceListener的sessionCreated
3 修改sessio的traffic参数
从trafficControllingSessions获取需要修改session的traffic参数,具体与新增NioSession类似,不再详细描述。
4 接受处理socket数据并应答
关键内容来了,如果有NioSession的channel处理于OP_READ状态,则处理之
if (selected) {
process();
}
process方法对于所有发生了OP_READ或OP_WRITE的NioSession依次进行处理,注意虽然在NioSession初始化的时候只注册了OP_READ事件,但是在上一周期调用session.write方法的时候,上一周期的flush方法将会注册OP_WRITE方法。本周期发送的数据都是上一周期确定的。
private void process(T session) {
if (isReadable(session) && session.getTrafficMask().isReadable()) {
read(session);
}
if (isWritable(session) && session.getTrafficMask().isWritable()) {
scheduleFlush(session);
}
}
在read方法中读取socket上的数据,调用发IoFilter,逐层传递到IoHander(具体参考
http://uniseraph.iteye.com/blog/228194);如果读到-1,则增加一个关闭连接消息到队列中;如果发生异常,则异常调用IoFilter和IoHandler的fireExceptionCaught方法
private void read(T session) {
IoSessionConfig config = session.getConfig();
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
final boolean hasFragmentation =
session.getTransportMetadata().hasFragmentation();
try {
int readBytes = 0;
int ret;
try {
if (hasFragmentation) {
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
if (!buf.hasRemaining()) {
break;
}
}
} else {
ret = read(session, buf);
if (ret > 0) {
readBytes = ret;
}
}
} finally {
buf.flip();
}
if (readBytes > 0) {
session.getFilterChain().fireMessageReceived(buf);
buf = null;
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
if (ret < 0) {
scheduleRemove(session);
}
} catch (Throwable e) {
if (e instanceof IOException) {
scheduleRemove(session);
}
session.getFilterChain().fireExceptionCaught(e);
}
}
发送数据
6 关闭连接及其他
如果没有消息积累,也没有新创建的连接,则关闭线程池
- 大小: 9 KB
分享到:
相关推荐
Mina2与Netty4都使用了Reactor模式的线程模型,但具体实现细节上有所不同。 Mina2使用了一个IoAcceptor线程来监听客户端连接,对于每个监听的端口,它都会创建一个线程。一旦有新的连接,IoAcceptor会创建一个新的...
8. **NIOEventModelExamples**:NIO事件模型示例,涵盖了MINA的两种主要事件模型——“Proactor”(预读取器)和“Reactor”(反应器)模式。 9. **AsyncWriteExample**:异步写入示例,展示MINA如何处理大文件或大...
Mina采用Java NIO的Reactor实现,模拟Proactor模式。在某些系统中,如Windows,Proactor可以直接利用原生的异步I/O机制,如IO Completion Ports(IOCP),但在其他不支持原生异步I/O的系统上,可以通过Reactor模拟...
IoProcessor 是真正代表会话的实际 I/O 操作的接口,它对现有的 Reactor 模式架构的 Java NIO 框架继续做了一层封装。它的泛型参数指明了它能处理的会话类型。接口中最重要的几个方法,add 用于将指定会话加入到此 ...
`IoProcessor`是实际执行I/O操作的接口,它封装了Java NIO的Reactor模式。`add()`方法将会话添加到处理器,`flush()`强制刷新写请求队列,`remove()`则用于关闭和移出会话,`updateTrafficMask()`控制会话的读写行为...
`IoProcessor`是实际执行I/O操作的接口,它封装了Java NIO的Reactor模式。`add()`方法添加会话以进行处理,`flush()`强制清空写请求队列,`remove()`用于关闭和移除会话,`updateTrafficMask()`控制会话的读写权限。...
反应器设计模式(Reactor Pattern)是事件驱动架构中的一种反应式编程模式,通常用于高并发的场景中。它主要用于处理多个事件的异步分发。在反应器模式中,有一个或多个输入源(例如,连接、数据流)和相应的事件...
JetLinks 基于Java8,Spring Boot 2.x ,WebFlux,Netty,Vert.x,Reactor等开发, 是一个全响应式的企业级物联网平台。支持统一物模型管理,多种设备,多种厂家,统一管理。统一设备连接管理,多协议适配(TCP,MQTT,UDP,CoAP,...
实现案例**:Mina、Netty、Cindy等都是基于Reactor模式实现的高性能网络框架。 #### 六、构建高性能NIO框架的关键 **1. 减少数据拷贝** - **ByteBuffer的选择**:根据应用场景选择合适的`ByteBuffer`类型,如使用...
5. **事件驱动编程**:Java的NIO(非阻塞I/O)和Reactor模式常用于提高服务器处理效率,尤其是在高并发场景下。 6. **数据库存储**:聊天记录通常需要持久化存储,因此数据库管理(如MySQL、MongoDB等)是必需的,...
Dubbo 的异步通信机制是基于 Apache MINA 框架的 Reactor 模型通信框架,使用单一长连接和 NIO 异步通讯,适合小数据量大并发的服务调用。 下面是 Dubbo 的基本原理机制的详细说明: 客户端调用远程接口 1. ...