`
jarbee
  • 浏览: 28223 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

[转]NIO Server程序片断

    博客分类:
  • java
阅读更多
Introduction

This tutorial is intended to collect together my own experiences using the Java NIO libraries and the dozens of hints, tips, suggestions and caveats that litter the Internet. When I wrote Rox all of the useful information existed as just that: hints, tips, suggestions and caveats on a handful of forums. This tutorial actually only covers using NIO for asynchronous networking (non-blocking sockets), and not the NIO libraries in all their glory. When I use the term NIO in this tutorial I'm taking liberties and only talking about the non-blocking IO part of the API.

If you've spent any time looking at the JavaDoc documentation for the NIO libraries then you know they're not the most transparent docs floating around. And if you've spent any time trying to write code based on the NIO libraries and you use more than one platform you've probably run into something that works on one platform but locks up on another. This is particularly true if you started on Windows and moved over to Linux (using Sun's implementation).

This tutorial takes you from nothing to a working client-server implementation, and will hopefully help you avoid all of the pitfalls waiting to trap the unwary. I've turned it around and start with a server implementation, since that's the most common use-case for the NIO libraries.

Comments, criticisms, suggestions and, most important, corrections are welcome. The examples here have been developed and tested on Sun's NIO implementation using version 1.4.2 and 1.5.0 of Sun's Hotspot JVM on Windows and Linux. Your mileage may vary but if you do run into funnies please let me know and I'll incorporate them into this page.

Drop me a line at nio@flat502.com.
Credits

Credit where credit is due. This tutorial pulls together a lot of ideas, none of which I can claim original ownership. Sources vary widely and I haven't managed to keep track of all of them, but an incomplete list of some of the larger contributors would include:

    The Java Developers Almanac
    Rob Grzywinski's thoroughly entertaining rant about getting SSL and the NIO libraries to play together on Java 1.4
    This post in the Java Forums regarding SSL and NIO on 1.4
    The "Taming the NIO Circus" thread in the Java Forums
    A discussion in the Java Forums on multithreaded access to NIO
    Half a dozen other posts in various forums and on various blogs

Source source code in this tutorial was generated from Java source using Java2Html.
General principles

A few general principles inform the approach I've taken. There may be other approaches (I've certainly seen a few other suggestions) but I know this one works, and I know it performs. Sometimes it's worth spending time finding the best possible approach. Sometimes it's enough to find an approach that works.

These general ideas apply on both the client and server side. In fact, given that the only difference between the client and the server is whether or not a connection is initiated or accepted, the bulk of the logic for using NIO for a comms implementation can be factored out and shared.

So, without further ado:
Use a single selecting thread

Although NIO selectors are threadsafe their key sets are not. The upshot of this is that if you try to build a solution that depends on multiple threads accessing your selector you very quickly end up in one of two situations:

    Plagued by deadlocks and race conditions as you build up an increasingly fragile house of cards (or rather house of locks) to avoid stepping on your own toes while accessing the selector and its key sets.
    Effectively single-threading access to the selector and its key sets in an attempt to avoid, well, stepping on your own toes.

The upshot of this is that if you want to build a solution based on NIO then trust me and stick to a single selecting thread. Offload events as you see fit but stick to one thread on the selector.

I tend to handle all I/O within the selecting thread too. This means queuing writes and having the selecting thread perform the actual I/O, and having the selecting thread read off ready channels and offload the read data to a worker thread. In general I've found this scales well enough so I've not yet had a need to have other threads perform the I/O on the side.
Modify the selector from the selecting thread only

If you look closely at the NIO documentation you'll come across the occasional mention of naive implementations blocking where more efficient implementations might not, usually in the context of altering the state of the selector from another thread. If you plan on writing code against the NIO libraries that must run on multiple platforms you have to assume the worst. This isn't just hypothetical either, a little experimentation should be enough to convince you that Sun's Linux implementation is "naive". If you plan on targeting one platform only feel free to ignore this advice but I'd recommend against it. The thing about code is that it oftens ends up in the oddest of places.

As a result, if you plan to hang onto your sanity don't modify the selector from any thread other than the selecting thread. This includes modifying the interest ops set for a selection key, registering new channels with the selector, and cancelling existing channels.

A number of these changes are naturally initiated by threads that aren't the selecting thread. Think about sending data. This pretty much always has to be initiated by a calling thread that isn't the selecting thread. Don't try to modify the selector (or a selection key) from the calling thread. Queue the operation where the selecting thread can get to it and call Selector.wakeup. This will wake the selecting thread up. All it needs to do is check if there are any pending changes, and if there are apply them and go back to selecting on the selector. There are variations on this but that's the general idea.
Set OP_WRITE only when you have data ready

A common mistake is to enable OP_WRITE on a selection key and leave it set. This results in the selecting thread spinning because 99% of the time a socket channel is ready for writing. In fact the only times it's not going to be ready for writing is during connection establishment or if the local OS socket buffer is full. The correct way to do this is to enable OP_WRITE only when you have data ready to be written on that socket channel. And don't forget to do it from within the selecting thread.
Alternate between OP_READ and OP_WRITE

If you try to mix OP_READ and OP_WRITE you'll quickly get yourself into trouble. The Sun Windows implementation has been seen to deadlock if you do this. Other implementations may fare better, but I prefer to play it safe and alternate between OP_READ and OP_WRITE, rather than trying to use them together.

With those out of the way, let's take a look at some actual code. The code presented here could do with a little cleaning up. I've simplified a few things in an attempt to stick to the core issue, using the NIO libraries, without getting bogged down in too many abstractions or design discussions.
The server
A starting point

We need a little infrastructure before we can start building a NIO server. First, we'll need a thread. This will be our selecting thread and most of our logic will live in the class that is home to it. And I'm going to have the selecting thread perform reads itself, so we'll need a ByteBuffer to read into. I'm going to use a non-direct buffer rather than a direct buffer. The performance difference is minor in this case and the code is slightly clearer. We're also going to need a selector and a server socket channel on which to accept connections. Throwing in a few other minor odds and ends we end up with something like this.
public class NioServer implements Runnable {
  // The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;
  
  // The selector we'll be monitoring
  private Selector selector;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  public NioServer(InetAddress hostAddress, int port) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }

  public static void main(String[] args) {
    try {
      new Thread(new NioServer(null, 9090)).start();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}


Creating the selector and server channel

The astute will have noticed the call to initSelector() in the constructor. Needless to say this method doesn't exist yet. So let's write it. It's job is to create and initialize a non-blocking server channel and a selector. It must and then register the server channel with that selector.
private Selector initSelector() throws IOException {
    // Create a new selector
    Selector socketSelector = SelectorProvider.provider().openSelector();

    // Create a new non-blocking server socket channel
    this.serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
    serverChannel.socket().bind(isa);

    // Register the server socket channel, indicating an interest in 
    // accepting new connections
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

    return socketSelector;
  }

 
Accepting connections

Right. At this point we have a server socket channel ready and waiting and we've indicated that we'd like to know when a new connection is available to be accepted. Now we need to actually accept it. Which brings us to our "select loop". This is where most of the action kicks off. In a nutshell our selecting thread sits in a loop waiting until one of the channels registered with the selector is in a state that matches the interest operations we've registered for it. In this case the operation we're waiting for on the server socket channel is an accept (indicated by OP_ACCEPT). So let's take a look at the first iteration (I couldn't resist) of our run() method.
public void run() {
    while (true) {
      try {
        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
          SelectionKey key = (SelectionKey) selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

 

This only takes us half way though. We still need to accept connections. Which means, you guessed it, an accept() method.
private void accept(SelectionKey key) throws IOException {
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }

 
Reading data

Once we've accepted a connection it's only of any use if we can read (or write) data on it. If you look back at the accept() method you'll notice that the newly accepted socket channel was registered with our selector with OP_READ specified at the operation we're interested in. That means our selecting thread will be released from the call to select() when data becomes available on the socket channel. But what do we do with it? Well, the first thing we need is a small change to our run() method. We need to check if a socket channel is readable (and deal with it if it is).
// Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          } else if (key.isReadable()) {
            this.read(key);
          }


         
Which means we need a read() method...
private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();
    
    // Attempt to read off the channel
    int numRead;
    try {
      numRead = socketChannel.read(this.readBuffer);
    } catch (IOException e) {
      // The remote forcibly closed the connection, cancel
      // the selection key and close the channel.
      key.cancel();
      socketChannel.close();
      return;
    }

    if (numRead == -1) {
      // Remote entity shut the socket down cleanly. Do the
      // same from our end and cancel the channel.
      key.channel().close();
      key.cancel();
      return;
    }

    // Hand the data off to our worker thread
    this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead);
  }


Hang on, where did worker come from? This is the worker thread we hand data off to once we've read it. For our purposes all we'll do is echo that data back. We'll take a look at the code for the worker shortly. However, assuming the existence of an EchoWorker class, our infrastructure needs a little updating. We need an extra instance member, a change to our constructor and one or two extras in our main() method.
 private EchoWorker worker;

  public NioServer(InetAddress hostAddress, int port, EchoWorker worker) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
    this.worker = worker;
  }

  public static void main(String[] args) {
    try {
      EchoWorker worker = new EchoWorker();
      new Thread(worker).start();
      new Thread(new NioServer(null, 9090, worker)).start();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }



Let's take a closer look at EchoWorker. Our echo worker implementation is implemented as a second thread. For the purposes of this example we're only going to have a single worker thread. This means we can hand events directly to it. We'll do this by calling a method on the instance. This method will stash the event in a local queue and notify the worker thread that there's work available. Needless to say, the worker thread itself will sit in a loop waiting for events to arrive and when they do it will process them. Events in this case are just data packets. All we're going to do is echo those packets back to the sender.

Normally, instead of calling a method on the worker directly, you'll want to use a form of blocking queue that you can share with as many workers as you like. Java 1.5's concurrency package introduces a BlockingQueue interface and some implementations, but it's not that hard to roll your own. The logic in the EchoWorker code below is a good start.
public class EchoWorker implements Runnable {
  private List queue = new LinkedList();
  
  public void processData(NioServer server, SocketChannel socket, byte[] data, int count) {
    byte[] dataCopy = new byte[count];
    System.arraycopy(data, 0, dataCopy, 0, count);
    synchronized(queue) {
      queue.add(new ServerDataEvent(server, socket, dataCopy));
      queue.notify();
    }
  }
  
  public void run() {
    ServerDataEvent dataEvent;
    
    while(true) {
      // Wait for data to become available
      synchronized(queue) {
        while(queue.isEmpty()) {
          try {
            queue.wait();
          } catch (InterruptedException e) {
          }
        }
        dataEvent = (ServerDataEvent) queue.remove(0);
      }
      
      // Return to sender
      dataEvent.server.send(dataEvent.socket, dataEvent.data);
    }
  }
}



If you look at the last line of the run() method you'll see we're calling a new method on the server. Let's take a look at what we need to do to send data back to a client.
Writing data

Writing data to a socket channel is pretty straightforward. However, before we can do that we need to know that the channel is ready for more data. Which means setting the OP_WRITE interest op flag on that channel's selection key. Since the thread that wants to do this is not the selecting thread we need to queue this request somewhere and wake the selecting thread up. Putting all of this together we need a write() method and a few additional members in our server class.
 // A list of ChangeRequest instances
  private List ChangeRequests = new LinkedList();

  // Maps a SocketChannel to a list of ByteBuffer instances
  private Map pendingData = new HashMap();

  public void send(SocketChannel socket, byte[] data) {
    synchronized (this.ChangeRequests) {
      // Indicate we want the interest ops set changed
      this.ChangeRequests.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
      
      // And queue the data we want written
      synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socket);
        if (queue == null) {
          queue = new ArrayList();
          this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
      }
    }
    
    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
  }



We've introduced another class: ChangeRequest. There's no magic here, it just gives us an easy way to indicate a change that we want made on a particular socket channel. I've jumped ahead a little with this class, in anticipation of some of the other changes we'll ultimately want to queue.

public class ChangeRequest {
  public static final int REGISTER = 1;
  public static final int CHANGEOPS = 2;
  
  public SocketChannel socket;
  public int type;
  public int ops;
  
  public ChangeRequest(SocketChannel socket, int type, int ops) {
    this.socket = socket;
    this.type = type;
    this.ops = ops;
  }
}



Waking the selecting thread up is of no use until we modify our selecting thread's logic to do what needs to be done. So let's update our run() method.
public void run() {
    while (true) {
      try {
        // Process any pending changes
        synchronized(this.ChangeRequests) {
          Iterator changes = this.ChangeRequests.iterator();
          while (changes.hasNext()) {
            ChangeRequest change = (ChangeRequest) changes.next();
            switch(change.type) {
            case ChangeRequest.CHANGEOPS:
              SelectionKey key = change.socket.keyFor(this.selector);
              key.interestOps(change.ops);
            }
          }
          this.ChangeRequests.clear();
        }
        
        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
          SelectionKey key = (SelectionKey) selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          } else if (key.isReadable()) {
            this.read(key);
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

 
Waking up the selector will usually result in an empty selection key set. So our logic above will skip right over the key set loop and jump into processing pending changes. In truth the order of events here probably doesn't matter all that much. This is just how I prefer to do it. If you take a look at the logic we've added for handling those pending events there are a few things to point out. One is that it's pretty simple. All we do is update the interest ops for the selection key on a given socket channel. The other is that at the end of the loop we clear out the list of pending changes. This is easy to miss, in which case the selecting thread will continually reapply "old" changes.

We're almost done on the writing front, we just need one more piece, the actual write. This means another change to our run() method to check for a selection key in a writeable state.
 // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          } else if (key.isReadable()) {
            this.read(key);
          } else if (key.isWritable()) {
            this.write(key);
          }


        
And of course, we'll need a write() method. This method just has to pull data off the appropriate queue and keep writing it until it either runs out or can't write anymore. I add ByteBuffers to the per-socket internal queue because we'll need a ByteBuffer for the socket write anyway and ByteBuffer's conveniently track how much data remains in the buffer. This last bit is important because a given write operation may end up only writing a part of the buffer (at which point we can stop writing to that socket because further writes will achieve nothing: the socket's local buffer is full). Using a ByteBuffer saves us having to either resize byte arrays in the queue or track an index into the byte array at the head of the queue.

But I've digressed so let's take a look at write().
private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
      List queue = (List) this.pendingData.get(socketChannel);
      
      // Write until there's not more data ...
      while (!queue.isEmpty()) {
        ByteBuffer buf = (ByteBuffer) queue.get(0);
        socketChannel.write(buf);
        if (buf.remaining() > 0) {
          // ... or the socket's buffer fills up
          break;
        }
        queue.remove(0);
      }
      
      if (queue.isEmpty()) {
        // We wrote away all data, so we're no longer interested
        // in writing on this socket. Switch back to waiting for
        // data.
        key.interestOps(SelectionKey.OP_READ);
      }
    }
  }


The only point I want to make about the above code is this. You'll notice that it writes as much data as it can, stopping only when there's no more to write or the socket can't take any more. This is one approach. Another is to write only the first piece of data queued and then continue. The first approach is likely to result in better per-client service while the second is probably more fair in the face of multiple active connections. I suggest you play around a bit before choosing an approach.

We now have the bulk of a working NIO server implementation. In fact, the server you've written so far (if you've been following along diligently) will accept connections, read data off those connections, and echo back the read data to the client that sent it. It will also handle clients disconnecting by deregistering the socket channel from the selector. There's not a whole lot more to a server.

to be continued...

qutoed from: http://www.javafaq.nu/java-article1102.html
分享到:
评论

相关推荐

    NioServer.java

    NioServer.java

    NIOServer

    【标题】:“NIOServer”是一个关于网络编程的Java实现,它使用了Java的非阻塞I/O(Non-blocking I/O,NIO)技术。在Java中,NIO是一种替代传统阻塞I/O(BIO)模型的方式,提供了更高效的数据传输能力。此项目可能是...

    nioserver.zip

    同时整个服务端的流程处理,建立于事件机制上。在 [接受连接->读->业务处理->写 ->关闭连接 ]这个 过程中,触发器将触发相应事件,由事件处理器对相应事件分别响应,完成服务器端的业务处理。...

    nioserver.zip_NIO_event driven java_java nio_java nioserv_nioser

    标题中的“nioserver.zip_NIO_event driven java_java nio_java nioserv_nioser”表明这是一个关于Java NIO的服务器实现,且是基于事件驱动模式的。事件驱动编程是一种设计模式,它允许程序对特定事件做出响应,而...

    nioserver.rar_NIO_java nio

    NIO(Non-blocking Input/Output)是Java编程语言中的一种I...通过熟练掌握NIO,开发者可以设计出更加高效、灵活的网络服务程序。在实际开发中,NIO常被应用于大型分布式系统、高并发的Web服务器以及游戏服务器等领域。

    java nio im(server+client)

    Java NIO,全称为Non-Blocking Input/Output(非阻塞输入/输出),是Java从1.4版本开始引入的一种新的I/O模型,相对于传统的BIO( Blocking I/O)模型,NIO在处理高并发、大数据量的网络应用时表现出更高的效率和...

    android nio程序

    本篇将深入探讨`android nio`程序,主要关注`NioServer`和`NioClient`两个示例程序。 ### 一、`java.nio`概述 `java.nio`是Java平台提供的一种I/O模型,与传统的`java.io`流模型相比,具有以下优势: 1. **缓冲区...

    Ioserver java Nio socket 框架

    Ioserver java Nio socket 框架 是个不错的NIO 通讯框架,本来想学习mina框架,看了看mina的源码太头痛,本人觉得看懂了Ioserver 再看mina的框架,想多的学习 java NIO 的也可以下载 看看,很值得学习啊!!!

    NioSocket,包括server端和client端

    NioSocket是一个基于Java NIO(非阻塞I/O)技术实现的网络通信框架,它包含服务器端(Server)和客户端(Client)两部分。在Java编程中,NIO(New Input/Output)提供了一种不同于传统IO模型的I/O操作方式,其核心...

    JAVA-NIO程序设计完整实例

    **JAVA-NIO程序设计完整实例** Java NIO(New IO)是Java 1.4引入的一个新特性,它为Java提供了非阻塞I/O操作的能力,使得Java在处理I/O时更加高效。NIO与传统的BIO(Blocking I/O)模型相比,其核心在于它允许程序...

    java_Nio_server_and_j2me_client.rar_J2ME SERVER_NIO_j2me_j2me ni

    而"TestNIO_client"和"TestNIO_server"这两个文件名很可能分别对应的是NIO客户端和服务器的源代码文件,用户可以通过这些源代码了解如何在J2ME和Java NIO之间建立有效的通信。 在实际应用中,J2ME客户端可能使用...

    java NIO实例

    `NIOServer.java`和`NIOClient.java`这两个文件很可能是用于演示Java NIO服务器端和客户端的基本操作。下面将详细介绍Java NIO的主要组件和工作原理,并结合这两个文件名推测它们可能包含的内容。 1. **Selector...

    nioServer-开源

    《深入解析:Linux环境下C++实现的多线程服务器框架——nioServer》 在IT行业中,服务器开发是一项至关重要的任务,特别是在高性能、高并发的场景下。本文将深入探讨一个名为"nioServer"的开源项目,它是一个在...

    java NIO 学习 聊天室程序 (3)

    在这个“Java NIO 学习 聊天室程序”项目中,我们将深入探讨NIO如何用于创建一个聊天室程序。这个程序可能包含以下几个关键部分: 1. **服务器端**:服务器端使用NIO的ServerSocketChannel监听客户端连接。当新的...

    mina框架--MINA框架是对java的NIO包的一个封装

    MINA(Multipurpose Infrastructure for Network Applications)框架是Apache软件基金会的一个开源项目,它提供了一种高级的、事件驱动的网络应用程序框架...

    JAVA NIO 聊天室程序

    **JAVA NIO 聊天室程序** 在Java编程领域,网络编程是一个重要的部分,而NIO(Non-blocking Input/Output)是Java提供的一种高效、非阻塞的I/O模型,它极大地提升了处理大量并发连接的能力。这个"JAVA NIO 聊天室...

    java nio server-开源

    小型简单但完整的Java NIO服务器,任何人都可以免费使用。 目前,它仅处理发送和接收字符串,并且尚未进行优化-但它易于理解并适应您的需求。

    java nio socket 例子

    本例包含服务器端和客户端,多线程,每线程多次发送,Eclipse工程,启动服务器使用 nu.javafaq.server.NioServer,启动客户端使用 nu.javafaq.client.NioClient。另本例取自javafaq.nv上的程序修改而成

    基于事件的 NIO 多线程服务器

    基于事件的 NIO 多线程服务器

Global site tag (gtag.js) - Google Analytics