`
t8500071
  • 浏览: 112904 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

nio中客户端发送一次数据,服务端通过多次readable事件才能完整读取

    博客分类:
  • java
阅读更多
最近在开发一个c/s项目,主要功能是:客户端负责采集数据发送给服务端入库。

客户端与服务端交互大致过程如下:
客户端发送一次数据,服务端read一次数据并解析入库。

先描述下问题,后面会贴出较为详细的设计和代码:
调试环境:
客户端与服务端在同一台pc机(排除丢包的可能性)

问题:
该项目在调试时,会偶然出现一个问题
如:
客户端一次性发送9793B的数据,服务端nio在reactor触发的一次readable事件里读取channel中的流数据,当该次readable事件的流数据读取完毕后,发现总数据量才4380B,小于客户端所发送的9793B,剩下的数据在紧接下来的一次readable事件中被读取出来。
因此,由于客户端所发送的一个数据包,服务端分为2次或多次接收,导致该数据包后续的处理均出现异常。

由于最初设计时,完全没考虑到这样的情况,也没有发现谁碰过类似的问题,导致现在改动代码会比较麻烦,还望各位英雄豪杰帮忙分析分析。

该项目的大体设计和工作过程如下:
客户端:
客户端并不使用nio,只是使用普通socket连接服务端(因为我没发现是否有区别,难道问题原因就在此?);
客户端将采集到的数据,定时发送给服务端,发送的数据量大小是不固定的;
发送部分代码如下:
    public void send2server(int command, byte[] msg) throws IOException
    {
        if (out != null)
        {
            msg = MessageFilter.addHeader(command, msg);
            _log.info("发送字节数:" + msg.length);
            //msg为byte[]类型
            out.write(msg);
            out.flush();
        }
        else
        {
            throw new IOException("输出流为空");
        }
    }
 


服务端:
服务端通过nio触发readable事件来一次性读取channel中的数据、解析数据并入库。
服务端nio部分代码如下:
                        try
                        {
                            // 处理IO事件
                                 if (key.isAcceptable())
                                accept(key);
                            else if (key.isReadable())
                            {
                                _log.debug("发现读IO");

                                //与此问题相关的关键代码为这句
                                Reader2.processKey(key); // 提交读服务线程读取客户端数据
                                SocketChannel sc = (SocketChannel) key.channel();
                                //如果为长连接
                                if (sc.socket().getKeepAlive())
                                {
                                    key.interestOps(key.interestOps()
                                            & ~SelectionKey.OP_READ);
                                    _log.debug("移除兴趣读");
                                }
                                else
                                    key.cancel();
                            }
                            else if (key.isWritable())
                            {
                                _log.debug("发现写IO");
                                SocketChannel sc = (SocketChannel) key.channel();
                                Writer.processRequest(key); // 提交写服务线程向客户端发送回应数据
                                     //如果为长连接
                                if (sc.socket().getKeepAlive())
                                    key.interestOps(SelectionKey.OP_READ);
                                else
                                    key.cancel();
                            }
                        }
                        catch (Exception e)
                        {
                            key.cancel();
                            _log.info("处理key出错,连接可能意外中断了");
                            e.printStackTrace();
                        }


//与此问题相关的关键代码为这句
Reader2.processKey(key); // 提交读服务线程读取客户端数据

Reader2类代码为:
package com.gdtec.nmt.nioserver.io;

import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Logger;

import com.gdtec.nmt.filter.MessageFilter;
import com.gdtec.nmt.nioserver.InterestInfo;
import com.gdtec.nmt.nioserver.Notifier;
import com.gdtec.nmt.nioserver.Request;
import com.gdtec.nmt.nioserver.Server;
import com.gdtec.nmt.pojo.ClientInfo;
import com.gdtec.nmt.pojo.MessagePackage;


/**
 * <p>Title: 读线程</p>
 * <p>Description: 该线程用于读取客户端数据</p>
 * @author zhuhongzheng
 * @version 1.0
 */

public class Reader2 extends Thread {
    
    private static Logger _log = Logger.getLogger(Reader2.class);
            
    private static List<Request> requestsPool = new LinkedList<Request>();
    private static Notifier notifier = Notifier.getNotifier();

    public Reader2() {
    }

    public void run() {
        while (true) {
            try {
                Request request;
                synchronized (requestsPool) {
                    while (requestsPool.isEmpty()) {
                        requestsPool.wait();
                    }
                    request = requestsPool.remove(0);
                }

                // 读取数据
                processRequest(request);
            }
            catch (Exception e) {
                _log.info("读取池出现异常!", e);
            }
        }
    }

    private static int BUFFER_SIZE = 20480;
    
    /**
     * 读取客户端发出请求数据
     * @param sc 套接通道
     * @return
     * @throws IOException
     */
    private static byte[] readInput(SocketChannel sc) throws IOException
    {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int off = 0;
        int r = 0;
        byte[] data = new byte[BUFFER_SIZE];

        //与此问题相关的关键代码,在稍后我贴的日志可以看出,在这里只read了4380B的数据,这个while循环,只进行了一次
         //何解?
        while ((r = sc.read(buffer)) > 0)
        {
            _log.debug("发现数据:" + r);
            if ((off + r) > data.length)
            {
                data = grow(data, BUFFER_SIZE * 2);
                // System.out.println("容量扩展为:" + data.length);
                _log.debug("容量扩展为:" + data.length);
            }
            byte[] buf = buffer.array();
            System.arraycopy(buf, 0, data, off, r);
            off += r;
            buffer.clear();
            _log.debug("共读了:" + off);
        }
        String memoryMsg = ", freeMemory="
                + (Runtime.getRuntime().freeMemory() / (1024)
                        + "k,totalMemory="
                        + (Runtime.getRuntime().totalMemory() / (1024)) + "k");
        _log.info("共读了:" + off + memoryMsg);
        byte[] req = new byte[off];
        System.arraycopy(data, 0, req, 0, off);
        return req;
    }

    /**
     * 处理连接数据读取
     * @param request Request
     */
    public static void processRequest(Request request) {
        try {
            // 读取客户端数据
            byte[] received = request.getDataInputByte();
            _log.debug("接收到字节数:" + received.length);
            //System.out.println("reader 收到数据包:" + new String(received, "GBK"));
            MessagePackage msgPackage = MessageFilter.decodeMessage(received);
            // String clientData = readInput(sc);
            
            String clientData = null;
            
            //数据不完整或者数据为空,则不做处理
            if (msgPackage == null || null == msgPackage.getBody())
            {
                throw new Exception("数据包格式有问题,请检查");
            }
            
            clientData = msgPackage.getBody();
            request.setDataInput(clientData);
            request.setDataInputByte(new byte[0]);

            request.setParameter("command", msgPackage.getCommand());
            request.setParameter("bodyLength", msgPackage.getBodyLength());
            
            // 触发onRead
            notifier.fireOnRead(request);

        }
        catch (Exception e)
        {
            _log.error(e);
        }

        try
        {
            //SocketChannel 在accept事件中被置入request对象
            SocketChannel sc = request.getSc();
            if (sc == null)
                return;
            //如果是长连接模式,read完了就要把write事件注册到selector,否则无法触发该channel的write事件
            if (sc.socket().getKeepAlive())
            {
                Server.registerInterest(new InterestInfo(sc,
                        SelectionKey.OP_WRITE, request));
            }
        }
        catch (SocketException e)
        {
            _log.error("注册写兴趣时出现异常!", e);
        }
    }

    public static void processKey(SelectionKey key)
    {
        SocketChannel sc = (SocketChannel) key.channel();
        Request request = (Request) key.attachment();
        try
        {
            byte[] received = readInput(sc);
            if(received.length==0)
            {
                //  TODO:收到空数据。。怎么回事;
                return ;
            }
            _log.debug("接收到字节数:" + received.length);
            request.setDataInputByte(received);
            
            put2RequestPool(request);
        }
        catch (IOException e1)
        {
            key.cancel();
            _log.error(e1);
            ClientInfo client = (ClientInfo) request.getParameter("ClientInfo");

            Server.clientConnectionError(client);
        }
        catch (Exception e)
        {
              _log.error(e);
        }
    }
    /**
     * 处理客户请求,管理用户的联结池,并唤醒队列中的线程进行处理
     */
    public static void put2RequestPool(Request request) {
        synchronized (requestsPool) {
            requestsPool.add(requestsPool.size(), request);
            requestsPool.notifyAll();
        }
    }

    /**
     * 数组扩容
     * @param src byte[] 源数组数据
     * @param size int 扩容的增加量
     * @return byte[] 扩容后的数组
     */
    public static byte[] grow(byte[] src, int size) {
        byte[] tmp = new byte[src.length + size];
        System.arraycopy(src, 0, tmp, 0, src.length);
        return tmp;
    }
}



        //与此问题相关的关键代码,在稍后我贴的日志可以看出,在这里只read了4380B的数据,这个while循环,只进行了一次
        //何解?
        while ((r = sc.read(buffer)) > 0)


日志内容:
客户端部分日志:
引用

2010-11-16 18:53:10[INFO]ClientMain.java 369■Thread-2■: 发送字节数:9793
2010-11-16 18:53:10[INFO]ClientWriter.java 166■Thread-2■: 缓冲区已满,发送数据,清空缓冲区


服务端日志:
引用

2010-11-16 18:51:16[INFO]Reader2.java 121■main■: 发现数据:9600
2010-11-16 18:51:16[INFO]Reader2.java 121■main■: 发现数据:4441
2010-11-16 18:51:16[INFO]Reader2.java 138■main■: 共读了:14041, freeMemory=2936k,totalMemory=5056k
2010-11-16 18:51:16[INFO]ResultsHandler.java 271■Thread-11■: 保存了111条数据
2010-11-16 18:53:10[INFO]Reader2.java 121■main■: 发现数据:4380
2010-11-16 18:53:10[INFO]Reader2.java 138■main■: 共读了:4380, freeMemory=2090k,totalMemory=5056k
2010-11-16 18:53:10[ERROR]ResultsHandler.java 280■Thread-11■: 保存任务结果出现异常:
java.sql.BatchUpdateException: '127.0' 附近有语法错误。
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeBatch(SQLServerStatement.java:1693)
at com.gdtec.nmt.nioserver.Handler.ResultsHandler.saveTasks(ResultsHandler.java:269)
at com.gdtec.nmt.nioserver.Handler.ResultsHandler.onRead(ResultsHandler.java:105)
at com.gdtec.nmt.nioserver.Notifier.fireOnRead(Notifier.java:71)
at com.gdtec.nmt.nioserver.io.Reader2.processRequest(Reader2.java:175)
at com.gdtec.nmt.nioserver.io.Reader2.run(Reader2.java:51)
2010-11-16 18:53:10[INFO]Reader2.java 121■main■: 发现数据:5413
2010-11-16 18:53:10[INFO]Reader2.java 138■main■: 共读了:5413, freeMemory=1056k,totalMemory=5056k
2010-11-16 18:53:10[ERROR]Reader2.java 180■Thread-11■: java.lang.NumberFormatException: For input string: "0A4E-D6DF-27D8-C444E3A3EA67'"
2010-11-16 18:53:21[INFO]Reader2.java 121■main■: 发现数据:4380
2010-11-16 18:53:21[INFO]Reader2.java 121■main■: 发现数据:4493
2010-11-16 18:53:21[INFO]Reader2.java 138■main■: 共读了:8873, freeMemory=1306k,totalMemory=5056k
2010-11-16 18:53:21[INFO]ResultsHandler.java 271■Thread-11■: 保存了68条数据


几乎每次出错都是因为服务端只读了4380B,从日志中可以看到,客户端发送了9793B,服务端第一次readable只发现了4380B,下一次的readable发现5413B,加起来刚好等于客户端发送的9793B,由此可知,一次readable事件并不一定能将io流读取完,可能会分多次readable事件读取。这又让我感觉越来越不理解nio底层原理了,reactor触发readable事件的条件是什么?
分享到:
评论
4 楼 jjed0119cs04 2014-01-26  
请问一个小问题。
我刚学NIO。

我测试的时候,都是  服务端开着。

客户端发一条数据过去,服务端就返回一条数据

一定需要这样吗?  服务端收到了才返回数据。 一收一发

可以在服务端没收到数据的时候,主动给客户端发东西吗?(TCP是链接上的)
3 楼 t8500071 2013-08-01  
这个是典型的粘包和半包的问题,需要自己定义协议,如加上数据包长度等。。之前看过mina和netty,好像也要自定义协议才行。
2 楼 love297 2013-04-29  
dolo1984 写道
你这个问题是怎么解决的?

同问。
1 楼 dolo1984 2013-02-26  
你这个问题是怎么解决的?

相关推荐

    Java NIO实现多个客户端之间的消息互发,客户端与服务器完整代码

    Java NIO(Non-blocking Input/Output)是一种在Java中处理I/O操作的新方式,相比于传统的BIO(Blocking I/O),NIO提供了更高效的数据传输能力,尤其适合于高并发、低延迟的网络应用,如聊天服务器。在这个场景下,...

    NIO实现客户端之间通信

    在本文中,我们将深入探讨如何使用NIO实现客户端之间的通信,并通过一个中心服务器进行消息的转发。 首先,我们来看通道。通道是NIO中的数据传输路径,它可以连接到硬件设备、文件系统或其他服务。例如,...

    多客户端和服务端通讯

    服务端需要维护一个客户端列表,当新客户端连接时,将新客户端的信息添加到列表中,并将上线消息发送给其他客户端。 群聊功能的实现,需要服务端能够接收来自任何客户端的消息,并将这些消息广播给所有在线的客户端...

    nio socket编程java代码示例,客户端发送消息,服务端接收

    在isReadable()判断中,服务端将读取客户端发送的数据,并做出响应。 二、客户端实现 客户端主要任务是创建SocketChannel,连接到服务器,并向服务器发送数据。以下是一个基本的客户端代码框架: ```java import ...

    一个NIO服务端,客户端的例子

    Netty是一个基于NIO的高性能、异步事件驱动的网络应用程序框架,它极大地简化了网络编程,包括TCP和UDP协议的服务器和客户端应用开发。 在Java NIO中,核心组件包括通道(Channel)、缓冲区(Buffer)和选择器...

    nio异步长连接服务端与客户端

    Java NIO(非阻塞I/O)是一种在Java中实现高效I/O操作的方式,相比于传统的BIO(阻塞I/O),NIO提供了更强大的数据传输能力,尤其适用于高并发、低延迟的网络应用,如服务器长连接场景。在这个主题中,我们将深入...

    Java NIO非阻塞服务端与客户端相互通信

    - **缓冲区(Buffers)**:NIO中的数据操作都在缓冲区上进行,这是NIO的主要特性之一,缓冲区提供了更高效的数据存取方式。 - **选择器(Selectors)**:选择器允许单个线程监视多个通道,当通道准备好进行读写...

    NIO 服务器客户端例子

    Java NIO(New IO)是Java 5及更高版本中引入的一个重要特性,它提供了与标准的Java IO API不同的I/O工作方式。NIO代表非阻塞I/O,它的核心特点是能够实现多路复用,使得一个单线程可以同时处理多个连接,极大地提高...

    JAVA NIO客户端服务端完整项目工程(ServerandClientNIOV1.0.zip)包下载.txt

    该JAVA NIO项目包含server服务端完整项目源码、client客户端项目工程源码。

    JAVA nio异步长连接服务端与客户端

    TCP长连接是指在客户端和服务端之间保持一个持久的连接,可以多次收发数据,而不必每次通信都建立新的连接。这对于高频率的交互和实时性要求较高的应用非常有用,如在线聊天、股票交易等。 在服务端,通常会创建一...

    JAVA NIO 异步通信客户端

    7. **Event Loop**: 在JAVA NIO客户端中,通常有一个事件循环(Event Loop),它不断地调用Selector的`select()`方法,处理就绪的通道。 8. **线程模型**: 由于NIO的异步特性,客户端可能使用单线程或多线程模型。...

    java应用netty服务端和客户端

    Java应用程序中的Netty框架是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty广泛应用于分布式系统、云计算、大数据处理等领域,它的核心特性包括非阻塞I/O、...

    Java中实现服务端和客户端传输文件

    - 通过`Socket`对象的`getInputStream()`获取输入流,用于读取客户端发送的数据。 - 使用`FileOutputStream`将接收到的数据写入到本地文件,实现文件接收。 2. **客户端实现**: - 创建`Socket`实例,指定服务器...

    java nio 包读取超大数据文件

    - **分块读取**:对于超大数据文件,可以考虑按照文件大小进行分块读取,避免一次性加载过多数据到内存中。 - **异步处理**:如果系统允许,可以考虑使用NIO的异步特性来进一步提高性能。 - **多线程处理**:结合...

    Netty实现简单的客户端服务端通信示例

    2. 配置 ChannelPipeline:每个连接都有一个 ChannelPipeline,它可以包含多个处理器(ChannelHandler)来处理进来的数据或发送出去的数据。在服务端,通常需要添加解码器(如 ByteToMessageDecoder)来解析接收到的...

    NIO按行读取数据

    在Java NIO(New Input/Output)框架中,虽然提供了高效且灵活的I/O操作方式,但默认并不支持按行读取文件数据。正如描述中提到的,开发者通常需要自定义方法来实现这一功能。上述代码片段展示了如何使用NIO以行作为...

    JAVA NIO 异步通信模板服务端

    在"JAVA NIO 异步通信模板服务端"的实现中,首先需要创建一个ServerSocketChannel,并通过Selector注册它。接着,服务器会进入一个无限循环,调用`selector.select()`方法来等待事件发生。当有事件发生时,选择器会...

    MINA 服务端和客户端demo

    - 事件模型基于Java NIO(Non-blocking Input/Output),通过Selector监控多个通道(Channels)的事件,如连接建立、数据到达、连接关闭等。 4. **MINA的缓冲区管理**: - MINA使用Buffer对象来存储和操作网络...

    JAVA聊天 服务端 客户端

    总结来说,"JAVA聊天服务端客户端"是一个涵盖Java网络编程、多线程、用户界面设计、数据传输安全等多个领域的项目。服务端和客户端通过Socket通信,处理并发连接,并确保数据的安全传输。了解这些知识点对于开发和...

Global site tag (gtag.js) - Google Analytics