对NIO进行封装,使其方便使用。
UML类图:
DatagramChannel
打开一个指定端口或随机端口,并可以接收任何发到本端口的数据报,也可以发送数据报到任何一个远程的地址。一个 DatagramChannel 即可以做为Server端(前提是Client端知道你的端口)也可以做为Client端,即 N对N的关系。
ServerSocketChannel
Socket通信中的服务端,打开一个指定的端口,可以接收多个Socket连接,如A主机发送Socket连接请求,连接成功后,生成SocketChannel ,该通道只能接收或发送数据到A主机。 N个主机发送N个Socket请求,连接成功后,服务端能够与N个主机通信。即1对N的关系。
SocketChannel
Socket通信中的客户端,打开一个指定端口或随机端口,连接到服务端后,就可以与服务端通信了。一条SocketChannel通道与服务端是 1对1的关系。
分别对上面三个通道进行封状,方便操作。
数据的接收:
使用NIO的selector监控通道的可读取事件(需要有一条线程来执行,暂称为监控线程),当有数据发来,获取发生事件的通道,并调用通道的读取方法获取数据,对于获取的数据如何处理呢? 定义了一个数据接收器NetReceiver,方法onReceive(ReceiveData receiveData) 用于接收数据,具体的数据处理逻辑由该子类实现。
那么如何调用 NetReceiver. onReceive方法??如果在当前监控线程中调用NetReceiver. onReceive,业务逻辑执行时间长,会影响该通道或其它通道上数据的接收延迟。因此需要单独的线程 来执行。那么引入线程池,在监控线程中调用线程池 execute方法:
execute(new Runnable(){
NetReceiver. onReceive(接收到的数据)
}),这样线程池就可以并行处理接收到的数据提高处理速度。
存在一个问题:
先来说接收到的数据:对于DatagramChannel而言,receive方法接收会接收到一个完整数据报或返回,null表示当前没有可用的数据。接收到的数据以数据报为单位,理论最大字节数可以是65535(含包头)
对于SocketChannel而言,read方法可能读取0个或N个字节流。
对于 DatagramChannel ,如果接收到的每一个数据报,用一条线程来执行,如果前后数据报间没有顺关系是可以,但如果有顺序关系,如一个视频,一起接收到10个数据报并扔入线程池运行,可能第一个数据报最后执行完,因为线程的调度顺序与执行时间不能确定。这就导至了顺序的不一至。
那么可否10条线程进行同步,按顺序号来获取同步锁执行呢? 这导至了9条线程等待,浪费线程。
最好是只有一线程执行。
对于 SocketChannel 而言,每一次接收到的数据字节数不确定, 如果整个字节流中含多个业务数据单位,那么就需要哪些部分是一个业务数据单位,就需要每次接收的数据累加,直至形成一个完整的业务数据单位,再交给一条线程池执行,但需要多个业务数据单位间没有依赖关系。
因此生成三种线程模式:
一个对端址的发来的数据用一条线程处理,顺序流
一个数据报用一条线程来处理,多个数据报多个线程,第一个数据报都是独立了,与其它数据报无依赖
一个业务数据单位一条线程来处理,需要业务层提供判断如果判断接收到的数据够一个数据单元了。
主要接口:
/** * * NIO网络通道的封装 * @author yanlei * */ public interface NetChannelWrap extends Runnable{ /** * 1.打开所有需要使用的资源,如通道或选择器或绑定端口等等。 * 2.监控通道的接收数据 * 3.当接收到数据时,调用NetReceiver.onReceiver方法 * @throws Exception */ public void open() throws IOException; /** * 关闭网络通道,释放所有资源 */ public void close() ; }
/** * 数据报的网络通道封装 * * 1.当数据报通道异常时 * a.选择器上注销数据报通道,即不接收通道上的数据 * b.获取所有未处理完成的数据 * c.调用NetReceiverForSocket.onChannelErrorClose(e, receiveDataQueue);通知业务层所有网络通道不可用 * d.关闭所有通道 * * 2.当主动关闭数据报通道 * * a.选择器上注销数据报通道,即不接收通道上的数据 * b.等待所有读取数据处理完成 * c.调用NetReceiver.onChannelClose()通知业务层,所有通道关闭 * d.关闭所有相关资源 * * @author yanlei * */ public interface DatagramChannelWrap extends NetChannelWrap{ public boolean send(SocketAddress target , ByteBuffer ...buffs ) throws IOException; }
/** * * serverSocket通道的封装,用于数据通信中的服务端 * * * 有几种情况需要注意: * * 1.某条socket通道被服务端关闭(达到流的末尾) * a.不在选择器上监控读取事件,即不接收读取 * b.获取该通道上所有接收的但未处理完成的数据 * c.通知业务层,该通该异常关闭了,即调用NetReceiverForSocket.onSocketChannelCloseByRemote(SocketAddress address,Queue<ReceiveData> receiveDataQueue); * 并传未处理未处理完的数据 * d. 关闭通道 * * 2.某条socket通道发生异常 * a.不在选择器上监控读取事件,即不接收读取 * b.获取该通道上所有接收的但未处理完成的数据 * c.通知业务层,该通该异常关闭了,即调用NetReceiverForSocket.onSocketChannelErrorClose (IOException e,SocketAddress address,Queue<ReceiveData> receiveDataQueue); * 并传未处理未处理完的数据 * d. 关闭通道 * * 3.客户端主动关闭某条socket通道 * a.在选择器上注销该通道的监控读取事件,即不接收读取 * b.等待该通道上所有的已接收数据处理完成 * c.通知业务层,该通该关闭了,即调用NetReceiverForSocket.onSocketChannelClose(SocketAddress address) * d.关闭通道 * * 4.客户 端主动关闭所有通道 * a.选择器上注销所有通道,即不接收任务通道上的数据 * b.等待每一条通道上读取数据处理完成 * c.调用NetReceiverForSocket.onChannelClose()通知业务层,所有通道关闭 * d.关闭所有相关资源 * * 5.活动通道数等于0 , 每一条通道或异常,或主动关闭或被动关闭,导致所有通道都关闭 * a.调用NetReceiverForSocket.haveNoActiveChannel()通知业务层,没有活动通道 * * 6.当选择器异常时 * a.选择器上注销所有通道,即不接收任务通道上的数据 * b.获取所有通道上的未处理完成的数据 * c.调用NetReceiverForSocket.onChannelErrorClose(e, receiveDataQueue);通知业务层所有网络通道不可用 * d.关闭所有通道 * * * @author yanlei * */ public interface ServerSocketChannelWrap extends NetChannelWrap{ public boolean send(SocketAddress target , ByteBuffer ...buffs ) throws IOException; public void closeSocketChannel(SocketAddress target); }
/** * * socket通道的封装,用于客户端连接到服务端通信 * * 支持一条或几条socket通道连接到一个服务端 * * 有几种情况需要注意: * * 1.某条socket通道被服务端关闭(达到流的末尾) * a.不在选择器上监控读取事件,即不接收读取 * b.获取该通道上所有接收的但未处理完成的数据 * c.通知业务层,该通该异常关闭了,即调用NetReceiverForSocket.onSocketChannelCloseByRemote(SocketAddress address,Queue<ReceiveData> receiveDataQueue); * 并传未处理未处理完的数据 * d. 关闭通道 * * 2.某条socket通道发生异常 * a.不在选择器上监控读取事件,即不接收读取 * b.获取该通道上所有接收的但未处理完成的数据 * c.通知业务层,该通该异常关闭了,即调用NetReceiverForSocket.onSocketChannelErrorClose (IOException e,SocketAddress address,Queue<ReceiveData> receiveDataQueue); * 并传未处理未处理完的数据 * d. 关闭通道 * * 3.客户端主动关闭某条socket通道 * a.在选择器上注销该通道的监控读取事件,即不接收读取 * b.等待该通道上所有的已接收数据处理完成 * c.通知业务层,该通该关闭了,即调用NetReceiverForSocket.onSocketChannelClose(SocketAddress address) * d.关闭通道 * 4.客户 端主动关闭所有通道 * a.选择器上注销所有通道,即不接收任务通道上的数据 * b.等待每一条通道上读取数据处理完成 * c.调用NetReceiverForSocket.onChannelClose()通知业务层,所有通道关闭 * d.关闭所有相关资源 * * 5.活动通道数等于0 , 每一条通道或异常,或主动关闭或被动关闭,导致所有通道都关闭 * a.调用NetReceiverForSocket.haveNoActiveChannel()通知业务层,没有活动通道 * * 6.当选择器异常时 * a.选择器上注销所有通道,即不接收任务通道上的数据 * b.获取所有通道上的未处理完成的数据 * c.调用NetReceiverForSocket.onChannelErrorClose(e, receiveDataQueue);通知业务层所有网络通道不可用 * d.关闭所有通道 * * * @author yanlei * */ public interface SocketChannelWrap extends NetChannelWrap { public boolean send(ByteBuffer ...buffs ) throws IOException; public boolean send(SocketAddress fromLocalAddress , ByteBuffer ...buffs ) throws IOException; public void closeSocketChannel(SocketAddress target); }
/** * 网络数据接收器 * 1.用于接收网络发来的数据 * 2.网络通道或异常时被触发 * @author yanlei * */ public interface NetReceiver { /** * 当接收到数据,触发本方法 * @param ReceiveData 数据对象 */ public void onReceive(ReceiveData receiveData); /** * 当本地端口的网络通道或选择器发生异常时,调用本方法,并自动关闭网络通道 * @param e 异常 * @param receiveDataList 已接收到,但未调用onReceive方法处理的数据 */ public void onChannelErrorClose(IOException e,Queue<ReceiveData> receiveDataQueue); /** * 当本地端口 的网络通道被关闭时被调用 */ public void onChannelClose(); /** * 获取接收数据线程的模式 * 一个对端地址使用一条线程:用于处理长的顺序流 * 一个数据包使用一条线程:数据包之间无关,用于数据报 * 一个个完整的数据段使用一条线程:用于socket累加接收数据,直至数据完整 ,才启用一条线程数据。 * @return */ public NetReceiveTaskMode getTaskMode(); /** * 当通道异常时,已接收到的但未处理的数据是否继续处理 * @return */ public boolean isDealReceiveOnChannelError(); }
/** * socket网络连接的数据接收器 * * 1.用于接收网络发来的数据 * 2.网络通道或异常时被触发 * * * @author yanlei * */ public interface NetReceiverForSocket extends NetReceiver{ /** * 当socket连接成功时触发 * 服务端 :socketAddress指远程地址连接到本服务端 * 客服端:socketAddress指本地端口连接到远程服务端 * @param socketAddress */ public void onSocketConnect(SocketAddress socketAddress); /** * 当某条socket通道被对端关闭时(流达到末尾)触发 * @param address 本端是服务端:address 指远程端口地址 ,本端是客户端:address指本地端口地址 * @param receiveDataQueue 已接收但未处理完成的接收数据列表 */ public void onSocketChannelCloseByRemote(SocketAddress address,Queue<ReceiveData> receiveDataQueue); /** * 当某条socket通道被调用closeSocketChannel方法被关闭时触发 * @param address */ public void onSocketChannelClose(SocketAddress address); /** * 当某条socket通道发生异常时触发 * @param e 异常 * @param address 本端是服务端:address 指远程端口地址 ,本端是客户端:address指本地端口地址 * @param receiveDataQueue 已接收但未处理完成的接收数据列表 */ public void onSocketChannelErrorClose (IOException e,SocketAddress address,Queue<ReceiveData> receiveDataQueue); /** * * addUpBuffer中存的是累加的数据,newReceiveBuffer是新接收到的数据,将newReceiveBuffer的数据 * 累加到addUpBuffer中,直到数据完整,则返回完整的数据,addUpBuffer等于剩余的数据,并等待下一次的累加 * @param addUpBuffer 累加的数据 * @param newReceiveBuffer 新接收到的数据 * @return */ public List<ByteBuffer> addUpIntegrityBuffer (ByteBuffer addUpBuffer , ByteBuffer newReceiveBuffer) ; /** * 当没有活动的socket通道时被触发 */ public void haveNoActiveChannel(); /** * 定义最大的完整数据的字节大小 * @return */ public int getMaxAddUpBufferSize(); }
/** * 由线程池执行的线程任务接口,内部调用NetChannel.onReceive 处理业务逻辑 * 主要解决问题 * 1.dealReceiveData 用多线程执行接收数据处理,提高整体处理速度 * 2.removeAllReceiveData 移除所有接收到但未处理的数据,用于异常时不再处理,将不处理的数据交给NetReceive接口,让业务层决定该数据如何处理 * 3.setFinishRunable 设定当需要处理的数据都处理完成时,调用的接口,一般用于SocketChannel在所有接收数据处理完成后再关闭 * @author yanlei * */ public interface NetReceiveTask { public void dealReceiveData(ReceiveData receiveData); public Queue<ReceiveData> removeAllReceiveData(); public void setFinishRunable(Runnable call); }
/** * * 每一个数据报的对端地址或每一条socket连接,需要使用一条线程来顺序处理接收到的字节数据。 * 解决顺序流问题,如果用多条线程执行,线程执行的先后顺序不通确定, * 可能后进来的数据优先处理完了,可能会影响数据结构。 * @author yanlei * */ public class NetReceiveTaskOneThread implements NetReceiveTask,Runnable{ ExecutorService executorService = null; NetReceiver netReceiver = null; SocketAddress socketAddress = null; Queue<ReceiveData> receiveDataQueue = new ConcurrentLinkedQueue<ReceiveData>(); NetReceiveTaskOneThread(SocketAddress socketAddress,NetReceiver netReceiver,ExecutorService executorService){ this.socketAddress = socketAddress; this.executorService = executorService; this.netReceiver = netReceiver; } volatile boolean stop = true; public synchronized void dealReceiveData(ReceiveData receiveData){ this.receiveDataQueue.add(receiveData); if(stop){ this.executorService.execute(this); stop = false; } } public Queue<ReceiveData> removeAllReceiveData(){ Queue<ReceiveData> tempQueue = receiveDataQueue; receiveDataQueue = new ConcurrentLinkedQueue<ReceiveData>(); return tempQueue; } Runnable finishCall = null; public void setFinishRunable(Runnable call){ this.finishCall = call; } public synchronized void onFinish(){ if(this.receiveDataQueue.isEmpty()){ stop = true; if(this.finishCall !=null){ try{ this.finishCall.run(); }catch(Exception e){ } } }else{ this.executorService.execute(this); stop = false; } } @Override public void run() { // TODO Auto-generated method stub ReceiveData receiveData = receiveDataQueue.poll(); while(receiveData != null){ this.netReceiver.onReceive(receiveData); receiveData = receiveDataQueue.poll(); } onFinish(); } }
/** * * 每一次接收到的数据,都使用一条线程来处理,主用于无顺序的udp数据报,即一个 * 数据包中包含了完整的数据结构,并前后数据报之间没有关联。 * @author yanlei * */ public class NetReceiveTaskMultiThread implements NetReceiveTask,Runnable{ ExecutorService executorService = null; NetReceiver netReceiver = null; SocketAddress socketAddress = null; Queue<ReceiveData> receiveDataQueue = new ConcurrentLinkedQueue<ReceiveData>(); NetReceiveTaskMultiThread(SocketAddress socketAddress,NetReceiver netReceiver,ExecutorService executorService){ this.socketAddress = socketAddress; this.executorService = executorService; this.netReceiver = netReceiver; } AtomicInteger num = new AtomicInteger(0); public void dealReceiveData(ReceiveData receiveData){ this.receiveDataQueue.add(receiveData); num.incrementAndGet(); this.executorService.execute(this); } public Queue<ReceiveData> removeAllReceiveData(){ Queue<ReceiveData> tempQueue = receiveDataQueue; receiveDataQueue = new ConcurrentLinkedQueue<ReceiveData>(); return tempQueue; } Runnable finishCall = null; public void setFinishRunable(Runnable call){ this.finishCall = call; } public void onFinish(){ if(this.finishCall !=null){ try{ this.finishCall.run(); }catch(Exception e){ } } } @Override public void run() { // TODO Auto-generated method stub ReceiveData receiveData = receiveDataQueue.poll(); if(receiveData != null){ this.netReceiver.onReceive(receiveData); } if(num.decrementAndGet()==0){ onFinish(); } } }
/** * * 每一次接收到完整数据,使用一条线程来执行。 * 从网络中接收到的数据,对于业务来讲,当前已接收到的数据可能是不完整的,需要再次接收数据,累加成完整的 * 数据结构,才使用一条线程来执行。 * * 完整数据的判断方式:NetReceiverForSocket.addUpIntegrityBuffer (ByteBuffer addUpBuffer , ByteBuffer newReceiveBuffer) ; * @author yanlei * */ public class NetReceiveTaskForSocketMultiThread extends NetReceiveTaskMultiThread{ ByteBuffer addUpBuffer = null; NetReceiveTaskForSocketMultiThread(SocketAddress socketAddress,NetReceiverForSocket netReceiver,ExecutorService executorService){ super(socketAddress,netReceiver,executorService); this.addUpBuffer = ByteBuffer.allocate(((NetReceiverForSocket)this.netReceiver).getMaxAddUpBufferSize()); } public void dealReceiveData(ReceiveData receiveData){ List<ByteBuffer > integrityBufferList = ((NetReceiverForSocket)this.netReceiver).addUpIntegrityBuffer(this.addUpBuffer, receiveData.getByteBuffer()); if(integrityBufferList != null && integrityBufferList.size()>0){ for(ByteBuffer byteBuffer:integrityBufferList){ super.dealReceiveData(new ReceiveData(receiveData.getAddress(),byteBuffer,receiveData.getNetChannelWrap())); } } } }
/** * 扩展线程池,添加shutdown方法,用于在线程池中所有任务执行完成时,调用Closeable接口, * Closeable接口实现类一般来关闭网络通道。即线程池执行完成后,达到关闭网络通道的目的 * * @author yanlei * */ public interface ReceiveThreadPool extends ExecutorService{ /** * 用于在线程池结束时执行指定的接口 * @param closeable */ public void shutdown(Closeable closeable); }
相关推荐
总的来说,"自己封装的IO核NIO"项目通过封装Java的IO和NIO,提供了更高效、易用的网络通信工具,特别是对于需要频繁与服务器交互的应用,如微信开发,能显著提升开发体验。而其中的HTTP客户端则进一步简化了HTTP请求...
Netty基于NIO,但对NIO进行了优化和封装,提供了更高级别的API,使得编程更加简单。Netty的核心组件包括:Bootstrap(启动引导类)、ServerBootstrap(服务器启动引导类)、Channel(网络连接通道)、Pipeline(处理...
MINA框架通过封装NIO,提供了一种更加面向对象和易于使用的API,使得开发者可以更方便地创建高性能、可扩展的网络应用,如TCP和UDP服务器。 在上述代码中,可以看到如何使用MINA框架初始化一个服务器: 1. `...
Netty基于Java NIO,但对NIO进行了封装和优化,提供了更加简单易用的API。Netty的主要特性包括: 1. **零拷贝(Zero-Copy)**:通过直接内存和FileChannel的transferTo方法,避免了不必要的数据复制,提高了效率。 ...
Netty 是基于 NIO 构建的,但它封装了复杂的 NIO API,提供了一套简单易用、高性能的网络编程接口。Netty 的主要特点包括: 1. **异步事件驱动**:Netty 使用事件驱动模型,通过事件循环(EventLoop)来处理事件,...
Netty框架是NIO的一个优秀实践,它封装了NIO的复杂性,提供了一套易于使用的API,使得开发网络应用变得更加简单。 在"nioTest"这个压缩包文件中,可能包含的是使用Java NIO编写的测试代码。这些代码可能涉及到创建...
在本资源"编写一个简单NIO系统.rar"中,我们主要关注的是如何使用Java的非阻塞I/O(Non-blocking I/O,简称NIO)框架来构建一个高效、可扩展的网络应用程序。Netty是一个高性能、异步事件驱动的网络应用框架,常用于...
这些缓冲API封装了无缓冲流:`BufferedInputStream`和`BufferedOutputStream`。 #### 文件I/O 尽管流模型提供了一种简单的方式来读写数据,并且适用于各种数据源和目的地,包括磁盘文件,但它们并不支持与磁盘文件...
- **定义**:Netty是一个基于NIO的网络通信框架,提供了异步、事件驱动的API,使得开发高性能、高可靠性的网络应用变得简单。 - **特点**: - 大大简化了网络应用的开发过程。 - 提供了丰富的功能组件,如TCP/UDP ...
Netty不仅封装了NIO的复杂性,还提供了更高级别的API,使得网络编程变得更加简单和高效。 Netty的主要特性包括: 1. **高度可定制**:Netty提供了大量的预定义编码器、解码器和处理器,可以方便地组合使用,也可以...
例如,C语言中的`printf`和`scanf`,以及Java中的面向对象封装。 2. **Java标准IO回顾** 在Java中,标准IO基于流(Stream)的概念,分为字节流和字符流两大类。字节流如`InputStream`和`OutputStream`处理原始字节...
下面是一个简单的NIO文件复制示例,演示如何使用Buffer和Channel来进行数据传输: ```java package sample; import java.io.FileInputStream; import java.io.FileOutputStream; import java.nio.ByteBuffer; ...
6. **结合工具和框架**:现代Java开发中,有许多优秀的库如Apache Commons IO、Google Guava等,它们提供了更高级别的IO操作,而Netty这样的网络框架则封装了NIO,使得编写高性能的网络应用变得更加容易。...
- 根据联通提供的SGIP1.2接口文档,封装协议,编写通信流程 - 下行短信(发送短信给手机用户),里面有两种方式发送短信,一种是Socket,一种是NIO,NIO的性能较高 - 上行短信(接收手机用户的短信),实际是接收联通短信中心...
总结来说,从NIO到Netty,我们看到的是从底层I/O模型到高级通信框架的发展,Netty通过封装复杂的网络编程细节,提供了高效、易用的解决方案,极大地简化了开发者的任务,提升了应用的性能和稳定性。
Buffer 是 NIO 中所使用的缓冲区,它不是一个简单的 byte 数组,而是封装过的 Buffer 类。通过 Buffer 提供的 API,我们可以灵活地操纵数据。NIO 提供了多种 Buffer 类型,如 ByteBuffer、CharBuffer、DoubleBuffer ...