论坛首页 入门技术论坛

浅谈JavaNIO使用

浏览 3465 次
该帖已经被评为新手帖
作者 正文
   发表时间:2007-04-10  
OO

  代碼分爲客戶端、服務端以及調試代碼

客戶端:

package test.nio.tcp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

class AsyncClient
{
    private SocketChannel sc;
    private final int MAX_LENGTH = 1024;
    private ByteBuffer r_buff = ByteBuffer.allocate(MAX_LENGTH);
    private ByteBuffer w_buff = ByteBuffer.allocate(MAX_LENGTH);
    private static String host;
    private static int port = 6000;

    public AsyncClient()
    {
        try
        {
            InetSocketAddress addr = new InetSocketAddress(host, port);
            // 生成一个socketchannel
            sc = SocketChannel.open();

            // 连接到server
            sc.connect(addr);
            while (!sc.finishConnect());
            System.out.println("connection has been established!...");

            while (true)
            {
//                // 回射消息
//                String echo = null;
//                try
//                {
//                    System.err.println("Enter msg you'd like to send:  ");
//                    BufferedReader br = new BufferedReader(
//                        new InputStreamReader(System.in));
//                    // 输入回射消息
//                    echo = br.readLine();
//                    echo = "send [test] to server.";
//
//                    // 把回射消息放入w_buff中
//                    w_buff.clear();
//                    w_buff.put(echo.getBytes());
//                    w_buff.flip();
//                }
//                catch (IOException ioe)
//                {
//                    System.err.println("sth. is wrong with br.readline() ");
//                }
//
//                // 发送消息
//                while (w_buff.hasRemaining())
//                    sc.write(w_buff);
//                w_buff.clear();

                // 进入接收状态
                Rec();
                // 间隔1秒
                //Thread.currentThread().sleep(1000);
            }
        }
        catch (IOException ioe)
        {
            ioe.printStackTrace();
        }
//        catch (InterruptedException ie)
//        {
//            ie.printStackTrace();
//        }
    }

    // //////////
    // 读取server端发回的数据,并显示
    public void Rec() throws IOException
    {
        int count;
        r_buff.clear();
        count = sc.read(r_buff);
        r_buff.flip();
        byte[] temp = new byte[r_buff.limit()];
        r_buff.get(temp);
        System.out.println("reply is [" + count + "] long, and content is: "
            + new String(temp));
    }

    public static void main(String args[])
    {
//        if (args.length < 1)
//        {// 输入需有主机名或IP地址
//            try
//            {
//                System.err.println("Enter host name: ");
//                BufferedReader br = new BufferedReader(new InputStreamReader(
//                    System.in));
//                host = br.readLine();
//            }
//            catch (IOException ioe)
//            {
//                System.err.println("sth. is wrong with br.readline() ");
//            }
//        }
//        else if (args.length == 1)
//        {
//            host = args[0];
//        }
//        else if (args.length > 1)
//        {
//            host = args[0];
//            port = Integer.parseInt(args[1]);
//        }
        host = "127.0.0.1";
        port = 6000;
        new AsyncClient();
    }
}

服務端:

package test.nio.tcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class AsyncServer implements Runnable
{
    private ByteBuffer r_buff = ByteBuffer.allocate(1024);
    private ByteBuffer w_buff = ByteBuffer.allocate(1024);
    private static int port = 7890;

    public AsyncServer()
    {
        new Thread(this).start();
    }

    public void run()
    {
        try
        {
            // 生成一个侦听端
            ServerSocketChannel ssc = ServerSocketChannel.open();
            // 将侦听端设为异步方式
            ssc.configureBlocking(false);
            // 生成一个信号监视器
            Selector s = Selector.open();
            // 侦听端绑定到一个端口
            ssc.socket().bind(new InetSocketAddress(port));
            // 设置侦听端所选的异步信号OP_ACCEPT
            ssc.register(s, SelectionKey.OP_ACCEPT);

            System.out.println("echo server has been set up ......");

            while (true)
            {
                int n = s.select();
                if (n == 0)
                {// 没有指定的I/O事件发生
                    continue;
                }
                Iterator it = s.selectedKeys().iterator();
                while (it.hasNext())
                {
                    SelectionKey key = (SelectionKey) it.next();
                    if (key.isAcceptable())
                    {// 侦听端信号触发
                        ServerSocketChannel server = (ServerSocketChannel) key
                            .channel();
                        // 接受一个新的连接
                        SocketChannel sc = server.accept();
                        sc.configureBlocking(false);
                        // 设置该socket的异步信号OP_READ:当socket可读时
                        // 触发函数DealwithData();
                        sc.register(s, SelectionKey.OP_READ);
                    }
                    if (key.isReadable())
                    {// 某socket可读信号
                        DealwithData(key);
                    }
                    it.remove();
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    public void DealwithData(SelectionKey key) throws IOException
    {
        int count = 0;
        // 由key获取指定socketchannel的引用
        SocketChannel sc = (SocketChannel) key.channel();
        r_buff.clear();
        // 读取数据到r_buff
        while ((count = sc.read(r_buff)) > 0);
        // 确保r_buff可读
        r_buff.flip();

        w_buff.clear();
        // 将r_buff内容拷入w_buff
        w_buff.put(r_buff);
        w_buff.flip();
        // 将数据返回给客户端
        EchoToClient(sc);

        w_buff.clear();
        r_buff.clear();
    }

    public void EchoToClient(SocketChannel sc) throws IOException
    {
        while (w_buff.hasRemaining())
            sc.write(w_buff);
    }

    public static void main(String args[])
    {
        if (args.length > 0)
        {
            port = Integer.parseInt(args[0]);
        }
        new AsyncServer();
    }
}

本地調試:

package test.nio.tcp;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 *
 * @author Administrator
 * @version
 */
public class NBTest
{


    /** Creates new NBTest */
    public NBTest()
    {
    }

    public void startServer() throws Exception
    {
        int channels = 0;
        int nKeys = 0;
        //使用Selector
        Selector selector = Selector.open();
        //建立Channel 并绑定到9000端口
        ServerSocketChannel ssc = ServerSocketChannel.open();
        System.out.println("Bind Host["+InetAddress.getLocalHost().getHostName()+"] Port["+7890+"]");
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),7890);
        ssc.socket().bind(address);
        //使设定non-blocking的方式。
        ssc.configureBlocking(false);
        //向Selector注册Channel及我们有兴趣的事件
        SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
        printKeyInfo(s);
        while(true) // 不断的轮询
        {
            debug("NBTest: Starting select");
            //Selector通过select方法通知我们我们感兴趣的事件发生了。
            nKeys = selector.select();
            //如果有我们注册的事情发生了,它的传回值就会大于0
            if(nKeys > 0)
            {
                debug("NBTest: Number of keys after select operation: " +nKeys);
                //Selector传回一组SelectionKeys
                //我们从这些key中的channel()方法中取得我们刚刚注册的channel。
                Set selectedKeys = selector.selectedKeys();
                Iterator i = selectedKeys.iterator();
                while(i.hasNext())
                {
                    s = (SelectionKey) i.next();
                    printKeyInfo(s);
                    debug("NBTest: Nr Keys in selector: " +selector.keys().size());
                    //一个key被处理完成后,就都被从就绪关键字(ready keys)列表中除去
                    i.remove();
                    if(s.isAcceptable())
                    {
                        System.out.println("Receive a new socket.");
                        //从channel()中取得我们刚刚注册的channel。
                        Socket socket = ((ServerSocketChannel)s.channel()).accept().socket();
                        SocketChannel sc = socket.getChannel();       
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE);
                        System.out.println(++channels);
                    }
                    else
                    {
                        debug("NBTest: Channel not acceptable");
                    }
                }
            }
            else
            {
                debug("NBTest: Select finished without any keys.");
            }
        }
    }
   
    private static void debug(String s)
    {
        System.out.println(s);
    }


    private static void printKeyInfo(SelectionKey sk)
    {
        String s = new String();
        s = "Att: " + (sk.attachment() == null ? "no" : "yes");
        s += ", Read: " + sk.isReadable();
        s += ", Acpt: " + sk.isAcceptable();
        s += ", Cnct: " + sk.isConnectable();
        s += ", Wrt: " + sk.isWritable();
        s += ", Valid: " + sk.isValid();
        s += ", Ops: " + sk.interestOps();
        debug(s);
    }

    /**
     * @param args the command line arguments
     */
    public static void main (String args[])
    {
        NBTest nbTest = new NBTest();
        try
        {
            nbTest.startServer();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
    }

}

 

 

论坛首页 入门技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics