`

Java网络编程 非阻塞I/O

    博客分类:
  • Java
阅读更多
对于CPU速度高于网络的情况,传统的Java解决方案是缓冲和多线程。多个线程可以同时为几个不同的连接生成数据,并将数据存储在缓冲其中,知道网络准备好发送。

一些基础概念
缓冲区Buffer
位置 position 缓冲区将被读取或写入的下一个位置(循环中的数组下标)
容量 capacity 缓冲区可以保存的元素最大数目(数组长度)
限度 limit 缓冲区中保存数据的最后一个位置,只要不改变限度,就无法读写超过这个位置的数据,即使缓冲区有更大的容量也没有用
标记 mark 缓冲区客户端专有的索引,make()的到当前position reset()返回

与读取InputSream不同,读取缓冲区实际上不会改变其中的数据,只会改变向前或向后改变位置,达到从缓冲区某个位置开始读取,
类似的,程序可以调整限度,从而控制可读取的数据的末尾。只有容量是不能变的。

Buffer超类中提供一下方法
*clean 清空  postion设为0,limit设置capacity
*filp 回绕 limit设为position
*rewind 回到 position设为0
*remaining  limit - position
*hasRemaining方法 limit - position > 0?

排空 get
填充 put
挤压 compact

    将缓冲区中所有剩余的数据一道缓冲区开头,为元素释放更多空间,缓冲区的位置设置为数据结尾。
    用于读取一个通道在写入另一个通道,用一个缓冲区就能完成随即交替
复制 duplicate
  返回值不是一个副本,复制的缓冲区和原来的缓冲区共享相同的数据,所以主要应当在只准备读取缓冲区时,才使用此方法。
  尽管他们共享相同的数据,但是初始和复制的缓冲区有独立的编辑、限度和位置。当希望多个通道并行地传输相同数据时,复制就非常有用。
分片 slicing
  复制的一个变形,分片也会创建和原缓冲区共享相同数据的新缓冲区,但是分片的初始位置是原缓冲区的当前位置。即分片是原缓冲区
  的子序列,只包含原缓冲区当前位置到限度的所有元素

Channels工具类,可以将传统的基于I/O的流,包装在通道中,也可以从通道转换基于I/O的流。
例如,所有当前的XML API都是用流、文件或其他传统I/O API来读取XML文档。如果编写用于处理SOAP请求的HTTP服务器,先使用通道读取HTTP请求主体,在将通道转换为流
SocketChannel channel = server.accept();
processHTTPHeader(channel);
XMLReader praser = XMLReaderFactory.createXMLReader();
parser.prase(Channels.newInputStream(channel));

就绪选择
能够选择读写时不阻塞的socket。将不同的通道注册到一个Selector对象。每个通道分配一个SelectionKey,然后程序可以询问Selector对象,那些通道已经准备就绪,可以无阻塞地完成目标操作
1 工厂方法Selector.open()创建新的选择器
2 实现SelectableChannel的类(FileChannel没有实现) register()方法。通过将选择器传递给通道的一个注册方法,向选择器注册通道
选择就绪的通道
selectNow()非阻塞 select() select(long timeout)阻塞



import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.io.IOException;
/**
 * 客户端连接时,服务器将发送连续的字符序列,直到客户端断开连接,
 * 客户端任何输入都被忽略
 */
public class ChargenServer {

  public static int DEFAULT_PORT = 19;

  public static void main(String[] args) {
    int port = DEFAULT_PORT;
    System.out.println("Listening for connections on port " + port);
    byte[] rotation = new byte[95 * 2];
    for (byte i = ' '; i <= '~'; i++) {
      rotation[i - ' '] = i;
      rotation[i + 95 - ' '] = i;
    }
    ServerSocketChannel serverChannel;
    Selector selector;
    try {
      serverChannel = ServerSocketChannel.open();
      // 获取ServerSocketChannel的对等端(prre)对象
      ServerSocket ss = serverChannel.socket();
      InetSocketAddress address = new InetSocketAddress(port);
      ss.bind(address);
      //非阻塞状态下,没有连接时,serverChannel.accept()会立即返回null
      serverChannel.configureBlocking(false);
      // 创建一个Selector,使程序能够对所有准备好的连接进行循环处理
      selector = Selector.open();
      // 在监视通道的选择器中进行注册
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }
    while (true) {
      try {
        // 调用选择器的select()方法,检查是否有就绪的通道,
        // 如果没有就绪通道,选择器就会等待(阻塞)
        selector.select();
      } catch (IOException ex) {
        ex.printStackTrace();
        break;
      }
      // 找到了就绪的通道,selectedKeys()方法返回就绪通道的SelectionKey
      // socket空闲时,即为可写,有数据来时,可读
      // Set集合好像链接到服务器的客户端集合
      Set readyKeys = selector.selectedKeys();
      Iterator iterator = readyKeys.iterator();
      while (iterator.hasNext()) {
        SelectionKey key = (SelectionKey) iterator.next();
        iterator.remove();
        try {
          // 就绪通道是ServerSocketChannel,程序会接受一个新的SocketChannel
          // 并将其添加到选择器 
          if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            // 注册了SelectionKey.OP_ACCEPT,当有accept发生时,Selector会监控到
            SocketChannel client = server.accept();
            System.out.println("Accepted connection from " + client);
            client.configureBlocking(false);
            // 每个SelectionKey都有一个Object类型的“附件”,
            // 在这里,可以将通道要写入网络的缓冲区存储到这个对象中
            SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
            ByteBuffer buffer = ByteBuffer.allocate(74);
            buffer.put(rotation, 0, 72);
            buffer.put((byte) '\r');
            buffer.put((byte) '\n');
            buffer.flip();
            key2.attach(buffer);
          // 就绪通道是SocketChannel,程序会向通道写数据
          } else if (key.isWritable()) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            // hasRemaining:如果有没写完的数据,就写入到通道
            // 否则,同rotation数据中的下一行数据重新填充缓冲区,再写入通道
            if (!buffer.hasRemaining()) {
              buffer.rewind();
              // 得到上一行的首字符
              int first = buffer.get();
              // 准备好改变缓冲区中的数据
              buffer.rewind();
              // 寻找rotation中新的首字符位置
              int position = first - ' ' + 1;
              // 将数据从rotation复制到缓冲区
              buffer.put(rotation, position, 72);
              buffer.put((byte) '\r');
              buffer.put((byte) '\n');
              // 准备缓冲区写入
              buffer.flip();
            }
            client.write(buffer);
          }
        } catch (IOException ex) {
          key.cancel();
          try {
            // 取消键后,仍可以得到键的通道
            key.channel().close();
          } catch (IOException cex) {
          }
        }
      }
    }
  }
}

import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.io.IOException;

public class ChargenClient {

  public static int DEFAULT_PORT = 19;

  public static void main(String[] args) {
    int port = DEFAULT_PORT;
    try {
      SocketAddress address = new InetSocketAddress("localhost", port);
      SocketChannel client = SocketChannel.open(address);
      ByteBuffer buffer = ByteBuffer.allocate(74);
      WritableByteChannel out = Channels.newChannel(System.out);
      // 通道会从Socket读取数据,填充到缓冲区,返回成功读取并存储在缓冲区的字节数
      // 设置为非阻塞时,当没有字节返回时会立即返回0,
      // 此时程序应写成:
//      client.configureBlocking(false);
//      while (true) {
//        int n = client.read(buffer);
//        if (n > 0) {
//          buffer.flip(); 
//          out.write(buffer);
//          buffer.clear();
//        } else if (n == -1)
//          break; // 服务器故障了
//      }
      while (client.read(buffer) != -1) {
        buffer.flip(); // 回绕 limit = position, position = 0
        out.write(buffer);
        buffer.clear();
      }
    } catch (IOException ex) {
      ex.printStackTrace();
    }
  }
}



import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.io.IOException;

/**
 * 服务器立即开始发送四字节的big-endian整数,从0开始,每次增加1,服务器最后总会绕回负数
 * 服务器无限运行。客户端在得到足够信息后关闭连接
 */
public class IntgenServer {

  public static int DEFAULT_PORT = 1919;

  public static void main(String[] args) {
    int port = DEFAULT_PORT;
    System.out.println("Listening for connections on port " + port);
    ServerSocketChannel serverChannel;
    Selector selector;
    try {
      serverChannel = ServerSocketChannel.open();
      ServerSocket ss = serverChannel.socket();
      InetSocketAddress address = new InetSocketAddress(port);
      ss.bind(address);
      serverChannel.configureBlocking(false);
      selector = Selector.open();
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }
    while (true) {
      try {
        selector.select();
      } catch (IOException ex) {
        ex.printStackTrace();
        break;
      }
      Set readyKeys = selector.selectedKeys();
      Iterator iterator = readyKeys.iterator();
      while (iterator.hasNext()) {
        iterator.next();iterator.next();
        SelectionKey key = (SelectionKey) iterator.next();
        iterator.remove();
        try {
          if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            System.out.println("Accepted connection from " + client);
            client.configureBlocking(false);
            SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
            ByteBuffer output = ByteBuffer.allocate(4);
            output.putInt(0);
            output.flip();
            key2.attach(output);
          } else if (key.isWritable()) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer output = (ByteBuffer) key.attachment();
            if (!output.hasRemaining()) {
              output.rewind();
              int value = output.getInt();
              output.clear();
              output.putInt(value + 1);
              output.flip();
            }
            client.write(output);
          }
        } catch (IOException ex) {
          key.cancel();
          try {
            key.channel().close();
          } catch (IOException cex) {
          }
        }
      }
    }
  }
}


import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.io.IOException;

public class IntgenClient {

  public static int DEFAULT_PORT = 1919;

  public static void main(String[] args) {
    int port = DEFAULT_PORT;
    try {
      SocketAddress address = new InetSocketAddress("localhost", port);
      SocketChannel client = SocketChannel.open(address);
      ByteBuffer buffer = ByteBuffer.allocate(4);
      IntBuffer view = buffer.asIntBuffer();
      for (int expected = 0;; expected++) {
        client.read(buffer);
        int actual = view.get();
        buffer.clear();
        view.rewind();
        if (actual != expected) {
          System.err.println("Expected " + expected + "; was " + actual);
          break;
        }
        System.out.println(actual);
      }
    } catch (IOException ex) {
      ex.printStackTrace();
    }
  }
}



import java.io.IOException;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.Iterator;
import java.util.Set;

/**
 * 缓冲区的大小非常重要,大的缓冲区会隐藏bug,如果缓冲区足够大,可以保证
 * 所有测试用例不需要进行回绕或排空
 */
public class EchoServer {

  public static int DEFAULT_PORT = 7;

  public static void main(String[] args) {
    int port = DEFAULT_PORT;
    System.out.println("Listening for connections on port " + port);
    ServerSocketChannel serverChannel;
    Selector selector;
    try {
      serverChannel = ServerSocketChannel.open();
      ServerSocket ss = serverChannel.socket();
      InetSocketAddress address = new InetSocketAddress(port);
      ss.bind(address);
      serverChannel.configureBlocking(false);
      selector = Selector.open();
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }
    while (true) {
      try {
        selector.select();
      } catch (IOException ex) {
        ex.printStackTrace();
        break;
      }
      Set readyKeys = selector.selectedKeys();
      Iterator iterator = readyKeys.iterator();
      while (iterator.hasNext()) {
        SelectionKey key = (SelectionKey) iterator.next();
        iterator.remove();
        try {
          if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            System.out.println("Accepted connection from " + client);
            client.configureBlocking(false);
            SelectionKey clientKey = client
                .register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
            ByteBuffer buffer = ByteBuffer.allocate(100);
            clientKey.attach(buffer);
          }
          if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer output = (ByteBuffer) key.attachment();
            client.read(output);
          }
          if (key.isWritable()) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer output = (ByteBuffer) key.attachment();
            output.flip();
            client.write(output);
            // 将缓冲区中所有剩余的数据(没有写出的)移到缓冲区开头,缓冲器的位置设为数据结尾
            // 这样只利用一个缓冲区就能完成比较随机的交替读写。
            output.compact();
          }
        } catch (IOException ex) {
          key.cancel();
          try {
            key.channel().close();
          } catch (IOException cex) {
          }
        }
      }
    }
  }
}
import java.util.Set;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;

public class EchoClient {

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    Selector selector = null;
    SocketChannel sc = null;
    try {
      selector = Selector.open();
      sc = SocketChannel.open();
      sc.configureBlocking(false);
      sc.connect(new InetSocketAddress(InetAddress.getByName("localhost"), 7));
      print("客户端启动,准备连接...");
      if (sc.isConnectionPending()) {
        sc.finishConnect();
      }
      print("完成连接");
      sc.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
      boolean writed = false;
      boolean down = false;
      while (!down && selector.select() > 0) {
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        for (SelectionKey key : selectionKeys) {
          // int ops = key.readyOps();
          // if ((ops & SelectionKey.OP_WRITE) ==
          // SelectionKey.OP_WRITE && !writed) {
          if (key.isWritable() && !writed) {
            System.out.print("Input(bye to end): ");
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String s = br.readLine();
            if (s != null && !s.trim().equals("")) {
              buffer.clear();
              buffer.put(s.getBytes());
              buffer.flip();
              sc.write(buffer);
              writed = true;
              if (s.equals("bye")) {
                down = true;
                break;
              }
            }
          }
          // if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ
          // && writed) {
          if (key.isReadable() && writed) {
            buffer.clear();
            sc.read(buffer);
            buffer.flip();
            byte[] b = new byte[buffer.limit()];
            buffer.get(b);
            print(new String(b));
            writed = false;
          }
        }
        selectionKeys.clear();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  private static void print(String s) {
    System.out.println(s);
  }
}

分享到:
评论

相关推荐

    java阻塞i/o与非阻塞i/o控制

    在Java编程环境中,I/O(输入/输出)操作是程序与外部世界交互的关键部分,包括读取文件、网络通信等。对于高效的系统设计,理解并掌握阻塞I/O和非阻塞I/O是非常重要的。这两种I/O模型在处理数据传输时有着显著的...

    Java 新I/O

    5. **非阻塞I/O** 在传统的I/O模型中,读写操作通常是阻塞的,即在等待数据准备好或完成写入时,线程会被挂起。而在NIO中,通道和缓冲区支持非阻塞模式,当数据未准备好时,读写操作不会阻塞,而是立即返回,允许...

    java网络编程(非阻塞与阻塞编程)

    在深入探讨Java网络编程中的非阻塞与阻塞编程之前,我们先来了解这两个概念的基本含义。阻塞编程,通常指的是在程序执行过程中,当某一部分代码遇到I/O操作时,如读写文件或网络通信,整个程序会暂停运行,等待I/O...

    一站式学习Java网络编程 全面理解BIO:NIO:AIO1

    NIO(Non-Blocking I/O)是一种异步非阻塞式 I/O 模式,服务器使用单个线程来处理多个客户端请求,提高了服务器的性能和可扩展性。 AIO(Asynchronous I/O)是一种异步非阻塞式 I/O 模式,服务器使用回调函数来处理...

    Java I/O, 2nd Edition

    3. **非阻塞I/O(NIO)**:Java NIO在Java 1.4引入,提供了异步I/O操作,可以处理多个连接,而无需为每个连接创建一个新的线程。关键类有Selector、Channel和Buffer,Selector可以监控多个通道的事件,Channel代表I/...

    Java I/O, NIO and NIO.2

    非阻塞I/O(Non-blocking I/O),简称NIO,是Java 1.4引入的一个重要特性,主要由java.nio包提供。NIO的核心在于通道(Channels)和缓冲区(Buffers)。通道类似于流,但它们支持非阻塞读写,这意味着当数据不可用时...

    Java网络编程/Java网络编程实例

    4. **NIO(非阻塞I/O)**:Java的NIO库提供了选择器和通道的概念,使得程序可以同时处理多个连接,提高并发性能。Selector用于监听多个Channel,而Channel则连接到网络套接字或其他I/O源。 5. **HTTPClient**:Java...

    深入分析 Java I/O 的工作机制(转载)

    7. **NIO(非阻塞I/O)** Java 1.4引入了NIO(New I/O)框架,提供了一种更有效率的I/O模型,特别是在多路复用I/O(Selector)方面。NIO允许单线程处理多个通道,提高了服务器端并发性能。 8. **文件操作** Java...

    Java网络编程.pdf

    ### Java网络编程精要 #### Internet地址概述与分类 ...以上是对Java网络编程中一些核心知识点的概括,涵盖了从IP地址管理、套接字通信到非阻塞I/O模型的各个方面,为深入理解和应用Java网络编程技术奠定了基础。

    异步I/O处理

    非阻塞I/O意味着即使没有数据可读,读取操作也不会挂起;事件驱动则是通过事件回调机制,当某个I/O操作完成时,系统会通知相应的处理函数。这种模型非常适合网络服务、数据库连接等需要频繁进行I/O操作的场景。 在...

    java网络编程第四版pdf

    除了以上章节,书中还涵盖了套接字编程、服务器Socket、网络套接字API、URL和URLConnection类,以及高级主题如NIO(非阻塞I/O)和异步I/O。这些内容详细阐述了如何利用Java进行网络通信,包括建立连接、发送和接收...

    Java网络编程(第4版)PDF

    此外,Java NIO(New Input/Output)是另一个重要的主题,它是Java 1.4引入的新特性,提供了非阻塞I/O操作,能够显著提升性能。NIO基于通道(Channel)和选择器(Selector),使得程序可以同时处理多个连接,适用于...

    Java I/O系统

    此外,Java NIO(New Input/Output)是Java 1.4引入的一个重要改进,它提供了非阻塞I/O和选择器,能够同时处理多个输入输出通道,大大提升了并发性能。`Selector`类允许程序监控多个`SocketChannel`,一旦有数据可读...

    Java网络编程期末考试复习题库+答案

    在Java中,网络编程主要依赖于Java的Socket编程、ServerSocket、URL类以及NIO(非阻塞I/O)等核心API。这份"Java网络编程期末考试复习题库+答案"为学生提供了全面的复习资源,涵盖了Java网络编程的主要知识点。 1. ...

    非阻塞IO完成版.rar_阻塞IO_非阻塞_非阻塞io

    总的来说,这个项目展示了如何在Java中利用非阻塞I/O实现高效的网络通信,这对于理解并发编程和优化高负载系统有着重要的学习价值。通过分析和研究这个项目,开发者可以深入理解非阻塞I/O的工作原理,并将其应用于...

    java网络编程.zip

    通过学习这个压缩包内的所有内容,开发者不仅能掌握Java网络编程的基本原理,还能了解到高级主题,如非阻塞I/O和网络安全,从而具备构建高效、安全的网络应用的能力。这是一份全面而实用的学习资源,对于想要在Java...

    Java网络编程第三版.pdf

    此外,非阻塞I/O(NIO)的引入为高性能网络应用提供了可能,如Selector和Channel的概念。 3. **多线程与并发**:在网络编程中,多线程和并发处理是必不可少的,书中会讲解如何在Java中管理线程,以及如何处理并发...

    java网络编程精讲

    - Java NIO (New I/O) API提供了非阻塞I/O的支持,主要包括`Selector`、`Channel`和`Buffer`等核心组件。 3. **网络编程安全性**: - 在网络通信过程中,数据的安全性非常重要。 - 可以使用SSL/TLS协议来加密...

    java网络编程(第三版)oreilly

    Java NIO(非阻塞I/O)是另一个重要的话题。相对于传统的阻塞I/O模型,NIO提供了更高效的数据传输方式。书中会介绍Selector、Channel和Buffer等核心组件,以及如何构建多路复用的服务器,提高并发处理能力。 此外,...

    Java网络编程技术教案

    Java NIO(New I/O)提供了一种更高效的数据传输方式,通过选择器(Selector)和通道(Channel)实现非阻塞I/O,适合高并发、低延迟的网络应用。 七、Java网络编程实践 在“第十课 Java 网络编程”中,你将学习如何...

Global site tag (gtag.js) - Google Analytics