`
jimmee
  • 浏览: 540709 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Mina编程的两个注意点

    博客分类:
  • NIO
 
阅读更多
  • 1. 首先,这是一个nio的框架,仍然是采用reactor模式,知道这一点后,那么编程就没有什么难的。nio的编程,无外乎就是这些套路, 再进一步说,网络编程,也就是这些套路了。
  • 2. 那么剩下编程的注意点,也就是编解码的处理以及最后的业务逻辑的处理。



2.1 编解码的注意点:因为在网络编程中,client和server之间,往往需要完整的接收到一条消息的后,才交给业务逻辑处理。具体可以参看http://jimmee.iteye.com/blog/617544,其中,我们常常是继承自CumulativeProtocolDecoder来实现自己的解码器,主要的docode的方法,其作用是将本次数据和上次接收到的数据(如果doDecode方法没有处理完的话),统一放到一个buffer中,之后扔给 doDecode方法处理,若处理完之后,还有剩下的数据,则继续缓存。

 /**
     * Cumulates content of <tt>in</tt> into internal buffer and forwards
     * decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
     * and the cumulative buffer is compacted after decoding ends.
     *
     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
     *                               <tt>true</tt> not consuming the cumulative buffer.
     */
    public void decode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        if (!session.getTransportMetadata().hasFragmentation()) {
            while (in.hasRemaining()) {
                if (!doDecode(session, in, out)) {
                    break;
                }
            }

            return;
        }

        boolean usingSessionBuffer = true;
        IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
        // If we have a session buffer, append data to that; otherwise
        // use the buffer read from the network directly.
        if (buf != null) {
            boolean appended = false;
            // Make sure that the buffer is auto-expanded.
            if (buf.isAutoExpand()) {
                try {
                    buf.put(in);
                    appended = true;
                } catch (IllegalStateException e) {
                    // A user called derivation method (e.g. slice()),
                    // which disables auto-expansion of the parent buffer.
                } catch (IndexOutOfBoundsException e) {
                    // A user disabled auto-expansion.
                }
            }

            if (appended) {
                buf.flip();
            } else {
                // Reallocate the buffer if append operation failed due to
                // derivation or disabled auto-expansion.
                buf.flip();
                IoBuffer newBuf = IoBuffer.allocate(
                        buf.remaining() + in.remaining()).setAutoExpand(true);
                newBuf.order(buf.order());
                newBuf.put(buf);
                newBuf.put(in);
                newBuf.flip();
                buf = newBuf;

                // Update the session attribute.
                session.setAttribute(BUFFER, buf);
            }
        } else {
            buf = in;
            usingSessionBuffer = false;
        }

	  // 上面操作完后,得到的buf是包含以前积累的数据(如果没有读取处理掉),再加上本次得到
        // 的数据,最后扔给doDecode处理。
        for (;;) {
            int oldPos = buf.position();
            boolean decoded = doDecode(session, buf, out);
            if (decoded) {
                if (buf.position() == oldPos) {
                    throw new IllegalStateException(
                            "doDecode() can't return true when buffer is not consumed.");
                }

                if (!buf.hasRemaining()) {
                    break;
                }
            } else {
                break;
            }
        }
		
        // if there is any data left that cannot be decoded, we store
        // it in a buffer in the session and next time this decoder is
        // invoked the session buffer gets appended to
        if (buf.hasRemaining()) {
            if (usingSessionBuffer && buf.isAutoExpand()) {
                buf.compact();
            } else {
                storeRemainingInSession(buf, session);
            }
        } else {
            if (usingSessionBuffer) {
                removeSessionBuffer(session);
            }
        }
    }


可以看一个具体的实现:PrefixedStringDecoder的 doDecode方法,这个decode是消息长度+具体字节流的消息格式。

 protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.prefixedDataAvailable(prefixLength, maxDataLength)) {
            String msg = in.getPrefixedString(prefixLength, charset.newDecoder());
            out.write(msg);
            return true;
        }

        return false;
    }


其中, prefixedDataAvailable方法是判断得到IoBuffer里的数据是否满足一条消息了,如果再进去看这个方法的实现化,读取是,使用ByteBuffer的绝对位置的读取方法(这种读取不影响position的值的);当已经有一条完整的消息时,则用getPrefixedString读取(使用的是ByteBuffer的相对位置的读取方法,这会影响position的值,从而实际的消费掉数据).

2.2 业务逻辑的处理注意点,一般都会使用一个线程池处理。这里就有一个问题,有时候同一个连接的消息处理,是希望按照顺序来进行的。这也很简单,不需要保证一个连接的所有的业务处理都限定在一个固定的线程中,但是需要保证当有消息需要处理时,这些消息的处理,都在同一个线程中完成。当然了,mina中也有了相应的实现OrderedThreadPoolExecutor。实现原理很简单:
一个连接(session)对应一个消息队列,同时此session也放到一个队列中,说明这个session有消息需要处理。具体来说,就是使用SessionTasksQueue来表示一个Session的要处理的消息的队列;
使用
  /** A queue used to store the available sessions */
    private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();

waitingSessions表示有消息需要处理的session的队列。
/**
     * {@inheritDoc}
     */
    @Override
    public void execute(Runnable task) {
        if (shutdown) {
            rejectTask(task);
        }

        // Check that it's a IoEvent task
        checkTaskType(task);

        IoEvent event = (IoEvent) task;
        
        // Get the associated session
        IoSession session = event.getSession();
        
        // 得到保存session消息的队列
        SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
        
        boolean offerSession;

        // propose the new event to the event queue handler. If we
        // use a throttle queue handler, the message may be rejected
        // if the maximum size has been reached.
        boolean offerEvent = eventQueueHandler.accept(this, event);
        
        if (offerEvent) {
            // Ok, the message has been accepted
            synchronized (tasksQueue) {
                // Inject the event into the executor taskQueue
                tasksQueue.offer(event);
                
		     // 如果session中的消息已经处理完了,说明没有线程在处理这个session,
                 // 重新提交给线程池处理
                if (sessionTasksQueue.processingCompleted) {
                    sessionTasksQueue.processingCompleted = false;
                    offerSession = true;
                } else {
                    offerSession = false;
                }

                if (LOGGER.isDebugEnabled()) {
                    print(tasksQueue, event);
                }
            }
        } else {
            offerSession = false;
        }

        if (offerSession) {
            // As the tasksQueue was empty, the task has been executed
            // immediately, so we can move the session to the queue
            // of sessions waiting for completion.
            waitingSessions.offer(session);
        }

        addWorkerIfNecessary();

        if (offerEvent) {
            eventQueueHandler.offered(this, event);
        }


对应的,看一下:

 private void runTasks(SessionTasksQueue sessionTasksQueue) {
            for (;;) {
                Runnable task;
                Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
                
                synchronized (tasksQueue) {
                    task = tasksQueue.poll();
                    // 当已经没有任务时,处理此session的线程结束掉
                    if (task == null) {
                        sessionTasksQueue.processingCompleted = true;
                        break;
                    }
                }

                eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);

                runTask(task);
            }
        }


小结:个人认为,只要对nio编程熟悉,对mina框架,只要明白了以上两点,就不再有什么大问题了。
分享到:
评论

相关推荐

    Mina+Socket通信

    Mina和Socket是两种常见的网络通信框架和技术,它们在Java编程环境中被广泛使用。本篇文章将深入探讨如何使用Mina与Socket实现通信,并提供客户端和服务端的实现代码概述。 Mina(全称“MINA: Minimalistic ...

    MinaServer for Android

    Socket是网络通信中的一个概念,它代表了两个网络节点间的一个通信链路。在Android平台上,Java的Socket类和ServerSocket类被用来建立和管理客户端与服务端之间的连接。ServerSocket监听特定端口的连接请求,当有...

    网络编程(socket、NIO、mina)---demo

    在这个"网络编程(socket、NIO、mina)---demo"的主题中,我们将深入探讨三个关键概念:Socket编程、非阻塞I/O(Non-blocking I/O,简称NIO)以及Apache Mina框架。这些技术广泛应用于构建高性能、高并发的网络应用...

    mina自定义编解码器详解

    - Mina提供了一个高效的、事件驱动的网络应用程序框架,简化了网络编程,尤其是TCP和UDP通信。 - 它基于IoSession接口,用于管理客户端与服务器之间的连接,包括读写操作、会话属性等。 2. **编解码器概念** - ...

    mina2技术知识

    3. **ProtocolDecoder/ProtocolEncoder**:这两个接口用于数据的解码和编码。MINA 提供了多种预定义的解码器和编码器,比如LineBasedFrameDecoder用于按行处理数据,ByteToMessageDecoder和MessageToByteEncoder则...

    mina2资料-各种教程

    IoFilter和IoHandler是MINA中的两个重要组件。IoHandler是处理网络事件的核心接口,它定义了接收到数据、连接建立、连接关闭等事件时需要执行的方法。IoFilter则是一个过滤器链,允许在数据传输到IoHandler之前对其...

    java-mina通信框架详解.docx

    Mina分为1.x和2.x两个主要分支,推荐使用最新的2.0版本。框架中包含了Server和Client的封装,简化了网络通信结构。在Mina的架构中,IoService接口负责在一个线程上建立套接字连接,并通过Selector监听连接状态。当...

    MINA断线重连死锁解决

    死锁一般发生在两个或多个并发进程互相等待对方释放资源而无法继续执行的情况。 首先,我们需要理解MINA的连接生命周期。当客户端与服务器建立连接后,如果网络中断,MINA会尝试自动重连。这个过程可能包括设置一个...

    mina2.0.3源代码

    Apache MINA是一个强大的开源网络应用框架,主要用于简化网络编程,特别是TCP/IP和UDP/IP协议的开发。MINA的核心是基于Java NIO(Non-blocking I/O)实现的,它提供了高度可扩展性和高效的性能,适用于处理大量并发...

    mina2学习笔记

    - **类结构**:IoService是Mina框架的核心接口,包含IoAcceptor和IoConnector两个实现,分别负责服务端和客户端的网络通信。 - **应用**:IoAcceptor用于服务端接受客户端的连接请求,IoConnector用于客户端发起连接...

    MINA 服务端和客户端demo

    这两个部分都是基于MINA框架构建的,分别实现了服务器接收客户端连接、处理数据和客户端发起连接、发送数据的功能。 1. **MINA服务端**: - MINA服务端的核心组件是Acceptor,它负责监听指定的端口,当有新的...

    Socket及Mina的讲解

    在此,我们将深入探讨这两个概念及其在AndroidPN项目中的应用。 首先,Socket通常被称为套接字,是网络通信的基础组件。在Java中,Socket类和ServerSocket类提供了一对一的通信机制,允许两个应用程序通过TCP/IP...

    socket 与 mina 交互数据

    Socket和Mina是Java网络编程中的两个重要工具,它们在构建高性能、高并发的网络应用中发挥着关键作用。Socket是TCP/IP通信的基础,而Mina是一个高效的网络应用框架,它简化了网络编程的复杂性。 Socket,也被称为套...

    一个Apache MINA使用案例源代码ApacheMina

    2. **ProtocolDecoder** 和 **ProtocolEncoder**: 这两个接口用于解码接收到的数据和编码要发送的数据。MINA提供了一种灵活的方式,可以根据实际需求定义自定义的编解码器。 3. **FilterChain**: 过滤器链是MINA中...

    mina开发手册与mina完全自学手册.rar

    Mina在Java世界中尤其受欢迎,因为它简化了网络编程的复杂性,允许开发者专注于业务逻辑,而不是底层的网络交互细节。 《Mina2.0完全剖析_完全自学手册.pdf》可能是对Apache Mina 2.0版本的详尽解析,涵盖其设计...

    Mina案例+使用文档.pdf

    Mina有1.0.x和1.1.x两个主要分支,其中1.0.x版本适合运行在JDK 1.4环境下,而1.1.x则要求JDK 1.5或以上版本。 - **依赖库**:除了Mina的核心库(如mina-core-1.1.7.jar),还需要下载SLF4J(Simple Logging Facade ...

    mina HTTP协议实例

    其工作流程主要包括请求和响应两个阶段,客户端发送HTTP请求到服务器,服务器处理请求并返回HTTP响应。MINA通过定义Filter(过滤器)和Handler(处理器)来处理这些请求和响应。 1. **Filter机制**:MINA的过滤器...

    基于spring的Mina框架

    **基于Spring的Mina框架详解** Mina框架是一款高性能、轻量级的网络通信框架,主要应用于开发基于TCP和UDP的网络应用。...开发者可以通过熟练掌握这两个框架的整合,构建出稳定、可扩展的网络应用。

    Java mina2源码

    每个Filter都有两个方法,`messageReceived`和`messageSent`,分别处理接收到的数据和发送出去的数据。开发者可以创建多个Filter,形成一个处理链,每个Filter执行特定的任务。 3. **Buffer**:Mina2使用Buffer对象...

Global site tag (gtag.js) - Google Analytics