`
Donald_Draper
  • 浏览: 985955 次
社区版块
存档分类
最新评论

Mina Socket与报文过滤链

    博客分类:
  • Mina
阅读更多
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
引言:
在前一篇文章中我们看了一下过滤链的抽象实现,先来回顾一下
  AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry。
      EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。过滤链头为HeadFilter,链尾为TailFilter。
     HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
     TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
     添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同;添加前触发过滤器onPreAdd事件,添加后触发过滤器onPostAdd事件;移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
     过滤链处理相关事件策略为:与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件) 。
今天我们来看具体的过滤链实现SocketFilterChain和DatagramFilterChain,在AbstractIoFilterChain
这篇文章中我们我们过滤链触发IoSession的write/close事件除了传递事件外,
调用实际的事件操作doWrite/doClose,这两个方法为抽象方法,需要子类扩展实现。
在SocketFilterChain和DatagramFilterChain具体中主要是doWrite/doClose这两个方法的具体实现。
先来看一下SocketFilterChain
package org.apache.mina.transport.socket.nio;

import java.io.IOException;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
import org.apache.mina.util.Queue;

/**
 * An {@link IoFilterChain} for socket transport (TCP/IP).
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 */
class SocketFilterChain extends AbstractIoFilterChain {

    SocketFilterChain(IoSession parent) {
        super(parent);
    }

    protected void doWrite(IoSession session, WriteRequest writeRequest) {
        SocketSessionImpl s = (SocketSessionImpl) session;
	//获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲
        Queue writeRequestQueue = s.getWriteRequestQueue();

        // SocketIoProcessor.doFlush() will reset it after write is finished
        // because the buffer will be passed with messageSent event. 
	//这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件,
	//待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置
        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
        buffer.mark();
        int remaining = buffer.remaining();
        if (remaining == 0) {
	    //BaseIoSession
	    // private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
            //更新调度请求计数器+1
            s.increaseScheduledWriteRequests();            
        } else {
	    //BaseIoSession
	    //private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
	    //更新调度写字节计数器+buffer.remaining()
            s.increaseScheduledWriteBytes(buffer.remaining());
        }

        synchronized (writeRequestQueue) {
	   //将写请求添加到session写请求队列中
            writeRequestQueue.push(writeRequest);
        }
        //如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作,这个在以后在具体详说
        if (session.getTrafficMask().isWritable()) {
            s.getIoProcessor().flush(s);
        }
    }
    //关闭会话
    protected void doClose(IoSession session) throws IOException {
        SocketSessionImpl s = (SocketSessionImpl) session;
        s.getIoProcessor().remove(s);//委托给session关联的IoProcessor
    }
}

来看SocketFilterChain实际会话关闭工作
//关闭会话
    protected void doClose(IoSession session) throws IOException {
        SocketSessionImpl s = (SocketSessionImpl) session;
        s.getIoProcessor().remove(s);
    }

//SocketIoProcessor
class SocketIoProcessor {
    ...
    private final Queue removingSessions = new Queue();//存放关闭的会话队列
     void remove(SocketSessionImpl session) throws IOException {
        scheduleRemove(session);//将会话添加到待移除会话队列
        startupWorker();//这一步我们在后面将SocketIoProcessor的时候再说
    }
    private void scheduleRemove(SocketSessionImpl session) {
        synchronized (removingSessions) {
            removingSessions.push(session);
        }
    }
   ...
}

小节:
SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,
主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法
将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。

再来看一下DatagramSessionImpl:
package org.apache.mina.transport.socket.nio.support;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
import org.apache.mina.util.Queue;

/**
 * An {@link IoFilterChain} for datagram transport (UDP/IP).
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 */
class DatagramFilterChain extends AbstractIoFilterChain {

    DatagramFilterChain(IoSession parent) {
        super(parent);
    }

    protected void doWrite(IoSession session, WriteRequest writeRequest) {
        DatagramSessionImpl s = (DatagramSessionImpl) session;
	//获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲
        Queue writeRequestQueue = s.getWriteRequestQueue();

        // SocketIoProcessor.doFlush() will reset it after write is finished
        // because the buffer will be passed with messageSent event. 
        //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件,
	//待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置
        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
        buffer.mark();
        int remaining = buffer.remaining();
        if (remaining == 0) {
	    //BaseIoSession
	    // private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
            //更新调度请求计数器+1
            s.increaseScheduledWriteRequests();            
        } else {
	     //BaseIoSession
	    //private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
	    //更新调度写字节计数器+buffer.remaining()
            s.increaseScheduledWriteBytes(buffer.remaining());
            s.increaseScheduledWriteBytes(buffer.remaining());
        }

        synchronized (writeRequestQueue) {
	    //将写请求添加到session写请求队列中
            writeRequestQueue.push(writeRequest);
        }
        
        if (session.getTrafficMask().isWritable()) {
	     //DatagramSessionImpl
	     //private final DatagramService managerDelegate;
	    //如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作,
	    //这个在以后在具体详说
            s.getManagerDelegate().flushSession(s);
        }
    }

    protected void doClose(IoSession session) {
        DatagramSessionImpl s = (DatagramSessionImpl) session;
        DatagramService manager = s.getManagerDelegate();
	////委托给session关联的managerDelegate(DatagramService)关闭会话
        if (manager instanceof DatagramConnectorDelegate) {
	    //如果是DatagramConnectorDelegate者直接关闭会话,则在后面具体再看
            ((DatagramConnectorDelegate) manager).closeSession(s);
        } else {
	    //通知DatagramAcceptorDelegate的监听器会话已关闭
            ((DatagramAcceptorDelegate) manager).getListeners()
                    .fireSessionDestroyed(session);
	    //设置会话CloseFuture为已关闭状态
            session.getCloseFuture().setClosed();
        }
    }
}

从上面DatagramSessionImpl关联的managerDelegate(DatagramService)两种分别为DatagramConnectorDelegate和DatagramAcceptorDelegate
//DatagramAcceptorDelegate
public class DatagramAcceptorDelegate extends BaseIoAcceptor implements
        IoAcceptor, DatagramService {

//DatagramConnectorDelegate
public class DatagramConnectorDelegate extends BaseIoConnector implements
        DatagramService {

今天这篇文章只是简单做一个简单的介绍,主要是对上一篇过滤链抽象实现的补充,文章中涉及的IoService和
IoProcessor我们还有讲到,后面讲到是在具体的说。
从上面可以看出,DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。


总结:
     SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。
      DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。
关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。
0
1
分享到:
评论

相关推荐

    Android-MinaSocket一款基于Mina的Socket长连接库

    6. **封装与易用性**:`Android-MinaSocket` 对Mina进行了封装,为Android开发者提供更友好的API,降低了使用门槛。 在实际使用`Android-MinaSocket` 时,开发者需要注意以下几点: - **连接管理**:保持长连接...

    Mina+Socket通信

    文件"MinaSocket"可能包含了实现上述功能的详细代码,包括服务端的Acceptor配置、过滤器设置、事件处理器编写,以及客户端的Socket连接、数据发送和接收等。通过阅读和理解这些代码,你可以更好地掌握Mina与Socket...

    Mina Socket 源代码

    **Mina Socket 源代码解析** Mina Socket 是 Apache Mina 项目的一部分,它是一个高性能、可扩展的网络通信框架。Mina 提供了一种简单的方式来构建网络应用,如服务器和客户端,支持多种协议,如 TCP/IP 和 UDP。在...

    apache mina socket

    3. **Filter Chain**:Mina的过滤器链是其核心设计之一,允许开发者插入自定义的过滤器,对数据进行处理、转换或者日志记录等操作。数据在进出应用时都会经过过滤器链。 4. **Session对象**:在Mina中,每个网络...

    socket通信,mina长连接通信

    Socket通信和MINA长连接是网络编程中的两个关键概念,主要应用于服务器与客户端之间的数据交互。在移动应用开发,特别是需要实时推送功能时,这两种技术显得尤为重要。 **Socket通信** Socket,也称为套接字,是...

    mina 服务器socket客服端发消息

    本文将深入探讨如何使用Mina来实现一个服务器以及对应的Socket客户端,从而进行消息传递。 **Mina 框架介绍** Mina提供了一个高级抽象层,允许开发者用类似处理Java IO的方式处理NIO(非阻塞I/O)。它简化了网络...

    mina带心跳长链接+socket长链接+线程池

    mina带心跳长链接,可实现服务间通信。socket长连接实现客户端与服务端的通信。对于通信技术学习是非常好的资料。改造后可实现企业应用

    mina学习基础-入门实例-传输定长报文(三)

    在"mina学习基础-入门实例-传输定长报文(三)"这个主题中,我们将深入探讨如何使用Mina实现定长报文的传输,并且利用Mina内置的SSL过滤器进行报文加密。 首先,让我们了解什么是定长报文。在通信协议中,定长报文是...

    socket通讯和mina应用

    Socket通讯和MINA应用是Java网络编程中的两个关键概念,它们在开发分布式系统、网络服务和客户端应用程序中扮演着重要角色。这篇博文将深入探讨这两个主题,并通过一个名为"testMina"的压缩包文件来展示实际应用。 ...

    socket 与 mina 交互数据

    Socket和Mina是Java网络...对于开发者来说,理解Socket的基本原理和Mina的过滤器模型,以及如何实现数据的编解码,是掌握这两个工具的关键。在实际开发中,根据项目需求选择合适的工具,能够提高代码的效率和可维护性。

    mina框架中socket使用,有服务端和客户端。

    - `FilterChain`:MINA的过滤器链机制,允许开发者插入自定义的过滤器,实现数据的预处理和后处理。 - `IoHandler`:主要负责处理网络事件,如读取数据、写入数据、连接建立或断开等。 - `ProtocolCodecFactory`...

    java客户端socket与mina服务端通信

    Java客户端Socket与Mina服务端通信是网络编程中常见的应用场景,尤其在开发分布式系统或实时数据传输时。这里我们将深入探讨这两个技术,并了解如何通过它们建立保持长连接的通信。 首先,Socket是Java中用于实现...

    一般Socket客户端与Mina NIO Socket客户端对比示例

    本文将通过一个对比实例,探讨一般Socket客户端与Mina NIO (Non-blocking I/O) Socket客户端的差异和特点,帮助开发者理解这两种技术在实际应用中的选择。 首先,普通Socket客户端基于BIO(Blocking I/O)模型,它...

    socket mina测试框架

    Socket Mina测试框架是一个强大的网络通信应用框架,主要用于简化Java应用程序与远程服务器之间的通信。它提供了高度可扩展和高性能的I/O处理模型,使得开发者能够更专注于业务逻辑,而不是底层的网络实现细节。Mina...

    mina socket客户度工程相关类

    1.mina socket客户度工程相关类,添加mina jar包后可独立运行。 2.mina若有空闲连接则使用已有连接,若无则新建mina连接; 3.mina空闲连接超过保活时间25分钟后,自动删除; 4.mina发送指令后,接收指定时长内收到的...

    apache mina socket实例

    mina简单示例,Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速...

    Socket及Mina的讲解

    此外,Mina还提供了丰富的过滤器链机制,允许在数据传输过程中进行拦截、转换和处理,增强了网络应用的灵活性和可扩展性。 在AndroidPN(Android Push Notification)项目中,由于需要实时地将服务器端的通知推送到...

    mina的高级使用,mina文件图片传送,mina发送文件,mina报文处理,mina发送xml和json

    首先,我们需要将图片文件读取到内存中的IoBuffer,然后通过过滤器链传递给远程客户端。在这个过程中,可以实现数据压缩、加密等额外功能。同时,Mina提供了进度反馈机制,可以实时监控文件传输进度,确保传输的可靠...

    MINA 协议解码过滤器

    过滤器链的概念是MINA框架的核心特性之一,它允许开发者插入自定义的过滤器来处理进来的数据或者发送出去的数据。协议解码过滤器(ProtocolDecoderFilter)就是这样的一个过滤器,它的主要任务是从接收到的原始字节...

    mina socket 代码

    2. **配置Filter Chain**:在Acceptor上设置过滤器链,过滤器链是MinA的核心概念,它处理I/O事件和数据。你可以添加自定义的过滤器来实现数据编码、解码、认证、加密等功能。 3. **实现Protocol Handler**:创建一...

Global site tag (gtag.js) - Google Analytics