IoFilterChain位于通讯层与业务层之间,负责将byte[]转化成业务层需要的业务逻辑bean,在mina框架中起着承前启后的作用。
DefaultIoFilterChain的构建
在初始话的时候,DefaultIoFilterChain的构造函数如下:
public DefaultIoFilterChain(AbstractIoSession session) {
if (session == null) {
throw new NullPointerException("session");
}
this.session = session;
head = new EntryImpl(null, null, "head", new HeadFilter());
tail = new EntryImpl(head, null, "tail", new TailFilter());
head.nextEntry = tail;
}
整体结构图:
IoFilterChain初始化的典型代码
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.setHandler( new TimeServerHandler() );
初始化以后的结构图为
Message received流程
1. NioProcessor的工作者线程一旦发现有数据来则分别处理各个Session上的数据;
public void run() {
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
for (;;) {
try {
boolean selected = select(1000);
nSessions += add();
updateTrafficMask();
if (selected) {
process();
}
//以下省略
private void process() throws Exception {
for (Iterator<T> i = selectedSessions(); i.hasNext();) {
process(i.next());
i.remove();
}
}
private void process(T session) {
if (isReadable(session) && session.getTrafficMask().isReadable()) {
read(session);
}
if (isWritable(session) && session.getTrafficMask().isWritable()) {
scheduleFlush(session);
}
}
2. 读取session上的所有数据,传给session的IoFilterChain
private void read(T session) {
//读取socket的数据,具体省略
if (readBytes > 0) {
session.getFilterChain().fireMessageReceived(buf);
buf = null;
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
if (ret < 0) {
scheduleRemove(session);
}
} catch (Throwable e) {
if (e instanceof IOException) {
scheduleRemove(session);
}
session.getFilterChain().fireExceptionCaught(e);
}
}
3. DefaultIoFilterChain调用HeadFilter的
public void fireMessageReceived(Object message) {
if (message instanceof IoBuffer) {
session.increaseReadBytes(
((IoBuffer) message).remaining(),
System.currentTimeMillis());
}
Entry head = this.head;
callNextMessageReceived(head, session, message);
}
private void callNextMessageReceived(
Entry entry, IoSession session, Object message) {
try {
entry.getFilter().messageReceived(
entry.getNextFilter(), session, message);
} catch (Throwable e) {
fireExceptionCaught(e);
}
}
4. 如上图head.getFilter()得到的是HeadFilter,注意EntryImpl的构造函数
public DefaultIoFilterChain(AbstractIoSession session) {
if (session == null) {
throw new NullPointerException("session");
}
this.session = session;
head = new EntryImpl(null, null, "head", new HeadFilter());
tail = new EntryImpl(head, null, "tail", new TailFilter());
head.nextEntry = tail;
}
private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry,
String name, IoFilter filter) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (name == null) {
throw new NullPointerException("name");
}
this.prevEntry = prevEntry;
this.nextEntry = nextEntry;
this.name = name;
this.filter = filter;
this.nextFilter = new NextFilter() {
public void sessionCreated(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionCreated(nextEntry, session);
}
public void sessionOpened(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionOpened(nextEntry, session);
}
public void sessionClosed(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionClosed(nextEntry, session);
}
public void sessionIdle(IoSession session, IdleStatus status) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionIdle(nextEntry, session, status);
}
public void exceptionCaught(IoSession session, Throwable cause) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextExceptionCaught(nextEntry, session, cause);
}
public void messageReceived(IoSession session, Object message) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextMessageReceived(nextEntry, session, message);
}
public void messageSent(IoSession session,
WriteRequest writeRequest) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextMessageSent(nextEntry, session, writeRequest);
}
public void filterWrite(IoSession session,
WriteRequest writeRequest) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterWrite(nextEntry, session, writeRequest);
}
public void filterClose(IoSession session) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterClose(nextEntry, session);
}
public void filterSetTrafficMask(IoSession session,
TrafficMask trafficMask) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterSetTrafficMask(nextEntry, session, trafficMask);
}
};
}
5. HeadFilter除了调用LogingFilter.MessageRecevied什么都不做
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) {
nextFilter.messageReceived(session, message);
}
6. LoggingFilter和ProtocolCodecFilter都是在执行自己的逻辑之后调用下一个IoFilter
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
log(IoEventType.MESSAGE_RECEIVED, "RECEIVED: {}", message);
nextFilter.messageReceived(session, message);
}
7. 在TailFilter的MessageReceved则将消息转发给Session的handler,完成传递
@Override
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
if (!(message instanceof IoBuffer)) {
s.increaseReadMessages(System.currentTimeMillis());
} else if (!((IoBuffer) message).hasRemaining()) {
s.increaseReadMessages(System.currentTimeMillis());
}
try {
session.getHandler().messageReceived(s, message);
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerReadFuture(message);
}
}
}
IoFilter事件执行
IoFilter一共处理10种事件
1 created
2 opened
3 closed
4 sessionIdle
5 exceptionCaught
6 messageReceived
7 messageSent
8 fireWrite
9 fireFilterClose
10 filterSetTrafficMask
其中1-7种事件都是从前往后执行,依次为:
HeadFilter->filter1->filter2->...->filterN->TailFilter->IoHandler
而8-10正好相反,如fireWrite的执行顺序为:
iosession.write->TailFilter->filterN->....-->filter1->HeadFilter
- 大小: 116.1 KB
- 大小: 86 KB
- 大小: 96 KB
分享到:
相关推荐
在"MINA长连接框架实现通讯"的场景中,我们要讨论的核心知识点包括以下几个部分: 1. **MINA框架基础**:MINA是一个基于NIO(Non-blocking I/O)的框架,它提供了一种处理大量并发连接的方式,通过非阻塞I/O模型,...
在本篇博文中,我们将深入探讨如何利用Apache MINA库实现基于TLS/SSL的NIO(非阻塞I/O)Socket通信。MINA是一个高度可扩展的网络应用框架,广泛用于构建高性能、高并发的网络应用程序,如服务器端的TCP和UDP服务。...
### Mina框架研究与实现 #### 引言 在当今高度网络化的世界中,服务器端程序面临着前所未有的挑战,特别是当需要同时处理成百上千的客户端连接时。这不仅要求服务器具备高性能,还必须保证高可用性。Mina框架正是...
在这个“mina 实现简单通讯”的项目中,我们看到了一个基于MINA的基本通信实现,涵盖了服务端和客户端的交互。 首先,MINA的核心组件包括`IoSession`,它是网络连接的抽象,包含了与特定连接相关的所有信息,如输入...
**Mina自定义协议简单实现** Apache Mina(Minimum Asynchronous Network)是一个开源的网络通信框架,它为Java开发者提供了一种高效、灵活且可扩展的框架,用于构建高性能的网络应用程序,如服务器和客户端应用。...
基于Mina的网络通讯,分为服务端和客户端。 研究selector NIO实现时,发现了这个架构。...Mina的底层实现实际就是selector和SocketChannel。所以如果对Mina源码感兴趣的可以先去看下selector相关的例子。
本篇文章将深入探讨如何使用Mina与Socket实现通信,并提供客户端和服务端的实现代码概述。 Mina(全称“MINA: Minimalistic Application Networking API”)是Apache软件基金会的一个开源项目,它为开发者提供了一...
在这个实例中,我们将探讨如何使用Mina实现长连接和短连接。 首先,理解长连接和短连接的概念至关重要。在TCP/IP通信中,短连接(Short Connection)是指一次数据传输完成后立即关闭连接,而长连接(Long ...
Apache MINA 2 用户指南 Apache MINA 2 是一个基于 Java 语言的网络应用框架,旨在帮助开发者快速构建高性能、可靠、可扩展的网络应用程序。该框架提供了一个灵活的架构,使得开发者可以轻松地构建各种类型的网络...
本资源包含两个 pdf 文档,一本根据官方最新文档 (http://mina.apache.org/mina-project/userguide/user-guide-toc.html) 整理的 mina_2.0_user_guide_en.pdf,一个中文翻译的 mina_2.0_user_guide_cn.pdf。...
《mina服务器和客户端实现详解》 Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的网络通信框架,它为开发者提供了构建高性能、高可用性的网络应用程序的基础。在本文中,我们将深入探讨如何利用...
在这个主题中,“mina服务器--实现纯文本和非纯文本的加密通讯”涉及了如何使用MINA来确保数据传输的安全性。 MINA提供了丰富的API和工具,使得开发者可以方便地创建基于TCP和UDP协议的服务器端和客户端应用。在...
6. **Transport Layer**:Mina支持多种传输层实现,如TCP、UDP等,这些都抽象为IoAcceptor和IoConnector接口,方便开发者使用。 深入研究源码,你可能会关注以下方面: - **mina-core**模块:这是Mina的核心库,...
Mina的ProtocolDecoder和ProtocolEncoder接口是实现这一过程的关键。 5. **Mina发送XML和JSON** XML和JSON作为常见的数据交换格式,经常在分布式系统中使用。Mina可以通过集成如JAXB(Java Architecture for XML ...
**使用mina实现CMPP2.0服务端的关键点** 1. **连接管理**:使用Mina的Acceptor来监听特定端口,接受来自移动网关的连接请求。每个连接通常对应一个独立的工作线程,处理来自客户端的CMPP请求。 2. **会话建立**:...
本源码是《NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战》一文的服务端实现(MINA2版),详见:http://www.52im.net/thread-378-1-1.html
### Apache MINA 2.0 用户指南:基础知识 #### 基础概念介绍 Apache MINA 2.0 是一款高性能且易于使用的网络应用程序框架,它简化了开发人员在网络编程方面的负担,允许开发者专注于应用程序的核心功能,而不是底层...
这个压缩包包含了MINA API文档、自学手册以及开发指南,对于学习和理解MINA框架有极大的帮助。 首先,`MINA-2.0.0-API.chm` 文件是MINA 2.0版本的API帮助文档,它是以CHM(Compiled Help Manual)格式编译的Windows...
在这个场景中,我们将讨论如何使用MINA来实现一个简单的登录功能。 首先,理解MINA的基本工作原理至关重要。MINA的核心是IoSession对象,它代表了服务器和客户端之间的持久连接。当客户端连接到服务器时,MINA会...