请教一个技术问题:在 Reactor 模式和 NIO 中,客户端建立一个 socket 连接后,以毫秒级的时间间隔向 Server 端发送数据,例如 for(;;){send(data)},结果发现 Server 端只接收到部分数据,大部分数据丢失。主要代码如下:
java 代码
- Server.java
- public class Server implements Runnable {
- public Server(int port) throws IOException {
-
- selector = Selector.open();
- serverSocket = ServerSocketChannel.open();
- InetSocketAddress address = new InetSocketAddress(InetAddress
- .getLocalHost(), port);
- serverSocket.socket().bind(address);
-
- serverSocket.configureBlocking(false);
-
- SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
- logger.info("-->Start LogServer!");
-
- sk.attach(new Acceptor(selector, serverSocket));
- }
-
- public void run() {
-
- try {
- while (!this.stop) {
- selector.select();
- Set selected = selector.selectedKeys();
- Iterator it = selected.iterator();
-
- while (it.hasNext())
-
-
- dispatch((SelectionKey) (it.next()));
- selected.clear();
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
-
-
- private void dispatch(SelectionKey k) {
- logger.debug("-->in LogServer.dispatch()!");
- printKeyInfo(k);
- Runnable r = (Runnable) (k.attachment());
- if (r != null) {
- System.out.println(k);
- r.run();
- }
- logger.debug("-->out LogServer.dispatch()!");
- }
- }
java 代码
- Acceptor.java
- public class Acceptor implements Runnable {
- public Acceptor(Selector selector, ServerSocketChannel ssc) {
- logger.debug("-->New Acceptor()!");
- this.selector = selector;
- this.ssc = ssc;
- }
-
- public void run() {
- try {
- logger.debug("-->In Acceptor.run()!");
- SocketChannel sc = ssc.accept();
- logger.info("-->Accepted connection from " + sc);
- if (sc != null) {
-
-
-
-
-
-
- new SocketDataHandler(selector, sc);
- }
- } catch (ClosedChannelException e) { e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
}
java 代码
- public class SocketDataHandler implements Runnable {
-
- private final static QueueFactory queueFactory = QueueFactory.getInstance();
-
- public SocketDataHandler(Selector sel, SocketChannel c) throws IOException {
- logger.debug("-->New SocketDataHandler()");
- socket = c;
- socket.configureBlocking(false);
- sk = socket.register(sel, 0);
-
-
- sk.attach(this);
-
- sk.interestOps(SelectionKey.OP_READ);
- sel.wakeup();
- }
-
-
-
-
-
-
- public void run() {
- logger.debug("-->In SocketDataHandler.run()!");
- try {
- readRequest();
- logger.debug("-->out SocketDataHandler.run()!");
- } catch (IOException e) {
- sk.cancel();
- if (socket.isOpen()) {
- try {
- socket.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- e.printStackTrace();
- }
-
- }
-
-
-
-
-
- private void readRequest() throws IOException {
- logger.debug("-->In SocketDataHandler.readRequest()!");
- ByteBuffer input = ByteBuffer.allocate(4500);
- input.clear();
- int length = socket.read(input);
- if (length == -1) {
- sk.cancel();
- if (socket.isOpen()) {
- try {
- socket.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- return;
- }
- input.flip();
- LogProcess sp = new LogProcess(socket, input);
- WorkQueue wq = queueFactory.getQueue();
- wq.execute(sp);
- logger.debug("-->Out SocketDataHandler.readRequest()!");
- }
-
- }
java 代码
- public class Client() {
-
- public static void main(String[] args) {
- Socket sc = SocketConnection.getSocket(remoteAddress);
- String data = "test";
- for(int i=0; i<=10; i++){
- send(data + i);
- }
-
- try {
- sc.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }