`
BruceXX
  • 浏览: 141450 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

mina的 StreamIoHandler

    博客分类:
  • J2SE
阅读更多

        最近看上了mina,其性能多多,完全不用说了。。,本来是想搞个file+msg传输的,结果。。被这个handler给 block 了,不知道这个东西扎用。。。按其Iohandler interface implement methods+ processStreamIo abstract method   写了一个sample,发现死活发不了包。。。哥怒了。

        没办法,开源的东西还是有好处地,可以深入,下了源码,发现其结构真的很不错,IoService(类似与和customer communication 的一个接口: session )<==IoProcessor(这个好象在code business 的时候没看到,后来才知道是用于处理IoFilter 的)<===IoFilter(*) ,IoHandler(business handler) 。

       原来,StreamIoHandler 的 里面wrap 了两个内部out,in,并且对IoHandler 的每个method 都做了实现,迷底马上揭晓了。。。see the blew:

 

    @Override
    public void sessionOpened(IoSession session) {
        // Set timeouts
        session.getConfig().setWriteTimeout(writeTimeout);
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readTimeout);

        // Create streams
        InputStream in = new IoSessionInputStream();
        OutputStream out = new IoSessionOutputStream(session);
        session.setAttribute(KEY_IN, in);
        session.setAttribute(KEY_OUT, out);
        processStreamIo(session, in, out);
    }

 

划克,把processStreamIo  template  method 了。。(PS:曾经偶在写portal 里用到过这种pattern)

然后其他的iohandler 的method 里面也全写满了io   logic....最后session closed 才把in ,out stream close .

我想,这样一来应该每传一个文件就回connect 一次了。。

 

费话说完了。。code ==>

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package minastudy.t2;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.demux.DemuxingIoHandler;
import org.apache.mina.handler.stream.StreamIoHandler;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 *
 * @author zx04741
 */
public class Server {

    public static void main(String args[]) throws IOException {

        NioSocketAcceptor acceptor = new NioSocketAcceptor();
//        DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
//        chain.addLast("codec", new ProtocolCodecFilter(
//                new TextLineCodecFactory()));

        StreamIoHandler handler = new StreamIoHandler() {

            private ExecutorService pool = Executors.newCachedThreadPool();

//            public void exceptionCaught(IoSession session, Throwable cause) {
//                super.exceptionCaught(session, cause);
//                System.out.println(cause);
//           }
//
//            public void sessionOpened(IoSession session) {
//                super.sessionOpened(session);
//                System.out.println(" session open....");
//            }


            @Override
            protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
                System.out.println("process stream...");
                pool.execute(new Work(session, in, out));
            }
        };
        acceptor.setHandler(handler);
        acceptor.bind(new InetSocketAddress(8889));
    }
}

class Work extends Thread {

    private InputStream in;
    private OutputStream out;
    private IoSession session;

    public Work(IoSession session, InputStream in, OutputStream out) {
        this.in = in;
        this.out = out;
        this.session = session;
    }

    public void run() {
        try {
            FileOutputStream fos = new FileOutputStream(new File("c:/jre(1).z01"));
            byte[] buf = new byte[2048];
            int offset = 0;
            while ((offset = in.read(buf)) != -1) {
                System.out.println(offset);
                fos.write(buf, 0, offset);
                fos.flush();
            }           
            fos.close();
            System.out.println("session over..");
        } catch (IOException ex) {
            Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
        }

    }
}


/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package minastudy.t2;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

/**
 *
 * @author zx04741
 */
public class Client {

    public static void main(String args[]) throws FileNotFoundException {

        NioSocketConnector connector = new NioSocketConnector();
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();
        connector.setHandler(new StreamIoHandler() {

//            private ExecutorService pool = Executors.newCachedThreadPool();
//            public void exceptionCaught(IoSession session, Throwable cause) {
//                super.exceptionCaught(session, cause);
//                System.out.println(cause);
//            }

//            public void messageSent(IoSession session, Object message) throws Exception {
//                // Empty handler
//                super.messageSent(session, message);
////                System.out.println("sent===>");
//            }
//
//            public void messageReceived(IoSession session, Object buf) {
//                super.messageReceived(session, buf);
////                System.out.println("recv===>");
//            }

            @Override
            protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
                //pool.execute(new Work(session, in, out));
                System.out.println("client in process stream..");
                try {
                    String fileName = "C:/jdk15910/jre(1).z01";
                    File f = new File(fileName);
                    byte[] buf = new byte[2048];
                    FileInputStream fis = new FileInputStream(f);
                    int offset = 0;
                    while (true) {
                        offset = fis.read(buf);
                        if (offset == -1) {
                            break;
                        }
                        System.out.println(offset);
                        out.write(buf, 0, offset);
                    }              
                    //important must be waiting for over..
                   session.close(false);                   
                    fis.close();
                    System.out.println("over..");
                }  catch (IOException ex) {
                    Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        });
        ConnectFuture future1 = connector.connect(new InetSocketAddress(8889));
        future1.awaitUninterruptibly();
        if (!future1.isConnected()) {
            System.out.println("no connect...");
        }
        IoSession session = future1.getSession();
        System.out.println("session==>start");

    }
}


 

 有很多忌讳的地方,client里面的out 不能flush ,否则回block住,。。。有点怪异,我看了下IoSessionOutputStream

    @Override
    public synchronized void flush() throws IOException {
        if (lastWriteFuture == null) {
            return;
        }

        lastWriteFuture.awaitUninterruptibly();
        if (!lastWriteFuture.isWritten()) {
            throw new IOException(
                    "The bytes could not be written to the session");
        }
    }

 

 重写的flush 方法。。synchonized...要等到上次写完成后才可以写下次。。但是也不可能block 住所有的啊。。难道有相关的锁没有释放??先到这,回家再研究。。

 

  problem traces 到 AbstractIoSession

 

  原来真的是的,在AbstractIoSession里面有一个WriteFuture 的object锁,会将client当前的线程读写锁定在当前buffer之中,OK,这样就真相大白了.现在要做的就是把client改成多线程来处理这件事情了和Server一样的,确实如此,另外,在写的过程要保持Thread.sleep,这样CPU就不会达到很高...

 

  code==>

  public class Client {

    public static void main(String args[]) throws FileNotFoundException {

        final NioSocketConnector connector = new NioSocketConnector();
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();
        connector.setHandler(new StreamIoHandler() {

            private ExecutorService pool = Executors.newCachedThreadPool();

            public void exceptionCaught(IoSession session, Throwable cause) {
                super.exceptionCaught(session, cause);
                System.out.println(cause);
            }

            public void sessionClosed(IoSession session) throws Exception {
                super.sessionClosed(session);
                System.out.println(" session close.. ");

            }

            public void sessionOpened(IoSession session) {
                super.sessionOpened(session);
                System.out.println(" session open....");
            }

            @Override
            protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
                pool.execute(new Worker2(session, in, out));

            }
        });
        ConnectFuture future1 = connector.connect(new InetSocketAddress(8889));
        future1.awaitUninterruptibly();
        if (!future1.isConnected()) {
            System.out.println("no connect...");
        } else {
            System.out.println("connected session==>start");
        }
//        IoSession session = future1.getSession();


    }
}

class Worker2 extends Thread {

    private InputStream in;
    private OutputStream out;
    private IoSession session;

    public Worker2(IoSession session, InputStream in, OutputStream out) {
        this.in = in;
        this.out = out;
        this.session = session;
    }

    public void run() {
        System.out.println("client in process stream..");
        try {
            //d:/drivers/sw8-Chipset.rar  F:\\IDE\\netbeans-6.7.1-ml-windows.exe
            String fileName = "F:\\IDE\\netbeans-6.7.1-ml-windows.exe";
            File f = new File(fileName);
            byte[] buf = new byte[2048];
            FileInputStream fis = new FileInputStream(f);
            int offset = 0;
            int count = 0;
            while (true) {
                Thread.sleep(4);
                offset = fis.read(buf);
                count++;
                if(count % 1000==0)
                System.out.println(offset+","+System.currentTimeMillis());
                if (offset == -1) {
                    break;
                }
                out.write(buf, 0, offset);
                out.flush();
            }
            fis.close();
            session.close(false).awaitUninterruptibly();
            System.out.println("over..");

        } catch (InterruptedException ex) {
            Logger.getLogger(Worker2.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
 

 

 

 

分享到:
评论
1 楼 noddle0592 2011-04-21  
研究不错,感谢分享

相关推荐

    mina的高级使用,mina文件图片传送,mina发送文件,mina报文处理,mina发送xml和json

    Apache Mina是一个开源的网络通信应用框架,主要应用于Java平台,它为高性能、高可用性的网络应用程序提供了基础架构。在本文中,我们将深入探讨Mina的高级使用,特别是在文件图片传送、文件发送、XML和JSON报文处理...

    apache-mina-2.0.4.rar_apache mina_mina

    Apache Mina是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"apache-mina-2.0.4.rar"压缩包包含的是Apache Mina 2.0.4版本的源代码,是深入理解和定制Mina的...

    mina连接 mina心跳连接 mina断线重连

    Apache Mina是一个开源的网络通信框架,常用于构建高性能、高效率的服务端应用程序,尤其在Java平台上。在本文中,我们将深入探讨Mina的核心概念,包括连接管理、心跳机制以及断线重连策略。 首先,让我们理解"Mina...

    MINA_API+MINA_DOC+mina

    MINA (Java IO Network Application Framework) 是一个由Apache软件基金会开发的开源网络通信框架,主要应用于构建高性能、高可用性的网络服务器。这个压缩包包含了MINA API文档、自学手册以及开发指南,对于学习和...

    Mina+Socket通信

    Mina和Socket是两种常见的网络通信框架和技术,它们在Java编程环境中被广泛使用。本篇文章将深入探讨如何使用Mina与Socket实现通信,并提供客户端和服务端的实现代码概述。 Mina(全称“MINA: Minimalistic ...

    mina2.0 含11个jar包

    mina-core-2.0.0-M6.jar mina-example-2.0.0-M6.jar mina-filter-codec-netty-2.0.0-M6.jar mina-filter-compression-2.0.0-M6.jar mina-integration-beans-2.0.0-M6.jar mina-integration-jmx-2.0.0-M6.jar mina-...

    mina新手教程源码 mina+springboot+idea最简单的案例。

    mina新手案例,mina新手教程源码 mina+springboot最简单的案例。用的IDEA * mina服务端 * 1、添加@Controller注解和 @PostConstruct注解,代表启动springboot项目时也调用该类下的该方法, * 启动springboot项目...

    给予mina 协议进行大数据传输

    标题中的“给予mina协议进行大数据传输”指的是一种基于Java的网络通信框架——Apache MINA(Model-View-Controller for Network Applications)。MINA是Apache软件基金会的一个项目,它提供了一个高度可扩展和高...

    Java springboot 整合mina 框架,nio通讯基础教程,mina框架基础教程.zip

    Java SpringBoot 整合Mina框架,涉及到的核心技术主要包括Java NIO(非阻塞I/O)、Mina框架以及SpringBoot的集成应用。本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java ...

    Mina开发实例(服务端、客户端)DEMO

    Apache Mina是一个高度可扩展的网络通信框架,它允许开发者创建高性能、高效率的服务端和客户端应用程序。在Java世界中,Mina以其简洁的API和灵活性而受到青睐,尤其适用于处理大量的并发连接,如TCP/IP和UDP协议。...

    mina自定义编解码器详解

    **mina自定义编解码器详解** mina是一个Java开发的网络通信框架,广泛应用于TCP和UDP协议的服务器和客户端开发。在mina框架中,编解码器(Codec)扮演着至关重要的角色,它负责将应用层的数据转换为网络传输的字节...

    spring boot 整合mina 串口

    **Spring Boot 整合Mina实现串口通信详解** 在Java开发中,有时我们需要与硬件设备进行串口通信,例如读取传感器数据或控制工业设备。Spring Boot作为一款轻量级的框架,使得快速构建应用变得简单。而Mina则是一款...

    mina.zip内涵所有mina所需jar包

    Apache Mina是一个高度可扩展的Java网络通信框架,它提供了简单而强大的开发接口,用于创建高性能、高效率的网络应用程序。Mina的核心理念是将网络协议处理与业务逻辑分离,使得开发者可以专注于实现应用程序的业务...

    springboot 深度整合mina开箱即用

    在本文中,我们将深入探讨如何将Spring Boot与Mina进行深度整合,以便为新手开发者提供一个开箱即用的解决方案。Spring Boot以其简洁的配置和快速的开发体验,已经成为Java领域中的主流微服务框架,而Mina则是一个...

    Android-MinaSocket一款基于Mina的Socket长连接库

    **Android-MinaSocket:基于Mina的高效Socket长连接库** 在移动应用开发中,尤其是Android平台,实时性与稳定性是许多应用场景的核心需求,比如在线游戏、即时通讯、物联网设备等。在这种背景下,使用Socket进行长...

    mina demo mina jar包

    Apache Mina是一个开源项目,它提供了一个高度可扩展的网络通信框架,用于简化开发高性能、高可用性的网络服务器和客户端应用程序。"Mina demo mina jar包"指的是使用Apache Mina框架创建的一个演示示例,这个示例...

    mina客户端简单代码示例

    Apache Mina是一个开源的网络通信框架,主要用于简化Java应用程序与远程服务器之间的通信。它提供了高度可扩展和高性能的网络协议处理能力,支持多种传输层协议,如TCP/IP、UDP/IP和SSL/TLS等。在本示例中,我们关注...

    mina2.0全部jar包

    《mina2.0全部jar包详解》 Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高性能、异步事件驱动的网络应用程序框架,主要用于简化开发Java网络应用,特别是那些基于TCP和UDP协议的...

    java-mina通信框架详解.docx

    Apache Mina是一个强大的网络通信框架,专为基于TCP/IP和UDP/IP协议栈的应用设计。它提供了JAVA对象的序列化和虚拟机内部通信的功能,使得开发者能够迅速构建高性能、高可扩展性的网络应用。Mina的核心特性是其事件...

    mina心跳包机制

    mina心跳包机制是Apache Mina框架中的一个关键特性,它用于维持网络连接的活跃状态,确保数据能够在客户端和服务端之间顺畅地传输。Mina是一个高度可扩展的Java网络应用框架,广泛应用于各种分布式系统和网络服务,...

Global site tag (gtag.js) - Google Analytics