`
阅读更多


1. 传统Socket:阻塞式通信
在java传统socket技术中,每建立一个Socket连接时,须同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。
这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。下面的代码就说明了这一点。
a) server code:
package Socket;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
 
public class MultiUserServer extends Thread {
 
    private Socket client;
 
    public MultiUserServer(Socket c) {
        this.client = c;
    }
 
    public void run() {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
 
            // Mutil User but can’t parallel
            while (true) {
                String str = in.readLine();
 
                System.out.println("receive message: " + str);
                if (str.equals("end")) break;
            }
 
            client.close();
 
        } catch (IOException ex) {
 
        }
    }
 
    public static void main(String[] args) throws IOException {
        int port = 10086;
 
        if (args.length > 0)
            port = Integer.parseInt(args[0]);
        ServerSocket server = new ServerSocket(port);
        System.out.println("the server socket application is created!");
 
        while (true) {
            // transfer location change Single User or Multi User
            MultiUserServer mu = new MultiUserServer(server.accept());
            mu.start();
        }
    }
}
b) client code:
package Socket;
 
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
 
public class Client {
    static Socket server;
 
    public static void main(String[] args) throws Exception {
        String host = "192.168.0. 10";
        int port = 10086;
 
        if (args.length > 1) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
 
        System.out.println("connetioning:" + host + ":" + port);
        server = new Socket(host, port);
        PrintWriter out = new PrintWriter(server.getOutputStream());
        BufferedReader wt = new BufferedReader(new InputStreamReader(System.in));
 
        while (true) {
            String str = wt.readLine();
            out.println(str);
            out.flush();
 
            if (str.equals("end")) break;
        }
        server.close();
    }
}
2. nio socket: 非阻塞通讯模式
a) NIO 设计背后的基石:反应器模式
反应器模式: 用于事件多路分离和分派的体系结构模式。
反应器模式的核心功能如下:
将事件多路分用
将事件分派到各自相应的事件处理程序
b) NIO 的非阻塞 I/O 机制是围绕 选择器和 通道构建的。
选择器(Selector类): 是 Channel 的多路复用器。Selector 类将传入客户机请求多路分用并将它们分派到各自的请求处理程序。
通道(Channel 类):表示服务器和客户机之间的一种通信机制,一个通道负责处理一类请求/事件。
简单的来说:
NIO是一个基于事件的IO架构,最基本的思想就是:有事件我会通知你,你再去做与此事件相关的事情。而且NIO的主线程只有一个,不像传统的模型,需要多个线程以应对客户端请求,也减轻了JVM的工作量。
c) 当Channel注册至Selector以后,经典的调用方法如下:
while (somecondition) {
        int n = selector.select(TIMEOUT);
 
        if (n == 0) continue;
            for (Iterator iter = selector.selectedKeys().iterator(); iter.hasNext();) {
                if (key.isAcceptable()) doAcceptable(key);
                if (key.isConnectable()) doConnectable(key);
                if (key.isValid() && key.isReadable()) doReadable(key);
                if (key.isValid() && key.isWritable()) doWritable(key);
 
                iter.remove();
            }
        }
NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。
Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。
d) Sample01
package NIO;
// 程序目的:学习Java NIO#SocketChannel

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
 
public class SocketChannelDemo {
    public static int PORT_NUMBER = 23;// 监听端口
    static String line = "";
    ServerSocketChannel serverChannel;
    ServerSocket serverSocket;
    Selector selector;
 
    private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
 
    public static void main(String[] args) throws Exception {
        SocketChannelDemo server = new SocketChannelDemo();
        server.init(args);
        server.startWork();
    }
 
    public void init(String[] argv) throws Exception {
        int port = PORT_NUMBER;
 
        if (argv.length > 0) port = Integer.parseInt(argv[0]);
 
        System.out.println("Listening on port " + port);
 
        // 分配一个ServerSocketChannel
        serverChannel = ServerSocketChannel.open();
 
        // 从ServerSocketChannel里获得一个对应的Socket
        serverSocket = serverChannel.socket();
 
        // 生成一个Selector
        selector = Selector.open();
 
        // 把Socket绑定到端口上
        serverSocket.bind(new InetSocketAddress(port));
 
        // serverChannel为非bolck
        serverChannel.configureBlocking(false);
 
        // 通过Selector注册ServerSocetChannel
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
   }
 
   public void startWork() throws Exception {
       while (true) {
           int n = selector.select();// 获得IO准备就绪的channel数量
 
           if (n == 0) continue; // 没有channel准备就绪,继续执行
 
           // 用一个iterator返回Selector的selectedkeys
           Iterator it = selector.selectedKeys().iterator();
 
           // 处理每一个SelectionKey
           while (it.hasNext()) {
               SelectionKey key = (SelectionKey) it.next();
 
               // 判断是否有新的连接到达
               if (key.isAcceptable()) {
                   // 返回SelectionKey的ServerSocketChannel
                   ServerSocketChannel server = (ServerSocketChannel) key.channel();
                   SocketChannel channel = server.accept();
                   registerChannel(selector, channel, SelectionKey.OP_READ);
                   doWork(channel);
               }
 
               // 判断是否有数据在此channel里需要读取
               if (key.isReadable()) processData(key);
 
               // 删除 selectedkeys
               it.remove();
           }
       }
   }
 
   protected void registerChannel(Selector selector, SelectableChannel channel, int ops) throws Exception {
       if (channel == null) return;
 
       channel.configureBlocking(false);
       channel.register(selector, ops);
   }
 
   // 处理接收的数据
   protected void processData(SelectionKey key) throws Exception {
       SocketChannel socketChannel = (SocketChannel) key.channel();
       int count;
 
       buffer.clear(); // 清空buffer
 
       // 读取所有的数据
       while ((count = socketChannel.read(buffer)) > 0) {
           buffer.flip();
 
           // send the data, don′t assume it goes all at once
           while (buffer.hasRemaining()) {
               char c = (char) buffer.get();
               line += c;
 
               // 如果收到回车键,则在返回的字符前增加[echo]$字样,并且server端打印出字符串
               if (c == (char) 13) {
                   buffer.clear();
                   buffer.put("[echo]$".getBytes());
                   buffer.flip();
                   System.out.println(line); //
                   line = "";
               }
 
               socketChannel.write(buffer);// 在Socket里写数据
           }
 
           buffer.clear(); // 清空buffer
       }
       if (count < 0) socketChannel.close(); // count<0,说明已经读取完毕
   }
 
   private void doWork(SocketChannel channel) throws Exception {
       buffer.clear();
       buffer.put("Hello,I am working,please input some thing,and i will echo to you![echo]$".getBytes());
       buffer.flip();
       channel.write(buffer);
   }
}
运行此程序,然后在控制台输入命令telnet localhost 23。
e) Server code:
public class NonBlockingServer {
    public Selector sel = null;
    public ServerSocketChannel server = null;
    public SocketChannel socket = null;
    public int port = 4900;
    String result = null;
    public NonBlockingServer() {
        System.out.println("Inside default ctor");
    }
 
    public NonBlockingServer(int port) {
        System.out.println("Inside the other ctor");
        this.port = port;
    }
 
    public void initializeOperations() throws IOException,UnknownHostException {
        System.out.println("Inside initialization");
        sel = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        InetAddress ia = InetAddress.getLocalHost();
        InetSocketAddress isa = new InetSocketAddress(ia,port);
        server.socket().bind(isa);
    }
 
    public void startServer() throws IOException {
        System.out.println("Inside startserver");
        initializeOperations();
 
        System.out.println("Abt to block on select()");
        SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT );
        while (acceptKey.selector().select() > 0 ) {
            Set readyKeys = sel.selectedKeys();
            Iterator it = readyKeys.iterator();
 
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey)it.next();
                it.remove();
                if (key.isAcceptable()) {
                    System.out.println("Key is Acceptable");
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    socket = (SocketChannel) ssc.accept();
                    socket.configureBlocking(false);
                    SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                }
                if (key.isReadable()) {
                    System.out.println("Key is readable");
                    String ret = readMessage(key);
                    if (ret.length() > 0) writeMessage(socket,ret);
                }
                if (key.isWritable()) {
                    System.out.println("THe key is writable");
                    String ret = readMessage(key);
                    socket = (SocketChannel)key.channel();
 
                    if (result.length() > 0 ) writeMessage(socket,ret);
                }
            }
        }
    }
 
    public void writeMessage(SocketChannel socket,String ret) {
        System.out.println("Inside the loop");
        if (ret.equals("quit") || ret.equals("shutdown")) return;
 
        try {
            String s = "This is content from server!—————————————–";
            Charset set = Charset.forName("us-ascii");
            CharsetDecoder dec = set.newDecoder();
            CharBuffer charBuf = dec.decode(ByteBuffer.wrap(s.getBytes()));
 
            System.out.println(charBuf.toString());
 
            int nBytes = socket.write(ByteBuffer.wrap((charBuf.toString()).getBytes()));
            System.out.println("nBytes = "+nBytes);
            result = null;
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
 
    public String readMessage(SelectionKey key) {
        int nBytes = 0;
        socket = (SocketChannel)key.channel();
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            nBytes = socket.read(buf);
            buf.flip();
            Charset charset = Charset.forName("us-ascii");
            CharsetDecoder decoder = charset.newDecoder();
            CharBuffer charBuffer = decoder.decode(buf);
            result = charBuffer.toString();
        } catch(IOException e) {
            e.printStackTrace();
        }
        return result;
    }
 
    public static void main(String args[]) {
        NonBlockingServer nb;
        if (args.length < 1) nb = new NonBlockingServer()
        else {
            int port = Integer.parseInt(args[0]);
            nb = new NonBlockingServer(port);
        }
 
        try {
            nb.startServer();
            System.out.println("the nonBlocking server is started!");
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }
}
2.2.4.2 Client code:
public class Client {
    public SocketChannel client = null;
    public InetSocketAddress isa = null;
    public RecvThread rt = null;
    private String host;
    private int port;
 
    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }
 
    public void makeConnection() {
        String proxyHost = "192.168.254.212";
        String proxyPort = "1080";
 
        System.getProperties().put("socksProxySet", "true");
        System.getProperties().put("socksProxyHost", proxyHost);
        System.getProperties().put("socksProxyPort", proxyPort);
 
        int result = 0;
        try {
            client = SocketChannel.open();
            isa = new InetSocketAddress(host, port);
            client.connect(isa);
            client.configureBlocking(false);
            receiveMessage();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
 
        long begin = System.currentTimeMillis();
        sendMessage();
        long end = System.currentTimeMillis();
        long userTime = end - begin;
        System.out.println("use tiem: " + userTime);
 
        try {
            interruptThread();
            client.close();
            System.exit(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public int sendMessage() {
        System.out.println("Inside SendMessage");
        String msg = null;
        ByteBuffer bytebuf;
        int nBytes = 0;
 
        try {
            msg = "It’s message from client!";
            System.out.println("msg is "+msg);
            bytebuf = ByteBuffer.wrap(msg.getBytes());
 
            for (int i = 0; i < 1000; i++) {
                nBytes = client.write(bytebuf);
                System.out.println(i + " finished");
            }
 
            interruptThread();
 
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
 
            client.close();
            return -1;
        } catch (IOException e) {
            e.printStackTrace();
        }
 
        return nBytes;
    }
 
    public void receiveMessage() {
        rt = new RecvThread("Receive THread", client);
        rt.start();
    }
 
    public void interruptThread() {
        rt.val = false;
    }
 
    public static void main(String args[]) {
        if (args.length < 2) {
            System.err.println("You should put 2 args: host,port");
        } else {
            String host = args[0];
            int port = Integer.parseInt(args[1]);
 
            Client cl = new Client(host, port);
            cl.makeConnection();
        }
 
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String msg;
    }
 
    public class RecvThread extends Thread {
        public SocketChannel sc = null;
        public boolean val = true;
 
        public RecvThread(String str, SocketChannel client) {
            super(str);
            sc = client;
        }
 
        public void run() {
            int nBytes = 0;
            ByteBuffer buf = ByteBuffer.allocate(2048);
 
            try {
                while (val) {
                    while ((nBytes = nBytes = client.read(buf)) > 0) {
                        buf.flip();
                        Charset charset = Charset.forName("us-ascii");
                        CharsetDecoder decoder = charset.newDecoder();
                        CharBuffer charBuffer = decoder.decode(buf);
                        String result = charBuffer.toString();
                        System.out.println("the server return: " + result);
                        buf.flip();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Reactor模式和NIO
当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:
Read request
Decode request
Process service
Encode reply
Send reply
经典的网络服务的设计如下图,在每个线程中完成对数据的处理:
但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。
Reactor模式类似于AWT中的Event处理:
Reactor模式参与者
1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread
2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener
如图:
Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。
我们来看看Reactor模式代码:
public class Reactor implements Runnable{
 
  final Selector selector;
  final ServerSocketChannel serverSocket;
 
  Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
    serverSocket.socket().bind(address);
 
    serverSocket.configureBlocking(false);
    //向selector注册该channel
     SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);
 
    logger.debug("–>Start serverSocket.register!");
 
    //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
    sk.attach(new Acceptor());
    logger.debug("–>attach(new Acceptor()!");
  }
 
  public void run() { // normally in a new Thread
    try {
    while (!Thread.interrupted())
    {
      selector.select();
      Set selected = selector.selectedKeys();
      Iterator it = selected.iterator();
      //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
      while (it.hasNext())
        //来一个事件第一次触发一个accepter线程
        //以后触发SocketReadHandler
        dispatch((SelectionKey)(it.next()));
        selected.clear();
      }
    }catch (IOException ex) {
        logger.debug("reactor stop!"+ex);
    }
  }
 
  //运行Acceptor或SocketReadHandler
  void dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if (r != null){
      // r.run();
 
    }
  }

  class Acceptor implements Runnable { // inner
    public void run() {
    try {
      logger.debug("–>ready for accept!");
      SocketChannel c = serverSocket.accept();
      if (c != null)
        //调用Handler来处理channel
        new SocketReadHandler(selector, c);
      }
    catch(IOException ex) {
      logger.debug("accept stop!"+ex);
    }
    }
  }
}
以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。
再看看Handler代码:
public class SocketReadHandler implements Runnable {
 
  public static Logger logger = Logger.getLogger(SocketReadHandler.class);
 
  private Test test=new Test();
 
  final SocketChannel socket;
  final SelectionKey sk;
 
   static final int READING = 0, SENDING = 1;
  int state = READING;
 
  public SocketReadHandler(Selector sel, SocketChannel c)
    throws IOException {
 
    socket = c;
 
    socket.configureBlocking(false);
     sk = socket.register(sel, 0);
    //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
    //参看dispatch(SelectionKey k)
    sk.attach(this);
 
    //同时将SelectionKey标记为可读,以便读取。
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
  }
 
  public void run() {
    try{
    // test.read(socket,input);
      readRequest() ;
    }catch(Exception ex){
    logger.debug("readRequest error"+ex);
    }
  }
private void readRequest() throws Exception {
 
  ByteBuffer input = ByteBuffer.allocate(1024);
  input.clear();
  try {
    int bytesRead = socket.read(input);
    ……
    //激活线程池处理这些request
    requestHandle(new Request(socket,btt));
    …..
  }catch(Exception e) {
  }
}
注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理 写 发出等流程处理。
将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:
更进一步,我们可以使用多个Selector分别处理连接和读事件。
一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。
3. Socket网络框架 MINA
a) Overview
MINA是一个网络应用框架,在不牺牲性能和可扩展性的前提下用于解决如下问题:
快速开发自己的应用。
高可维护性,高可复用性:网络I/O编码,消息的编/解码,业务逻辑互相分离。
相对容易的进行单元测试。
b) MINA架构:
IoSessionManager: Where real I/O occurs
IoFilters: Filters I/O events • requests
IoHandler: Your protocol logic
IoSession: Represents a connection

IoFilters:
IoFilter为MINA的功能扩展提供了接口。它拦截所有的IO事件进行事件的预处理和河畜处理(AOP)。我们可以把它想象成Servlet的filters。
IoFilter能够实现以下几种目的:
事件日志
性能检测
数据转换(e.g. SSL support),codec
防火墙…等等
codec: ProtocolCodecFactory
MINA提供了方便的Protocol支持。如上说讲,codec在IoFilters中设置。
通过它的Encoder和Decoder,可以方便的扩展并支持各种基于Socket的网络协议,比如HTTP服务器、FTP服务器、Telnet服务器等等。
要实现自己的编码/解码器(codec)只需要实现interface: ProtocolCodecFactory即可.
在MINA 1.0版本,MINA已经实现了几个常用的(codec factory):
DemuxingProtocolCodecFactory,
NettyCodecFactory,
ObjectSerializationCodecFactory,
TextLineCodecFactory
其中:
TextLineCodecFactory:
A ProtocolCodecFactory that performs encoding and decoding between a text line data and a Java
string object. This codec is useful especially when you work with a text-based protocols such as SMTP and IMAP.
ObjectSerializationCodecFactory:
A ProtocolCodecFactory that serializes and deserializes Java objects. This codec is very useful when
you have to prototype your application rapidly without any specific codec.
DemuxingProtocolCodecFactory:
A composite ProtocolCodecFactory that consists of multiple MessageEncoders and MessageDecoders. ProtocolEncoder and ProtocolDecoder this factory returns demultiplex incoming messages and buffers to appropriate MessageEncoders and MessageDecoders.
NettyCodecFactory:
A MINA ProtocolCodecFactory that provides encoder and decoder for Netty2 Messages and MessageRecognizers.
IoHandler :business logic
MINA中,所有的业务逻辑都在实现了IoHandler的class完成。
Interface Handle:
all protocol events fired by MINA. There are 6 event handler methods, and they are all invoked by MINA automatically.
当事件发生时,将触发IoHandler中的方法:
sessionCreated:当一个session创建的时候调用;
sessionOpened:在sessionCreated调用之后被调用;
sessionClosed:当IO连接被关闭时被调用;
sessionIdle:当在远程实体和用户程序之间没有数据传输的时候被调用;
exceptionCaught:当IoAcceptor 或者IoHandler.中出现异常时被调用;
messageReceived:当接受到消息时调用;
messageSent:当发出请求时调用。
MINA 1.0中,IoHandler的实现类:
ChainedIoHandler
DemuxingIoHandler,
IoHandlerAdapter
SingleSessionIoHandlerDelegate
StreamIoHandler
具体细节可参考javadoc。
c) MINA的高级主题:线程模式
MINA通过它灵活的filter机制来提供多种线程模型。
没有线程池过滤器被使用时MINA运行在一个单线程模式。
如果添加了一个IoThreadPoolFilter到IoAcceptor,将得到一个leader-follower模式的线程池。
如果再添加一个ProtocolThreadPoolFilter,server将有两个线程池:
一个(IoThreadPoolFilter)被用于对message对象进行转换,另外一个(ProtocolThreadPoolFilter)被用于处理业务逻辑。
SimpleServiceRegistry加上IoThreadPoolFilter和ProtocolThreadPoolFilter的缺省实现即可适用于需要高伸缩性的应用。如果想使用自己的线程模型,请参考SimpleServiceRegistry的源代码,并且自己初始化Acceptor。
IoThreadPoolFilter threadPool = new IoThreadPoolFilter();threadPool.start();
IoAcceptor acceptor = new SocketAcceptor();
acceptor.getFilterChain().addLast( "threadPool", threadPool);
ProtocolThreadPoolFilter threadPool2 = new ProtocolThreadPoolFilter();
threadPool2.start();
ProtocolAcceptor acceptor2 = new IoProtocolAcceptor( acceptor );
acceptor2.getFilterChain().addLast( "threadPool", threadPool2 );

threadPool2.stop();
threadPool.stop();
d) 采用MINA进行socket开发,一般步骤如下:
Begin:
IoAcceptor acceptor = new SocketAcceptor(); //建立client接收器
or client:
SocketConnector connector = new SocketConnector(); //建立一个连接器
server的属性配置:
SocketAcceptorConfig cfg = new SocketAcceptorConfig();
cfg.setReuseAddress(true);
cfg.getFilterChain().addLast( "codec",
                     new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) ); //对象序列化 codec factory
cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
绑定address和business logic
server:
acceptor.bind(new InetSocketAddress( SERVER_PORT ),
              new ServerSessionHandler( ), cfg ); // 绑定address和handler
client:
connector.connect(new InetSocketAddress( HOSTNAME, PORT ),
                  new ClientSessionHandler(msg), cfg );
实现自己的业务逻辑: IoHandler
如有必要,实现自己的CODEC
下面的代码演示了采用ObjectSerializationCodecFactory给服务端传送文件:
e) Client
public class Client {
    private static final String HOSTNAME = "192.168.0.81";
    private static final int PORT = 8080;
    private static final int CONNECT_TIMEOUT = 30; // seconds
 
    public static void main( String[] args ) throws Throwable {
        System.out.println("in nio client");
        SocketConnector connector = new SocketConnector();
        // Configure the service.
        SocketConnectorConfig cfg = new SocketConnectorConfig();
        cfg.setConnectTimeout( CONNECT_TIMEOUT );
        cfg.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );
        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
        IoSession session;
        if(args.length > 1) {
           connector.connect(new InetSocketAddress( HOSTNAME, PORT ),
                    new ClientSessionHandler(args), cfg );
        } else {
           String[] files = {"E:/music/lcl/juhuatai.mp3",
                             "E:/music/lcl/jimosazhouleng.mp3"};
           connector.connect(new InetSocketAddress( HOSTNAME, PORT ),
                    new ClientSessionHandler(files), cfg );
        }
    }
}
f) Clint handle(client端的业务代码)
public class ClientSessionHandler extends IoHandlerAdapter {
    private String[] files;
 
    public ClientSessionHandler(String[] files) {
        this.files = files;
    }
 
    public void sessionOpened( IoSession session ) {
        for (int i = 0; i < this.files.length; i++) {
            Thread sendMessageThread = new SendMessageThread("Thread" + i, session,files[i]);
            sendMessageThread.start();
        }
    }
 
    public void messageReceived( IoSession session, Object message ) {
        System.out.println("in messageReceived!");
    }
 
    public void exceptionCaught( IoSession session, Throwable cause ) {
        session.close();
    }
 
    public class SendMessageThread extends Thread {
        private IoSession session;
        private String filename;
 
        public SendMessageThread(String name, IoSession session, String filename) {
            super(name);
            this.session = session;
            this.filename = filename;
        }
 
        public void run() {
            System.out.println("start thread: " + this.getName());
            try {
                ByteBuffer buf = ByteBuffer.allocate(Constants.BUF_SIZE);
 
                FileChannel fc = new FileInputStream(filename).getChannel();
 
                int index;
                while ((index = NioFileUtil.readFile(fc, buf)) > 0) {
                  buf.flip();
                  byte[] bs;
 
                  if (index == buf.capacity()) {
                      bs = buf.array();
                  } else {
                      bs = new byte[index];
 
                      int i = 0;
                      while (buf.hasRemaining()) {
                          bs[i++] = buf.get();
                      }
                  }
 
                  Message msg = new Message(filename,Constants.CMD_SEND, bs);
                  session.write(msg);
                }
 
                Message msg = new Message(filename, Constants.CMD_FINISHED, null);
                session.write(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
g) Server:
public class Server {
    private static final int SERVER_PORT = 8080;
 
    public static void main( String[] args ) throws Throwable {
        IoAcceptor acceptor = new SocketAcceptor();
        // Prepare the service configuration.
        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
        cfg.setReuseAddress( true );
        cfg.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );
        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
        acceptor.bind(
                new InetSocketAddress( SERVER_PORT ),
                new ServerSessionHandler( ), cfg );
        System.out.println( "nioFileServer Listening on port " + SERVER_PORT );
    }
}
h) Server handle:(Server端业务代码)
public class ServerSessionHandler extends IoHandlerAdapter {
    public void sessionOpened( IoSession session ) {
        // set idle time to 60 seconds
        System.out.println("in sessionOpened");
        session.setIdleTime( IdleStatus.BOTH_IDLE, 60 );
        session.setAttribute("times",new Integer(0));
    }
 
    public void messageReceived( IoSession session, Object message ) {
        System.out.println("in messageReceived");
        Message msg = (Message) message;
        System.out.println("the file name is: " + msg.getFileName() + ""n");
       this.process(session, msg);
   }
 
   private void process(IoSession session, Message message) {
       String[] temparray = message.getFileName().split("[//]");
        String filename ="d:/" + temparray[temparray.length - 1];
        if (session.containsAttribute(message.getFileName())) {
            FileChannel channel = (FileChannel)session.getAttribute(message.getFileName());
            if (message.getType().equals(Constants.CMD_SEND)) {
                try {
                    NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    channel.close();
                    channel = null;
                    session.removeAttribute(message.getFileName());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } else {
            try {
                FileChannel channel = new FileOutputStream(filename).getChannel();
                NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));
                session.setAttribute(message.getFileName(), channel);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
 
    public void sessionIdle( IoSession session, IdleStatus status ) {
        SessionLog.info( session, "Disconnecting the idle." );
        // disconnect an idle client
        session.close();
    }
 
    public void exceptionCaught( IoSession session, Throwable cause ) {
        // close the connection on exceptional situation
        session.close();
    }
}
i) 文件操作:
public class NioFileUtil {
 
    public static void writeFile(FileChannel fileChannel, ByteBuffer buf) throws Exception {
        buf.clear();
        fileChannel.write(buf);
    }
 
    public static int readFile(FileChannel fileChannel,ByteBuffer buf) throws IOException {
        buf.rewind();
        int index = fileChannel.read(buf);
        return index;
    }
}
j) 常量:

public class Constants {
    public static final String CMD_FINISHED = "FINISHED";
    public static final String CMD_SEND = "SEND";
    public static final int BUF_SIZE = 10240;
    private Constants(){}
}
Demo
Introduction
org.apache.mina.example.chat
Chat server which demonstates using the text line codec and Spring integration.
org.apache.mina.example.chat.client
Swing based chat client.
org.apache.mina.example.echoserver
Echo server which demonstate

分享到:
评论
2 楼 nocb 2008-08-04  
好高深啊,学习中
你好,我想用mina写个客户端,从数据的队列中取数据,发送给服务端,如何写啊?
我不想每条数据建一个连接 。谢谢
hansen.nocb@gmail.com
1 楼 nocb 2008-08-04  
好高深啊,学习中
你好,我想用mina写个客户端,从数据的队列中取数据,发送给客户端,如何写啊?
我不想每条数据建一个连接 。谢谢
hansen.nocb@gmail.com

相关推荐

    mina的高级使用,mina文件图片传送,mina发送文件,mina报文处理,mina发送xml和json

    Apache Mina是一个开源的网络通信应用框架,主要应用于Java平台,它为高性能、高可用性的网络应用程序提供了基础架构。在本文中,我们将深入探讨Mina的高级使用,特别是在文件图片传送、文件发送、XML和JSON报文处理...

    apache-mina-2.0.4.rar_apache mina_mina

    Apache Mina是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"apache-mina-2.0.4.rar"压缩包包含的是Apache Mina 2.0.4版本的源代码,是深入理解和定制Mina的...

    MINA_API+MINA_DOC+mina

    MINA (Java IO Network Application Framework) 是一个由Apache软件基金会开发的开源网络通信框架,主要应用于构建高性能、高可用性的网络服务器。这个压缩包包含了MINA API文档、自学手册以及开发指南,对于学习和...

    mina连接 mina心跳连接 mina断线重连

    Apache Mina是一个开源的网络通信框架,常用于构建高性能、高效率的服务端应用程序,尤其在Java平台上。在本文中,我们将深入探讨Mina的核心概念,包括连接管理、心跳机制以及断线重连策略。 首先,让我们理解"Mina...

    Mina+Socket通信

    Mina和Socket是两种常见的网络通信框架和技术,它们在Java编程环境中被广泛使用。本篇文章将深入探讨如何使用Mina与Socket实现通信,并提供客户端和服务端的实现代码概述。 Mina(全称“MINA: Minimalistic ...

    mina2.0 含11个jar包

    mina-core-2.0.0-M6.jar mina-example-2.0.0-M6.jar mina-filter-codec-netty-2.0.0-M6.jar mina-filter-compression-2.0.0-M6.jar mina-integration-beans-2.0.0-M6.jar mina-integration-jmx-2.0.0-M6.jar mina-...

    mina新手教程源码 mina+springboot+idea最简单的案例。

    mina新手案例,mina新手教程源码 mina+springboot最简单的案例。用的IDEA * mina服务端 * 1、添加@Controller注解和 @PostConstruct注解,代表启动springboot项目时也调用该类下的该方法, * 启动springboot项目...

    Java springboot 整合mina 框架,nio通讯基础教程,mina框架基础教程.zip

    Java SpringBoot 整合Mina框架,涉及到的核心技术主要包括Java NIO(非阻塞I/O)、Mina框架以及SpringBoot的集成应用。本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java ...

    给予mina 协议进行大数据传输

    标题中的“给予mina协议进行大数据传输”指的是一种基于Java的网络通信框架——Apache MINA(Model-View-Controller for Network Applications)。MINA是Apache软件基金会的一个项目,它提供了一个高度可扩展和高...

    mina.zip内涵所有mina所需jar包

    Apache Mina是一个高度可扩展的Java网络通信框架,它提供了简单而强大的开发接口,用于创建高性能、高效率的网络应用程序。Mina的核心理念是将网络协议处理与业务逻辑分离,使得开发者可以专注于实现应用程序的业务...

    mina demo mina jar包

    Apache Mina是一个开源项目,它提供了一个高度可扩展的网络通信框架,用于简化开发高性能、高可用性的网络服务器和客户端应用程序。"Mina demo mina jar包"指的是使用Apache Mina框架创建的一个演示示例,这个示例...

    Mina开发实例(服务端、客户端)DEMO

    Apache Mina是一个高度可扩展的网络通信框架,它允许开发者创建高性能、高效率的服务端和客户端应用程序。在Java世界中,Mina以其简洁的API和灵活性而受到青睐,尤其适用于处理大量的并发连接,如TCP/IP和UDP协议。...

    mina自定义编解码器详解

    **mina自定义编解码器详解** mina是一个Java开发的网络通信框架,广泛应用于TCP和UDP协议的服务器和客户端开发。在mina框架中,编解码器(Codec)扮演着至关重要的角色,它负责将应用层的数据转换为网络传输的字节...

    spring boot 整合mina 串口

    **Spring Boot 整合Mina实现串口通信详解** 在Java开发中,有时我们需要与硬件设备进行串口通信,例如读取传感器数据或控制工业设备。Spring Boot作为一款轻量级的框架,使得快速构建应用变得简单。而Mina则是一款...

    springboot 深度整合mina开箱即用

    在本文中,我们将深入探讨如何将Spring Boot与Mina进行深度整合,以便为新手开发者提供一个开箱即用的解决方案。Spring Boot以其简洁的配置和快速的开发体验,已经成为Java领域中的主流微服务框架,而Mina则是一个...

    mina客户端简单代码示例

    Apache Mina是一个开源的网络通信框架,主要用于简化Java应用程序与远程服务器之间的通信。它提供了高度可扩展和高性能的网络协议处理能力,支持多种传输层协议,如TCP/IP、UDP/IP和SSL/TLS等。在本示例中,我们关注...

    mina心跳包机制

    mina心跳包机制是Apache Mina框架中的一个关键特性,它用于维持网络连接的活跃状态,确保数据能够在客户端和服务端之间顺畅地传输。Mina是一个高度可扩展的Java网络应用框架,广泛应用于各种分布式系统和网络服务,...

    mina

    标题 "MINA" 指的是 Apache MINA (Multipurpose Infrastructure for Network Applications),这是一个由Apache软件基金会开发的Java框架,主要用于构建高性能、高可用性的网络应用程序。MINA提供了一个高级的网络...

    mina2.0全部jar包

    《mina2.0全部jar包详解》 Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高性能、异步事件驱动的网络应用程序框架,主要用于简化开发Java网络应用,特别是那些基于TCP和UDP协议的...

    java-mina通信框架详解.docx

    Apache Mina是一个强大的网络通信框架,专为基于TCP/IP和UDP/IP协议栈的应用设计。它提供了JAVA对象的序列化和虚拟机内部通信的功能,使得开发者能够迅速构建高性能、高可扩展性的网络应用。Mina的核心特性是其事件...

Global site tag (gtag.js) - Google Analytics