- 浏览: 444506 次
- 性别:
- 来自: 苏州
文章分类
最新评论
-
danStart:
想问问,能监测服务是否挂掉吗?
公司要求实时监控服务器,写个Web的监控系统 -
hepct:
你好,最近在搭一个游戏服务器,能加好友请教下吗?1538863 ...
java游戏服务端实现 -
Limewwy:
没打完就发表了?为啥要这样设置?【游戏中需要传递用户的积分,这 ...
java游戏服务端实现 -
Limewwy:
楼主您好。请教为啥要这样设计?
java游戏服务端实现 -
3849801:
楼主,能够提供更具体的文档或者指导吗?我想搭建一个服务端,非常 ...
java游戏服务端实现
看到博客首页有一篇很好的原创文章,关于io和newio的用法和比较,自己收藏了,
1. 前言
现在很多做网络通讯中间代理层的通讯都是使用Java1.4以后推出的NIO进行编写,现在还有很多开源的框架也是封装了NIO的书写细节来帮助大家简写异步非阻塞通讯服务。像MySql的代理中间件amoeba-mysql-proxy就是采用NIO的方式处理client端过来的request,之后与Mysql-Server层的通讯也是采用NIO进行命令消息发送的。再看咱们JavaEye首页介绍的项目xmemcached,其中作者Dennis是其xmemcached的开发人,他也是通过NIO的方式与memcached的Server进行异步通讯的,Dennis的另一个项目yanf4j就是一个NIO框架,xmemcache也是借助这个NIO框架实现的异步非阻塞方式的网络通讯,Apache的MINA框架都是NIO的封装再实现。
那么我们就来回顾一下以前的处理方式,来看看为什么现在要使用NIO来进行异步非阻塞方式的通讯吧,网上很多文章都是几句话将NIO和原始的socket通讯的优劣一带而过,我们这次用一个简单的下载大文件的网络服务程序进行说明。使用3种模式来说明,分别是同步单独线程服务运行模式、传统阻塞多线程模式、使用NIO异步非阻塞模式。
我们设置服务器上有一个1.81GB的电影,格式为RMVB。使用Server进行服务监听,客户端请求到Server,建立网络通讯,进行电影下载。
2. 同步单线程阻塞
使用同步单线程下载,是最原始的socket通讯,服务端的代码如下
- <SPAN style="FONT-SIZE: x-small">package server;
- import java.io.FileInputStream;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.ServerSocket;
- import java.net.Socket;
- /**
- * liuyan
- */
- public class FilmServer {
- public static void main(String[] args) {
- FilmServer ms = new FilmServer();
- try {
- ms.server();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 服务器端响应请求
- *
- * @throws Exception
- */
- public void server() throws Exception {
- // 0.建立服务器端的server的socket
- ServerSocket ss = new ServerSocket(9999);
- while (true) {
- // 1.打开socket连接
- // 等待客户端的请求
- final Socket server = ss.accept();
- System.out.println("服务-----------请求开始start");
- // 2.打开socket的流信息,准备下面的操作
- final InputStream is = server.getInputStream();
- byte b[] = new byte[1024];
- int readCount = is.read(b);
- String str = new String(b);
- str = str.trim();
- final String serverFileName = str;
- // 3.对流信息进行读写操作
- System.out.println("客户端传过来的信息是:" + str);
- System.out.println("线程" + Thread.currentThread().getName() + "启动");
- try {
- FileInputStream fileInputStream = new FileInputStream(
- serverFileName);
- // 3.1 服务器回复客户端信息(response)
- OutputStream os = server.getOutputStream();
- byte[] bfile = new byte[1024];
- // 往客户端写
- while (fileInputStream.read(bfile) > 0) {
- os.write(bfile);
- }
- fileInputStream.close();
- os.close();
- // 4.关闭socket
- // 先关闭输入流
- is.close();
- // 最后关闭socket
- server.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- System.out.println("服务-----------请求结束over");
- }
- }
- }</SPAN>
package server;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* liuyan
*/
public class FilmServer {
public static void main(String[] args) {
FilmServer ms = new FilmServer();
try {
ms.server();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服务器端响应请求
*
* @throws Exception
*/
public void server() throws Exception {
// 0.建立服务器端的server的socket
ServerSocket ss = new ServerSocket(9999);
while (true) {
// 1.打开socket连接
// 等待客户端的请求
final Socket server = ss.accept();
System.out.println("服务-----------请求开始start");
// 2.打开socket的流信息,准备下面的操作
final InputStream is = server.getInputStream();
byte b[] = new byte[1024];
int readCount = is.read(b);
String str = new String(b);
str = str.trim();
final String serverFileName = str;
// 3.对流信息进行读写操作
System.out.println("客户端传过来的信息是:" + str);
System.out.println("线程" + Thread.currentThread().getName() + "启动");
try {
FileInputStream fileInputStream = new FileInputStream(
serverFileName);
// 3.1 服务器回复客户端信息(response)
OutputStream os = server.getOutputStream();
byte[] bfile = new byte[1024];
// 往客户端写
while (fileInputStream.read(bfile) > 0) {
os.write(bfile);
}
fileInputStream.close();
os.close();
// 4.关闭socket
// 先关闭输入流
is.close();
// 最后关闭socket
server.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("服务-----------请求结束over");
}
}
}
服务端这么写代码会有什么问题?咱们先来看客户端代码,之后运行后就知道了。
- <SPAN style="FONT-SIZE: x-small">package client;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.Socket;
- import java.net.UnknownHostException;
- /**
- * liuyan
- * @version 1.0
- */
- public class FilmClient {
- public static void main(String[] args) {
- for (int i = 1; i <= 2; i++) {
- Client client = new Client();
- client.i = i;
- client.start();
- }
- }
- }
- class Client extends Thread {
- int i;
- @Override
- public void run() {
- // 1.建立scoket连接
- Socket client;
- try {
- client = new Socket("127.0.0.1", 9999);
- // 2.打开socket的流信息,准备下面的操作
- OutputStream os = client.getOutputStream();
- // 3.写信息
- os.write(("d://film//2.rmvb").getBytes());
- String filmName = "c://io"+i+".rmvb";
- FileOutputStream fileOutputStream = new FileOutputStream(filmName);
- // 3.1接收服务器端的反馈
- InputStream is = client.getInputStream();
- byte b[] = new byte[1024];
- while(is.read(b)>0){
- fileOutputStream.write(b);
- }
- // 4.关闭socket
- // 先关闭输出流
- os.close();
- // 最后关闭socket
- client.close();
- } catch (UnknownHostException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }</SPAN>
package client;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
/**
* liuyan
* @version 1.0
*/
public class FilmClient {
public static void main(String[] args) {
for (int i = 1; i <= 2; i++) {
Client client = new Client();
client.i = i;
client.start();
}
}
}
class Client extends Thread {
int i;
@Override
public void run() {
// 1.建立scoket连接
Socket client;
try {
client = new Socket("127.0.0.1", 9999);
// 2.打开socket的流信息,准备下面的操作
OutputStream os = client.getOutputStream();
// 3.写信息
os.write(("d://film//2.rmvb").getBytes());
String filmName = "c://io"+i+".rmvb";
FileOutputStream fileOutputStream = new FileOutputStream(filmName);
// 3.1接收服务器端的反馈
InputStream is = client.getInputStream();
byte b[] = new byte[1024];
while(is.read(b)>0){
fileOutputStream.write(b);
}
// 4.关闭socket
// 先关闭输出流
os.close();
// 最后关闭socket
client.close();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
客户端启动了2个线程进行下载电影的工作,先启动服务端,再运行客户端,会看笔者本地的硬盘C分区到有如下效果。
可以看到线程2的下载任务一直是0字节,等第一个线程下载完成后呢,线程2的下载任务才能进行。
服务端的代码造成的问题就是使用传统的sokect网络通讯,那么另一个客户端的线程请求到server端的时候就发生了阻塞的情况,也就是说,服务端相当一个厕所,厕所就有只有一个坑位,来了一个人,相当于客户端请求,那这个人相当于就把坑位给占了,write操作和read操作会阻塞,这个人还没解决完问题呢,下个人就来了,没办法,哥们儿先在门外等等啊,等前一个客户爽完了再给您提供服务好吧。那么如何解决这个占着坑位不让别人用的情况呢?
3. 阻塞的多线程
为了解决以上问题,那么之后很多Server肯定不可能像以上程序那么做,不过以前很多Server都是基于单线程服务改造一下,做成多线程的Server的通讯,修改一下上面的Server代码,如下
- <SPAN style="FONT-SIZE: x-small">package server;
- import java.io.FileInputStream;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.ServerSocket;
- import java.net.Socket;
- /**
- *
- */
- public class FilmServerNewThread {
- public static void main(String[] args) {
- FilmServerNewThread ms = new FilmServerNewThread();
- try {
- ms.server();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 服务器端响应请求
- *
- * @throws Exception
- */
- public void server() throws Exception {
- // 0.建立服务器端的server的socket
- ServerSocket ss = new ServerSocket(9999);
- while (true) {
- // 1.打开socket连接
- // 等待客户端的请求
- final Socket server = ss.accept();
- System.out.println("服务-----------请求开始start");
- // 2.打开socket的流信息,准备下面的操作
- final InputStream is = server.getInputStream();
- byte b[] = new byte[1024];
- int readCount = is.read(b);
- String str = new String(b);
- str = str.trim();
- final String serverFileName = str;
- // 3.对流信息进行读写操作
- System.out.println("客户端传过来的信息是:" + str);
- if (readCount > 0) {
- new Thread() {
- @Override
- public void run() {
- System.out.println("线程"
- + Thread.currentThread().getName() + "启动");
- try {
- FileInputStream fileInputStream = new FileInputStream(
- serverFileName);
- // 3.1 服务器回复客户端信息(response)
- OutputStream os = server.getOutputStream();
- byte[] bfile = new byte[1024];
- // 往客户端写
- while (fileInputStream.read(bfile) > 0) {
- os.write(bfile);
- }
- fileInputStream.close();
- os.close();
- // 4.关闭socket
- // 先关闭输入流
- is.close();
- // 最后关闭socket
- server.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }.start();
- }
- System.out.println("服务-----------请求结束over");
- }
- }
- }</SPAN>
package server;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
*
*/
public class FilmServerNewThread {
public static void main(String[] args) {
FilmServerNewThread ms = new FilmServerNewThread();
try {
ms.server();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服务器端响应请求
*
* @throws Exception
*/
public void server() throws Exception {
// 0.建立服务器端的server的socket
ServerSocket ss = new ServerSocket(9999);
while (true) {
// 1.打开socket连接
// 等待客户端的请求
final Socket server = ss.accept();
System.out.println("服务-----------请求开始start");
// 2.打开socket的流信息,准备下面的操作
final InputStream is = server.getInputStream();
byte b[] = new byte[1024];
int readCount = is.read(b);
String str = new String(b);
str = str.trim();
final String serverFileName = str;
// 3.对流信息进行读写操作
System.out.println("客户端传过来的信息是:" + str);
if (readCount > 0) {
new Thread() {
@Override
public void run() {
System.out.println("线程"
+ Thread.currentThread().getName() + "启动");
try {
FileInputStream fileInputStream = new FileInputStream(
serverFileName);
// 3.1 服务器回复客户端信息(response)
OutputStream os = server.getOutputStream();
byte[] bfile = new byte[1024];
// 往客户端写
while (fileInputStream.read(bfile) > 0) {
os.write(bfile);
}
fileInputStream.close();
os.close();
// 4.关闭socket
// 先关闭输入流
is.close();
// 最后关闭socket
server.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}.start();
}
System.out.println("服务-----------请求结束over");
}
}
}
以上的Server就是在原始的socket基础上加了线程,每一个Client请求过来后,整个Server主线程不必处于阻塞状态,接收请求后直接另起一个新的线程来处理和客户端的交互,就是往客户端发送二进制包。这个在新线程中虽然阻塞,但是对于服务主线程没有阻塞的影响,主线程依然通过死循环监听着客户端的一举一动。另一个客户端的线程发起请求后就再起一个新的线程对象去为客户端服务。执行效果如下
2个线程互不影响,各自下载各自的。当然从非常严格的意义来讲,str变量在十分高并发的情况下有线程安全问题,这个咱暂且忽略,就着眼于低并发的情况。这个问题是什么呢,就是如果客户端请求比较多了,那么为每一个客户端开辟一个新的线程对象来处理网络传输的请求,需要创建个线程对象,而且这个线程对象从时间上来讲还是处于长连接,这个就比较消费系统资源,这个打开进程管理器就可以看到。而且每一个线程内部都是阻塞的,也没有说完全利用好这个新创建的线程。还拿刚才上厕所举例子,好比现在不止一个坑位了,来了一个用户我这边就按照工程师的厕所坑位图建立一个新的坑位,客户来了,不用等待老坑位,用新创建的坑位就行了。等那个老坑位用完了,自然有垃圾回收器去消灭那个一次性的坑位的,腾出资源位置为了建立新的坑位。长时间连接的意思,相当于这个人上厕所的时间非常长,便秘??需要拉一天才能爽完……老的坑位一时半会儿回收不了,新的坑位需要有空间为其建造茅房以便满足客户端的“急切方便”需要。久而久之,线程数目一多,系统就挂了的概率就增多了(谁也别想上,全玩完了)。
4. 异步非阻塞
使用JDK1.4的NIO可以适当的解决上面的问题,异步 I/O 是一种 没有阻塞地读写数据的方法。通常,在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样, write() 调用将会阻塞直至数据能够写入。异步 I/O 调用不会阻塞。相反,您将注册对特定 I/O 事件的兴趣 ― 可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,系统将会告诉您。异步 I/O 的一个优势在于,它允许您同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,您可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。还是举上公共厕所例子,虽然这个例子有点臭臭的。您现在有“便便”的需求了,不用那么麻烦,看看公共厕所是否有人占领,也不用给您另起个新坑位,您就拿一根我们服务端定制的容器和一个很粗管子,这个坐便器的大小因您那个地方的尺寸而定,坐便器往您的那个地方一放,再将坐便器和管子一连接,OK,您就敞开了“爽”吧。不用担心,这个管子自然会连接到相应的肥料厂家,将您的排泄物有效回收加以利用的。您完了事,擦擦屁股,关上管子该干嘛还干嘛就行了。另一个人也有这个需求,没问题,每个要我们提供服务的人都用这根管子,和自己的坐便器就行了,管子很粗,谁来连这个管子都行,有多少都行啊。下面我们来看基于NIO的网络下载程序
- <SPAN style="FONT-SIZE: x-small">package server;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.CharBuffer;
- import java.nio.channels.FileChannel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.nio.charset.Charset;
- import java.nio.charset.CharsetDecoder;
- import java.util.Iterator;
- /**
- *
- * @author liuyan
- *
- */
- public class NIOServer {
- static int BLOCK = 500*1024;
- /**
- * 处理客户端的内部类,专门负责处理与用户的交互
- */
- public class HandleClient {
- protected FileChannel channel;
- protected ByteBuffer buffer;
- String filePath;
- /**
- * 构造函数,文件的管道初始化
- * @param filePath
- * @throws IOException
- */
- public HandleClient(String filePath) throws IOException {
- //文件的管道
- this.channel = new FileInputStream(filePath).getChannel();
- //建立缓存
- this.buffer = ByteBuffer.allocate(BLOCK);
- this.filePath = filePath;
- }
- /**
- * 读取文件管道中数据到缓存中
- * @return
- */
- public ByteBuffer readBlock() {
- try {
- //清除缓冲区的内容,posistion设置为0,limit设置为缓冲的容量大小capacity
- buffer.clear();
- //读取
- int count = channel.read(buffer);
- //将缓存的中的posistion设置为0,将缓存中的limit设置在原始posistion位置上
- buffer.flip();
- if (count <= 0)
- return null;
- } catch (IOException e) {
- e.printStackTrace();
- }
- return buffer;
- }
- /**
- * 关闭服务端的文件管道
- */
- public void close() {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- protected Selector selector;
- protected String filename = "d:\\film\\60.rmvb"; // a big file
- protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
- protected CharsetDecoder decoder;
- //构造服务端口,服务管道等等
- public NIOServer(int port) throws IOException {
- selector = this.getSelector(port);
- Charset charset = Charset.forName("GB2312");
- decoder = charset.newDecoder();
- }
- // 获取Selector
- //构造服务端口,服务管道等等
- protected Selector getSelector(int port) throws IOException {
- ServerSocketChannel server = ServerSocketChannel.open();
- Selector sel = Selector.open();
- server.socket().bind(new InetSocketAddress(port));
- server.configureBlocking(false);
- //刚开始就注册链接事件
- server.register(sel, SelectionKey.OP_ACCEPT);
- return sel;
- }
- // 服务启动的开始入口
- public void listen() {
- try {
- for (;;) {
- //?
- selector.select();
- Iterator<SelectionKey> iter = selector.selectedKeys()
- .iterator();
- while (iter.hasNext()) {//首先是最先感兴趣的连接事件
- SelectionKey key = iter.next();
- //
- iter.remove();
- //处理事件
- handleKey(key);
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- // 处理事件
- protected void handleKey(SelectionKey key) throws IOException {
- if (key.isAcceptable()) { // 接收请求
- //允许网络连接事件
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel channel = server.accept();
- channel.configureBlocking(false);
- //网络管道准备处理读事件
- channel.register(selector, SelectionKey.OP_READ);
- } else if (key.isReadable()) { // 读信息
- SocketChannel channel = (SocketChannel) key.channel();
- //从客户端读过来的数据块
- int count = channel.read(clientBuffer);
- if (count > 0) {
- //读取过来的缓存进行有效分割,posistion设置为0,保证从缓存的有效位置开始读取,limit设置为原先的posistion上
- //这样一来从posistion~limit这段缓存数据是有效,可利用的
- clientBuffer.flip();
- //对客户端缓存块进行编码
- CharBuffer charBuffer = decoder.decode(clientBuffer);
- System.out.println("Client >>download>>" + charBuffer.toString());
- //对网络管道注册写事件
- SelectionKey wKey = channel.register(selector,
- SelectionKey.OP_WRITE);
- //将网络管道附着上一个处理类HandleClient,用于处理客户端事件的类
- wKey.attach(new HandleClient(charBuffer.toString()));
- } else{
- //如客户端没有可读事件,关闭管道
- channel.close();
- }
- clientBuffer.clear();
- } else if (key.isWritable()) { // 写事件
- SocketChannel channel = (SocketChannel) key.channel();
- //从管道中将附着处理类对象HandleClient取出来
- HandleClient handle = (HandleClient) key.attachment();
- //读取文件管道,返回数据缓存
- ByteBuffer block = handle.readBlock();
- if (block != null){
- //System.out.println("---"+new String(block.array()));
- //写给客户端
- channel.write(block);
- }else {
- handle.close();
- channel.close();
- }
- }
- }
- public static void main(String[] args) {
- int port = 12345;
- try {
- NIOServer server = new NIOServer(port);
- System.out.println("Listernint on " + port);
- while (true) {
- server.listen();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }</SPAN>
package server;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
/**
*
* @author liuyan
*
*/
public class NIOServer {
static int BLOCK = 500*1024;
/**
* 处理客户端的内部类,专门负责处理与用户的交互
*/
public class HandleClient {
protected FileChannel channel;
protected ByteBuffer buffer;
String filePath;
/**
* 构造函数,文件的管道初始化
* @param filePath
* @throws IOException
*/
public HandleClient(String filePath) throws IOException {
//文件的管道
this.channel = new FileInputStream(filePath).getChannel();
//建立缓存
this.buffer = ByteBuffer.allocate(BLOCK);
this.filePath = filePath;
}
/**
* 读取文件管道中数据到缓存中
* @return
*/
public ByteBuffer readBlock() {
try {
//清除缓冲区的内容,posistion设置为0,limit设置为缓冲的容量大小capacity
buffer.clear();
//读取
int count = channel.read(buffer);
//将缓存的中的posistion设置为0,将缓存中的limit设置在原始posistion位置上
buffer.flip();
if (count <= 0)
return null;
} catch (IOException e) {
e.printStackTrace();
}
return buffer;
}
/**
* 关闭服务端的文件管道
*/
public void close() {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
protected Selector selector;
protected String filename = "d:\\film\\60.rmvb"; // a big file
protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
protected CharsetDecoder decoder;
//构造服务端口,服务管道等等
public NIOServer(int port) throws IOException {
selector = this.getSelector(port);
Charset charset = Charset.forName("GB2312");
decoder = charset.newDecoder();
}
// 获取Selector
//构造服务端口,服务管道等等
protected Selector getSelector(int port) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
Selector sel = Selector.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
//刚开始就注册链接事件
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}
// 服务启动的开始入口
public void listen() {
try {
for (;;) {
//?
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys()
.iterator();
while (iter.hasNext()) {//首先是最先感兴趣的连接事件
SelectionKey key = iter.next();
//
iter.remove();
//处理事件
handleKey(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 处理事件
protected void handleKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) { // 接收请求
//允许网络连接事件
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
//网络管道准备处理读事件
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 读信息
SocketChannel channel = (SocketChannel) key.channel();
//从客户端读过来的数据块
int count = channel.read(clientBuffer);
if (count > 0) {
//读取过来的缓存进行有效分割,posistion设置为0,保证从缓存的有效位置开始读取,limit设置为原先的posistion上
//这样一来从posistion~limit这段缓存数据是有效,可利用的
clientBuffer.flip();
//对客户端缓存块进行编码
CharBuffer charBuffer = decoder.decode(clientBuffer);
System.out.println("Client >>download>>" + charBuffer.toString());
//对网络管道注册写事件
SelectionKey wKey = channel.register(selector,
SelectionKey.OP_WRITE);
//将网络管道附着上一个处理类HandleClient,用于处理客户端事件的类
wKey.attach(new HandleClient(charBuffer.toString()));
} else{
//如客户端没有可读事件,关闭管道
channel.close();
}
clientBuffer.clear();
} else if (key.isWritable()) { // 写事件
SocketChannel channel = (SocketChannel) key.channel();
//从管道中将附着处理类对象HandleClient取出来
HandleClient handle = (HandleClient) key.attachment();
//读取文件管道,返回数据缓存
ByteBuffer block = handle.readBlock();
if (block != null){
//System.out.println("---"+new String(block.array()));
//写给客户端
channel.write(block);
}else {
handle.close();
channel.close();
}
}
}
public static void main(String[] args) {
int port = 12345;
try {
NIOServer server = new NIOServer(port);
System.out.println("Listernint on " + port);
while (true) {
server.listen();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
ServerSocketChannel相当于我们说的那个大粗管子,在它上面注册了很多这个管子感兴趣的事件,比如大便、小便、酒醉后吐的污浊都是它关心的。至于谁来控制管道应该关心的事件,是由管道通过Selector注册事件完成的,Selector相当于一个大管道的维护员了。管道必须得有服务商的厂家维护吧,不能滥用吧。Selector就是个管家,负责管道的事件监听的。XXXXBuffer相当于咱们说的坐便器,它是以块为单位进行管道疏通的,假如您的尺寸特别大,估计您排出的那个玩意也小不了,就配置一个大点的缓存传给服务那边,当然,您这边得到的服务端返回的加工后肥料,返给您的也是和您配置的尺寸有关系的。客户端的代码如下
- <SPAN style="FONT-SIZE: x-small">package client;
- import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.CharBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.nio.charset.Charset;
- import java.nio.charset.CharsetEncoder;
- import java.util.Iterator;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- /**
- *
- * @author liuyan
- *
- */
- public class NIOClient {
- static int SIZE = 2;
- final static int bufferSize = 500 * 1024;
- static InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
- static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();
- static class Download implements Runnable {
- protected int index;
- String outfile = null;
- public Download(int index) {
- this.index = index;
- this.outfile = "c:\\" + index + ".rmvb";
- }
- public void run() {
- FileOutputStream fout = null;
- // FileChannel fcout = null;
- try {
- fout = new FileOutputStream(outfile);
- // fcout = fout.getChannel();
- } catch (FileNotFoundException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
- try {
- long start = System.currentTimeMillis();
- // 打开客户端socket管道
- SocketChannel client = SocketChannel.open();
- // 客户端的管道的通讯模式
- client.configureBlocking(false);
- // 选择器
- Selector selector = Selector.open();
- // 往客户端管道上注册感兴趣的连接事件
- client.register(selector, SelectionKey.OP_CONNECT);
- // 配置IP
- client.connect(ip);
- // 配置缓存大小
- ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- int total = 0;
- FOR: for (;;) {
- // 阻塞,返回发生感兴趣事件的数量
- selector.select();
- // 相当于获得感兴趣事件的集合迭代
- Iterator<SelectionKey> iter = selector.selectedKeys()
- .iterator();
- while (iter.hasNext()) {
- SelectionKey key = iter.next();
- System.out.println("-----Thread "
- + index + "------------------"+key.readyOps());
- // 删除这个马上就要被处理的事件
- iter.remove();
- // 感兴趣的是可连接的事件
- if (key.isConnectable()) {
- // 获得该事件中的管道对象
- SocketChannel channel = (SocketChannel) key
- .channel();
- // 如果该管道对象已经连接好了
- if (channel.isConnectionPending())
- channel.finishConnect();
- // 往管道中写一些块信息
- channel.write(encoder.encode(CharBuffer
- .wrap("d://film//1.rmvb")));
- // 之后为该客户端管道注册新的感兴趣的事件---读操作
- channel.register(selector, SelectionKey.OP_READ);
- } else if (key.isReadable()) {
- // 由事件获得通讯管道
- SocketChannel channel = (SocketChannel) key
- .channel();
- // 从管道中读取数据放到缓存中
- int count = channel.read(buffer);
- System.out.println("count:" + count);
- if (count > 0) {
- // 统计读取的字节数目
- total += count;
- // 这样一来从posistion~limit这段缓存数据是有效,可利用的
- // buffer.flip();
- buffer.clear();
- // 往输出文件中去写了
- if (count < bufferSize) {
- byte[] overByte = new byte[count];
- for (int index = 0; index < count; index++) {
- overByte[index] = buffer.get(index);
- }
- fout.write(overByte);
- } else {
- fout.write(buffer.array());
- }
- } else {
- // 关闭客户端通道
- client.close();
- // 退出大循环
- break FOR;
- }
- }
- }
- }
- // 计算时间
- double last = (System.currentTimeMillis() - start) * 1.0 / 1000;
- System.out.println("Thread " + index + " downloaded " + total
- / 1024 + "kbytes in " + last + "s.");
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- public static void main(String[] args) throws IOException {
- long startTime = System.currentTimeMillis();
- // 启用线程池
- ExecutorService exec = Executors.newFixedThreadPool(SIZE);
- for (int index = 1; index <= SIZE; index++) {
- exec.execute(new Download(index));
- }
- exec.shutdown();
- long endTime = System.currentTimeMillis();
- long timeLong = endTime - startTime;
- System.out.println("下载时间:" + timeLong);
- }
- }</SPAN>
package client;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* @author liuyan
*
*/
public class NIOClient {
static int SIZE = 2;
final static int bufferSize = 500 * 1024;
static InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();
static class Download implements Runnable {
protected int index;
String outfile = null;
public Download(int index) {
this.index = index;
this.outfile = "c:\\" + index + ".rmvb";
}
public void run() {
FileOutputStream fout = null;
// FileChannel fcout = null;
try {
fout = new FileOutputStream(outfile);
// fcout = fout.getChannel();
} catch (FileNotFoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
long start = System.currentTimeMillis();
// 打开客户端socket管道
SocketChannel client = SocketChannel.open();
// 客户端的管道的通讯模式
client.configureBlocking(false);
// 选择器
Selector selector = Selector.open();
// 往客户端管道上注册感兴趣的连接事件
client.register(selector, SelectionKey.OP_CONNECT);
// 配置IP
client.connect(ip);
// 配置缓存大小
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
int total = 0;
FOR: for (;;) {
// 阻塞,返回发生感兴趣事件的数量
selector.select();
// 相当于获得感兴趣事件的集合迭代
Iterator<SelectionKey> iter = selector.selectedKeys()
.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
System.out.println("-----Thread "
+ index + "------------------"+key.readyOps());
// 删除这个马上就要被处理的事件
iter.remove();
// 感兴趣的是可连接的事件
if (key.isConnectable()) {
// 获得该事件中的管道对象
SocketChannel channel = (SocketChannel) key
.channel();
// 如果该管道对象已经连接好了
if (channel.isConnectionPending())
channel.finishConnect();
// 往管道中写一些块信息
channel.write(encoder.encode(CharBuffer
.wrap("d://film//1.rmvb")));
// 之后为该客户端管道注册新的感兴趣的事件---读操作
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 由事件获得通讯管道
SocketChannel channel = (SocketChannel) key
.channel();
// 从管道中读取数据放到缓存中
int count = channel.read(buffer);
System.out.println("count:" + count);
if (count > 0) {
// 统计读取的字节数目
total += count;
// 这样一来从posistion~limit这段缓存数据是有效,可利用的
// buffer.flip();
buffer.clear();
// 往输出文件中去写了
if (count < bufferSize) {
byte[] overByte = new byte[count];
for (int index = 0; index < count; index++) {
overByte[index] = buffer.get(index);
}
fout.write(overByte);
} else {
fout.write(buffer.array());
}
} else {
// 关闭客户端通道
client.close();
// 退出大循环
break FOR;
}
}
}
}
// 计算时间
double last = (System.currentTimeMillis() - start) * 1.0 / 1000;
System.out.println("Thread " + index + " downloaded " + total
/ 1024 + "kbytes in " + last + "s.");
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
long startTime = System.currentTimeMillis();
// 启用线程池
ExecutorService exec = Executors.newFixedThreadPool(SIZE);
for (int index = 1; index <= SIZE; index++) {
exec.execute(new Download(index));
}
exec.shutdown();
long endTime = System.currentTimeMillis();
long timeLong = endTime - startTime;
System.out.println("下载时间:" + timeLong);
}
}
效果和上一个程序的效果差不多,只是时间上和内存资源占有率上有所提高,当然本机仅仅启动了几个线程,如果客户端启动更多线程,NIO的方式节约资源的效果是明显的,宕机概率小于阻塞IO方式很多。
5. 总结
其实NIO想写得更多,但是看到网络上已经有很多资料了,就不再展开了,非一篇所能尽述的了的。当然了,NIO也是有场景的,比如适合与长连接的请求,以为NIO维护管道、缓存块、时间选择器等等也需要资源消耗的,而且比传统IO的对象们要重量级。所以原始IO也并不是一无是处,现在还是有很多socket中间件还是已然使用第二种方式。
代码在附件中~~~
相关推荐
在.NET平台上,SuperSocket以其易于扩展和高度定制化的特性,成为了Socket多线程编程的一个优选方案。 **一、SuperSocket的核心特性** 1. **多线程支持**:SuperSocket采用了多线程模型来处理客户端的连接,确保了...
在IT领域,网络编程是不可或缺的一部分,而Socket和多线程技术则是构建网络应用程序的核心工具。本示例项目"SOCKET+多线程例子"旨在帮助初学者掌握这两个关键概念,以便于创建高效的网络通信应用。 首先,让我们...
《基于SOCKET和多线程的应用程序间通信技术的研究》一文深入探讨了在不同程序之间采用SOCKET和多线程技术进行数据通信的方法。本文将详细解析标题和描述中的核心知识点,包括SOCKET和多线程的基本原理,以及它们在...
在计算机网络编程中,Socket是实现进程间通信(IPC)的一种方式,特别是在互联网环境中,它提供了...对这个程序进行深入学习和分析,可以帮助你理解多线程Socket服务端的工作原理,并为构建自己的并发服务器提供基础。
本项目标题为“Qt Socket 多线程代码实现”,是一个适合初学者的示例,它展示了如何使用QtcpSocket在多线程环境中构建一个服务器。下面将详细介绍相关知识点。 首先,我们来理解什么是Socket。Socket是一种在不同...
1、运用多线程和Socket技术实现Socket Server端侦听多个客户端请求; 2、实现服务器端循环处理客户端不同请求从而实现不同测试要求,并向客户端循环发送数据; 3、实现客户端向服务器端发送不同测试命令,并接收...
多线程实现Socket通信 多线程实现Socket通信 多线程实现Socket通信
Socket编程和多线程技术是计算机网络编程中的两个重要概念,尤其在开发实时通信应用,如聊天室时,它们起着关键作用。本项目“Socket和多线程的聊天室”是一个基于控制台的简单聊天应用程序,允许用户通过命令行进行...
在C++编程中,多线程SOCKET收发是一项重要的技术,它允许程序同时处理多个网络连接,提高系统的并发性能。下面将详细讲解这个主题,包括C++中的多线程概念、SOCKET基础以及如何结合两者实现数据的收发。 首先,让...
在这个"socket多线程例程非阻塞模式"的示例中,我们将深入探讨如何在Windows平台上实现多线程的socket通信,并了解非阻塞模式的工作原理。 首先,让我们来理解Socket的基本概念。Socket是进程间通信(IPC)的一种...
标题提到的"SOCKET实现多线程TCP连接源码"是一个典型的网络编程示例,它演示了如何通过Socket接口处理多个并发客户端连接,并使用多线程来保持与每个客户端的独立通信。 首先,TCP(传输控制协议)是一种面向连接的...
### Socket和多线程在视频传输中的应用 #### 一、引言 随着计算机网络技术和多媒体技术的迅速发展,视频应用系统(例如视频电话、视频监控等)的应用范围日益广泛。如何有效地在网络上传输视频信息成为了研究的...
Java 多线程-Socket 编程 Java 多线程-Socket 编程是指在 Java 语言中使用多线程技术来实现网络编程,特别是使用 Socket 编程来实现客户端和服务器端的通信。在 Java 中,多线程可以使用 Thread 类和 Runnable 接口...
总结来说,Java Socket多线程是构建高并发网络服务的关键技术。通过合理地设计和实现,可以有效地提升服务器的并发处理能力,为用户提供更高效的服务。在实际项目中,应根据具体需求选择适合的线程模型,例如线程池...
标题"Netty技术文档,Socket技术,多线程"指出我们要讨论的是Netty框架,它与Socket编程以及多线程技术的结合。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端...
Java Socket多线程简易实现是网络编程中的一个基础概念,主要应用于客户端与服务器之间的通信。在Java中,Socket是基于TCP协议的,提供了一种可靠的、面向连接的字节流通信方式。多线程则是Java并发编程的重要特性,...
Socket通讯与多线程处理是计算机网络编程中的重要概念,主要应用于分布式系统、服务器开发以及实时数据传输等场景。在本实例中,我们将探讨如何利用Socket进行通信,并结合多线程技术来提升程序的并发处理能力。 ...
本文将深入探讨基于Visual C++的TCP多线程客户端-服务器结构,并以"vc socket tcp 多线程客户端--服务器结构的例子"为例进行解析。这个例子包含了一个名为"RawSocketServerExample"的文件,很可能是实现此架构的源...
本教程将深入探讨如何使用C++来实现基于Socket的多线程通信,特别是在服务端利用多线程来处理并发连接。 首先,我们需要理解Socket的基本概念。Socket可以看作是两台计算机之间的通信端点,它允许数据在网络中传输...
本篇文章将深入探讨C#中基于TCP的Socket多线程通信,包括服务端和客户端的实现。 TCP(Transmission Control Protocol)是一种面向连接的、可靠的、基于字节流的传输层通信协议。在C#中,我们可以使用System.Net....