- 浏览: 194491 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
only_java:
博主,你好。感谢这篇你的这篇文章,我的问题是跟你一样,也是在跑 ...
JVM Crash分析 -
shuofenglxy:
1 确保程序运行时没有更新程序需要的相关jar包。2 确保程序 ...
JVM Crash分析 -
renduly:
# A fatal error has been detect ...
JVM Crash分析 -
shuofenglxy:
renduly 写道博主好。这两天我在公司程序也出现了类似的问 ...
JVM Crash分析 -
renduly:
博主好。这两天我在公司程序也出现了类似的问题。博主能否说的详细 ...
JVM Crash分析
IoFilterChain主要是维护一个这样的容器,这个容器维护一个IoSesion和相对应的IoFilterChain(1对1关系),都是串行执行的。
IOFilterChain中主要有事件触发的一些方法和保存每一个具体IoFilter的Entry(存放一些具体IoFilter相关的信息,主要包括名字,IoFilter,下一个IoFilter等,以及一些相对应的操作)。
IoFilterChain的一个实现类是:DefaultIoFilterChain。主要实现的功能是给IoFilterChain中增删IoFilter等。具体实现的办法可以用过如下源码来看:
public synchronized void addFirst(String name, IoFilter filter) { checkAddable(name); register(head, name, filter); } public synchronized void addLast(String name, IoFilter filter) { checkAddable(name); register(tail.prevEntry, name, filter); } public synchronized void addBefore(String baseName, String name, IoFilter filter) { EntryImpl baseEntry = checkOldName(baseName); checkAddable(name); register(baseEntry.prevEntry, name, filter); } public synchronized void addAfter(String baseName, String name, IoFilter filter) { EntryImpl baseEntry = checkOldName(baseName); checkAddable(name); register(baseEntry, name, filter); } public synchronized IoFilter remove(String name) { EntryImpl entry = checkOldName(name); deregister(entry); return entry.getFilter(); } public synchronized void remove(IoFilter filter) { EntryImpl e = head.nextEntry; while (e != tail) { if (e.getFilter() == filter) { deregister(e); return; } e = e.nextEntry; } throw new IllegalArgumentException("Filter not found: " + filter.getClass().getName()); } public synchronized IoFilter remove(Class<? extends IoFilter> filterType) { EntryImpl e = head.nextEntry; while (e != tail) { if (filterType.isAssignableFrom(e.getFilter().getClass())) { IoFilter oldFilter = e.getFilter(); deregister(e); return oldFilter; } e = e.nextEntry; } throw new IllegalArgumentException("Filter not found: " + filterType.getName()); } public synchronized IoFilter replace(String name, IoFilter newFilter) { EntryImpl entry = checkOldName(name); IoFilter oldFilter = entry.getFilter(); entry.setFilter(newFilter); return oldFilter; } public synchronized void replace(IoFilter oldFilter, IoFilter newFilter) { EntryImpl e = head.nextEntry; while (e != tail) { if (e.getFilter() == oldFilter) { e.setFilter(newFilter); return; } e = e.nextEntry; } throw new IllegalArgumentException("Filter not found: " + oldFilter.getClass().getName()); } public synchronized IoFilter replace( Class<? extends IoFilter> oldFilterType, IoFilter newFilter) { EntryImpl e = head.nextEntry; while (e != tail) { if (oldFilterType.isAssignableFrom(e.getFilter().getClass())) { IoFilter oldFilter = e.getFilter(); e.setFilter(newFilter); return oldFilter; } e = e.nextEntry; } throw new IllegalArgumentException("Filter not found: " + oldFilterType.getName()); } public synchronized void clear() throws Exception { List<IoFilterChain.Entry> l = new ArrayList<IoFilterChain.Entry>( name2entry.values()); for (IoFilterChain.Entry entry : l) { try { deregister((EntryImpl) entry); } catch (Exception e) { throw new IoFilterLifeCycleException("clear(): " + entry.getName() + " in " + getSession(), e); } } } private void register(EntryImpl prevEntry, String name, IoFilter filter) { EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry, name, filter); try { filter.onPreAdd(this, name, newEntry.getNextFilter()); } catch (Exception e) { throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':' + filter + " in " + getSession(), e); } prevEntry.nextEntry.prevEntry = newEntry; prevEntry.nextEntry = newEntry; name2entry.put(name, newEntry); try { filter.onPostAdd(this, name, newEntry.getNextFilter()); } catch (Exception e) { deregister0(newEntry); throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':' + filter + " in " + getSession(), e); } } private void deregister(EntryImpl entry) { IoFilter filter = entry.getFilter(); try { filter.onPreRemove(this, entry.getName(), entry.getNextFilter()); } catch (Exception e) { throw new IoFilterLifeCycleException("onPreRemove(): " + entry.getName() + ':' + filter + " in " + getSession(), e); } deregister0(entry); try { filter.onPostRemove(this, entry.getName(), entry.getNextFilter()); } catch (Exception e) { throw new IoFilterLifeCycleException("onPostRemove(): " private class HeadFilter extends IoFilterAdapter {@SuppressWarnings("unchecked") @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { AbstractIoSession s = (AbstractIoSession) session; // Maintain counters. if (writeRequest.getMessage() instanceof IoBuffer) { IoBuffer buffer = (IoBuffer) writeRequest.getMessage(); // I/O processor implementation will call buffer.reset() // it after the write operation is finished, because // the buffer will be specified with messageSent event. buffer.mark(); int remaining = buffer.remaining(); if (remaining == 0) { // Zero-sized buffer means the internal message // delimiter. s.increaseScheduledWriteMessages(); } else { s.increaseScheduledWriteBytes(remaining); } } else { s.increaseScheduledWriteMessages(); } s.getWriteRequestQueue().offer(s, writeRequest); if (!s.isWriteSuspended()) { s.getProcessor().flush(s); } } @SuppressWarnings("unchecked") @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { ((AbstractIoSession) session).getProcessor().remove(((AbstractIoSession) session)); } }+ entry.getName() + ':' + filter + " in " + getSession(), e); } } private void deregister0(EntryImpl entry) { EntryImpl prevEntry = entry.prevEntry; EntryImpl nextEntry = entry.nextEntry; prevEntry.nextEntry = nextEntry; nextEntry.prevEntry = prevEntry; name2entry.remove(entry.name); }
通过调用这些方法,完成自己对自定制的IoFilterChain的配置。
DefaultIoFilterChain实现中默认在头和尾设置了两个Filter。先看head,源码如下:
private class HeadFilter extends IoFilterAdapter { @SuppressWarnings("unchecked") @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { AbstractIoSession s = (AbstractIoSession) session; // Maintain counters. if (writeRequest.getMessage() instanceof IoBuffer) { IoBuffer buffer = (IoBuffer) writeRequest.getMessage(); // I/O processor implementation will call buffer.reset() // it after the write operation is finished, because // the buffer will be specified with messageSent event. buffer.mark(); int remaining = buffer.remaining(); if (remaining == 0) { // Zero-sized buffer means the internal message // delimiter. s.increaseScheduledWriteMessages(); } else { s.increaseScheduledWriteBytes(remaining); } } else { s.increaseScheduledWriteMessages(); } s.getWriteRequestQueue().offer(s, writeRequest); if (!s.isWriteSuspended()) { s.getProcessor().flush(s); } } @SuppressWarnings("unchecked") @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { ((AbstractIoSession) session).getProcessor().remove(((AbstractIoSession) session)); } }
Head主要的作用是做一个数据统计维持AbstractIoSession 中 scheduledWriteMessages这个统计量。然后将这个session放入 writeRequestQueue中等待写入。当这个IoFilter关闭时,则从IoProcessor中删除这个session以释放占用的相关资源。
tail相关源码如下:
private static class TailFilter extends IoFilterAdapter {
@Override
public void sessionCreated(NextFilter nextFilter, IoSession session)
throws Exception {
try {
session.getHandler().sessionCreated(session);
} finally {
// Notify the related future.
ConnectFuture future = (ConnectFuture) session
.removeAttribute(SESSION_CREATED_FUTURE);
if (future != null) {
future.setSession(session);
}
}
}
@Override
public void sessionOpened(NextFilter nextFilter, IoSession session)
throws Exception {
session.getHandler().sessionOpened(session);
}
@Override
public void sessionClosed(NextFilter nextFilter, IoSession session)
throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
try {
s.getHandler().sessionClosed(session);
} finally {
try {
s.getWriteRequestQueue().dispose(session);
} finally {
try {
s.getAttributeMap().dispose(session);
} finally {
try {
// Remove all filters.
session.getFilterChain().clear();
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerClosedReadFuture();
}
}
}
}
}
}
@Override
public void sessionIdle(NextFilter nextFilter, IoSession session,
IdleStatus status) throws Exception {
session.getHandler().sessionIdle(session, status);
}
@Override
public void exceptionCaught(NextFilter nextFilter, IoSession session,
Throwable cause) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
try {
s.getHandler().exceptionCaught(s, cause);
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerFailedReadFuture(cause);
}
}
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
if (!(message instanceof IoBuffer)) {
s.increaseReadMessages(System.currentTimeMillis());
} else if (!((IoBuffer) message).hasRemaining()) {
s.increaseReadMessages(System.currentTimeMillis());
}
try {
session.getHandler().messageReceived(s, message);
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerReadFuture(message);
}
}
}
@Override
public void messageSent(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
session.getHandler()
.messageSent(session, writeRequest.getMessage());
}
@Override
public void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
nextFilter.filterWrite(session, writeRequest);
}
@Override
public void filterClose(NextFilter nextFilter, IoSession session)
throws Exception {
nextFilter.filterClose(session);
}
}
这个IoFilter主要是响应一些事件。具体通过方法名比较容易明白,这里不再罗嗦。
值得注意的是,所有的自定义的IoFilter插入都放在了head之后,tail之前。可以定义Logger,序列化工具,业务线程池等多个需要IoFilter。真正改进性能的地方可能就是业务处理的话,业务处理线程池,序列化的话,看能不能改进序列化的效率,如采用protocalBuffer这种东东了。具体的等谈Mina的Thread model的时候在展开一些池的使用场景吧。
关于IoFilter的使用可以直接参阅 http://mina.apache.org/iofilter.html 的内容来选择合适的filter来实现想要的功能。
相关推荐
《Mina开发之客户端详解》 Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的、高性能的网络应用框架,主要用于构建服务器端的网络应用程序。它简化了网络编程的复杂性,提供了基于事件驱动和异步I/O...
Apache Mina是一个开源的网络通信应用框架,主要应用于Java平台,它为高性能、高可用性的网络应用程序提供了基础架构。在本文中,我们将深入探讨Mina的高级使用,特别是在文件图片传送、文件发送、XML和JSON报文处理...
**Mina入门:Mina版之HelloWorld** Apache Mina是一个开源项目,它提供了一个高度模块化、高性能的网络通信框架。Mina旨在简化网络应用的开发,支持多种传输协议,如TCP、UDP、HTTP、FTP等。在这个“Mina入门:Mina...
1. **Filter Chain**:Mina的核心设计模式之一是过滤器链。每个连接都有一系列过滤器,它们按照顺序处理入站和出站事件。过滤器可以实现特定功能,如数据编码解码、安全验证、性能监控等。 2. **Session**:Session...
Mina开发之服务器的代码,详情请查看:http://www.cnblogs.com/getherBlog/p/3937196.html Mina开发之客户端的代码,详情请查看:http://www.cnblogs.com/getherBlog/p/3937196.html
Apache Mina是一个开源的网络通信框架,常用于构建高性能、高效率的服务端应用程序,尤其在Java平台上。在本文中,我们将深入探讨Mina的核心概念,包括连接管理、心跳机制以及断线重连策略。 首先,让我们理解"Mina...
Mina.jar可能包含了Mina的核心组件,如IoSession接口,IoFilterChain,以及各种I/O处理器等。此外,它可能还包含了支持不同协议(如TCP,UDP)的处理器,以及各种I/O事件的监听器。 在压缩包子文件的文件名称列表中...
MINA (Java IO Network Application Framework) 是一个由Apache软件基金会开发的开源网络通信框架,主要应用于构建高性能、高可用性的网络服务器。这个压缩包包含了MINA API文档、自学手册以及开发指南,对于学习和...
Mina和Socket是两种常见的网络通信框架和技术,它们在Java编程环境中被广泛使用。本篇文章将深入探讨如何使用Mina与Socket实现通信,并提供客户端和服务端的实现代码概述。 Mina(全称“MINA: Minimalistic ...
在MINA的网络编程中,我们通常创建一个IoFilterChain,并将SSLFilter添加到其中,以启用SSL/TLS支持。以下是一段创建SSLFilter并添加到过滤器链的基本代码示例: ```java SslFilter sslFilter = new SslFilter(ssl...
- **核心类库**:包含IoAcceptor、IoConnector、IoSession、IoFilterChain等核心类的实现。 - **协议处理**:MINA提供了一些预定义的协议处理器,如TCP、UDP的实现,以及一些基础的协议编码解码器。 - **示例应用**...
mina-core-2.0.0-M6.jar mina-example-2.0.0-M6.jar mina-filter-codec-netty-2.0.0-M6.jar mina-filter-compression-2.0.0-M6.jar mina-integration-beans-2.0.0-M6.jar mina-integration-jmx-2.0.0-M6.jar mina-...
mina新手案例,mina新手教程源码 mina+springboot最简单的案例。用的IDEA * mina服务端 * 1、添加@Controller注解和 @PostConstruct注解,代表启动springboot项目时也调用该类下的该方法, * 启动springboot项目...
标题中的“给予mina协议进行大数据传输”指的是一种基于Java的网络通信框架——Apache MINA(Model-View-Controller for Network Applications)。MINA是Apache软件基金会的一个项目,它提供了一个高度可扩展和高...
Java SpringBoot 整合Mina框架,涉及到的核心技术主要包括Java NIO(非阻塞I/O)、Mina框架以及SpringBoot的集成应用。本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java ...
Apache Mina是一个高度可扩展的网络通信框架,它允许开发者创建高性能、高效率的服务端和客户端应用程序。在Java世界中,Mina以其简洁的API和灵活性而受到青睐,尤其适用于处理大量的并发连接,如TCP/IP和UDP协议。...
**mina自定义编解码器详解** mina是一个Java开发的网络通信框架,广泛应用于TCP和UDP协议的服务器和客户端开发。在mina框架中,编解码器(Codec)扮演着至关重要的角色,它负责将应用层的数据转换为网络传输的字节...
**Mina官网例子之时间服务器** Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的网络通信框架,它为各种协议提供了低级别的基础结构,包括TCP/IP和UDP/IP。Mina的目标是简化网络编程,使其变得高效...
Apache Mina是一个开源项目,它提供了一个高度可扩展的网络通信框架,用于简化开发高性能、高可用性的网络服务器和客户端应用程序。"Mina demo mina jar包"指的是使用Apache Mina框架创建的一个演示示例,这个示例...
**Spring Boot 整合Mina实现串口通信详解** 在Java开发中,有时我们需要与硬件设备进行串口通信,例如读取传感器数据或控制工业设备。Spring Boot作为一款轻量级的框架,使得快速构建应用变得简单。而Mina则是一款...