- 浏览: 93646 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
wenjia:
没有jar文件啊
springmvc+spring+jdbc(纯净版整合) -
雪之痕:
你好,能看下你测试用的代码么?
Java版的各种Thrift server实现的比较 -
liyonghui160com:
selectorThreads和workerThreads有设 ...
thift 服务端模型之TThreadedSelectorServer分析-server层 -
wuhuajun:
代理 包装了一层 通过反射调用目标对象所以如果目标对象的类 ...
jdk代理,cgib代理和spring后处理bean的关系 -
jd2bs:
这个跟具体实现相关 jdk proxy是动态生成$Proxy0 ...
jdk代理,cgib代理和spring后处理bean的关系
1、前言
在前一篇文章中,介绍了基于 BlockingIO +thread-per-connection 的方案,由于该方案为每一个连接分配一个线程,而线程里的大部分操作都是阻塞式的,所以在高并发的情况下,会导致产生大量的线程,线程间的上下文切换会浪费大量的 CPU 时间,而且每个线程是需要占用堆栈空间的,所以服务器能分配的线程数量也是有限的,当客户端的并发访问量达到一定的数量级后,服务器的资源就会耗尽,可伸缩性差。
根据上面的分析,要提高网络服务器的可伸缩性,就必须解决两个问题:
服务端的线程数量不能按照客户端连接数的增大而成线性增长,但又必须能够并发的响应不断增加的客户端请求
线程里的操作逻辑不能是阻塞式的
因此, java1.4 引入了非阻塞式 NIO ( Non-blocking IO ) , 解决了问题 2 ;而采用基于异步事件通知的 reactor 模式则可以仅仅用一个线程来并发的为多个连接服务,这样就解决了问题 1
2、Reactor 模式
2.1 示例
首先举一个生活中的例子来比较 thread-per-connection和 reactor方案
某火车票售票厅,只有 1 个售票窗口工作。两个乘客 a 、 b 先后来购票,由于 a 先到,所以售票窗口先为 a 服务, b 只能排队
thread-per-connection :
乘客 a与售票窗口开始沟通时,就相当于在客户端(乘客 a)与服务端(售票厅)之间建立了一个 connection,服务端为每一个 connection分配一个 thread(售票窗口)。当没有 thread可以分配时,后续的客户端请求(乘客 b)就不能及时响应了,所以 b只能排队。假设存在这种场景,售票窗口的服务员告诉乘客 a票价后,乘客 a准备付款时发现自己忘记了带钱包,所以乘客a打电话给家里人让他们把钱包送过来,但从 a的家步行到售票厅需要 5分钟,于是售票窗口的服务员就一直等着(被阻塞),但又不为乘客 b服务,因为她的做事风格( thread-per-connection)是一定要为一个乘客完完整整服务完后才能接着服务下一位乘客。
这种情况下,乘客 b 肯定会抱怨,而且 5 分钟后, b 的后面也肯定排了很多人,售票厅发现这种情况后,就只能选择再打开一个售票窗口(分配一个 thread )为 b 服务,但 b 后面的人也只能排队。之前那个窗口的服务员一直等着,又不干活,但工资还是照样拿,所以售票厅(服务端)的开销很大。
Reactor
服务员在等待 a 取钱包的过程中,被通知乘客 b 要求服务,所以窗口和 b 建立连接,悲剧的是 b 也没有带钱包,需要家里人送来。此时服务员又被通知 a 的钱包送过来了,所以窗口接着为 a 服务,出票完成后,服务员又被通知 b 的钱包送过来了,所以接着又为 b 服务。这样,售票厅(服务端)的开销就小了,现在只需要一个窗口就可以搞定所有事情。
2.2 Reactor 模式的思想:分而治之 + 事件驱动
分而治之:
一个 connection里发生的完整的网络处理过程一般分为 accept、 read、 decode、compute、 encode、 send这几步。 Reactor将每个步骤映射为一个 task,服务端端的线程执行的最小逻辑单元不再是一次完整的网络处理过程,而是更小的 task,且采用非阻塞的执行方式;
事件驱动:
每个 task对应一个特定的事件,当 task准备就绪时,对应的事件通知就会发出。 Reactor收到事件后,分发给绑定了对应事件的 Handler执行 task。
下图描述了单线程版本的 reactor 模式结构图。
关键概念:
Reactor:负责响应事件,分发给绑定了该事件的 handler执行 task
Handler:绑定了某类事件,负责执行该事件对应的 task。
Acceptor : Handler 的一种,绑定了 connect 事件。它在系统启动的时候就已经绑定了connnect 事件,并注册到 reactor 中。当有客户端发起 connect 请求时, reactor 会收到 accept事件,然后分发给 acceptor 。 acceptor 首先会 accept 新的连接,然后新建一个 handler ,将其与该连接上得 read 事件绑定,并注册到 reactor 中。
2.3 基于 reactor 的网络交互
客户端连接服务器过程
1) 服务器将绑定了 accept事件的 Acceptor注册到 Reactor中,准备 accept新的 connection;
2) 服务器启动 Reactor的事件循环处理功能(注意:该循环会阻塞,直到收到事件)
3) 客户端 connect服务器
4) Reactor响应 accept事件,分发给 Acceptor, Acceptor 确定建立一个新的连接。
5) Acceptor创建一个 handler专门服务该 connection后续的请求;
6) Handler绑定该 connection的 read事件,并将自己注册到 Reactor中
服务器处理客户端请求过程
1) 客户端发送请求
2) 当客户端的请求数据到达服务器时, Reactor响应 read事件,分发给绑定了 read事件的handler(即上面第 6步创建的 handler)
3) Handler执行 task,读取客户端的请求数据(此处是非阻塞读,即如果当前操作会导致当前线程阻塞,当前操作会立即返回,并重复执行第 2、 3步,直到客户端的请求读取完毕。)
4) 解析客户端请求数据
5) 读取文件
6) Handler重新绑定 write事件
7) 当 connection可以开始 write的时候, Reactor响应 write事件,分发给绑定了 write事件的handler
8) Handler 执行 task ,向客户端发送文件(此处是非阻塞写,即如果当前操作会导致当前线程阻塞,当前操作会立即返回,并重复执行第 7 、 8 步,直到文件全部发送完毕。)
注意:上述两个过程都是在服务器的一个线程里完成的,该线程响应所有客户端的请求。譬如服务端在处理客户端 A 的请求时,如果在第 2 步 read 事件还没有就绪(或者在第 3 步读取数据的时候发生阻塞了),则该线程会立即重新回到客户端连接服务器过程的第 2 步(即事件循环处理),等待新的事件通知。如果此时客户端 B 请求连接,则该线程会响应 B 的连接请求,这样就实现了一个线程同时为多个连接服务的效果。
3、 代码示例
3.1 NIO的几个关键概念
Selector:
Reactor里的一个核心组成部分,通过调用 selector.select()方法,可以知道感兴趣的 IO事件里哪些已经 ready,该方法是阻塞的,直到有 IO事件 ready;通过调用 selector.selectedKeys()方法,可以获取到 selectionKey对象,这些对象关联有已经 ready的 IO事件。
SelectionKey:
当 selector注册一个 channel时,会产生一个该对象,譬如SelectionKey selectionKey = channel .register(selector, SelectionKey. OP_ACCEPT );它维护着 channel 、 selector 、 IO 事件、Handler 之间的关系。通过调用 attach 方法,可以绑定一个 handler ,譬如:selectionKey.attach(new Acceptor());
ServerSocketChannel:
类似于 ServerSocket,唯一的区别在于: ServerSocketChannel可以使用 selector,而且可以设置为非阻塞模式。
SocketChannel:
类似于 Socket,唯一的区别在于: SocketChannel可以使用 selector,而且可以设置为非阻塞模式。
ByteBuffer :数据缓冲器,是 NIO 里将数据移入移出 channel 的唯一方式
3.2 code
注:所有代码只用来作为原理的进一步阐述,不能用于生产环境
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
服务端代码如下(单线程版本)
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
/**
* @author jason
*
*/
public class NioServer implements Runnable {
private InetAddress hostAddress;
private int port;
private ServerSocketChannel serverChannel;
private Selector selector;
public NioServer(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
// 初始化selector,绑定服务端监听套接字、感兴趣事件及对应的handler
this.selector = initSelector();
}
public static void main(String[] args) {
try {
// 启动服务器
new Thread(new NioServer(null, 9090)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
// 选择事件已经ready的selectionKey,该方法是阻塞的,只有当至少存在selectionKey,或者wakeup方法被调用,或者当前线程被中断,才会返回
selector.select();
// 循环处理每一个事件
Iterator<SelectionKey> items = selector.selectedKeys()
.iterator();
while (items.hasNext()) {
SelectionKey key = (SelectionKey) items.next();
items.remove();
if (!key.isValid()) {
continue;
}
// 事件处理分发
dispatch(key);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 事件处理分发
*
* @param sk
* 已经ready的selectionKey
*/
private void dispatch(SelectionKey sk) {
// 获取绑定的handler
Runnable r = (Runnable) sk.attachment();
if (r != null) {
r.run();
}
}
/**
* 初始化selector,绑定服务端监听套接字、感兴趣事件及对应的handler
*
* @return
* @throws IOException
*/
private Selector initSelector() throws IOException {
// 创建一个selector
Selector socketSelector = SelectorProvider.provider().openSelector();
// 创建并打开ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverChannel.configureBlocking(false);
// 绑定端口
serverChannel.socket().bind(new InetSocketAddress(hostAddress, port));
// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听客户端连接事件
SelectionKey selectionKey = serverChannel.register(socketSelector,
SelectionKey.OP_ACCEPT);
// 绑定handler
selectionKey.attach(new Acceptor());
return socketSelector;
}
/**
* 处理OP_ACCEPT事件的handler
*
*/
class Acceptor implements Runnable {
@Override
public void run() {
try {
accept();
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept() throws IOException {
System.out.println("connect");
// 建立连接
SocketChannel socketChannel = serverChannel.accept();
System.out.println("connected");
// 设置为非阻塞
socketChannel.configureBlocking(false);
// 创建Handler,专门处理该连接后续发生的OP_READ和OP_WRITE事件
new Handler(selector, socketChannel);
}
}
}
handler代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
* @author jason
*
*/
final class Handler implements Runnable {
final SocketChannel socketChannel;
final SelectionKey key;
static final int MAXIN = 8192, MAXOUT = 11240 * 1024;
ByteBuffer readBuffer = ByteBuffer.allocate(MAXIN);
ByteBuffer outBuffer = ByteBuffer.allocate(MAXOUT);
static final int READING = 0;
static final int SENDING = 1;
int state = READING;
Handler(Selector selector, SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听该连接上得read事件
this.key = socketChannel.register(selector, SelectionKey.OP_READ);
// 绑定handler
this.key.attach(this);
}
/**
* 处理write
*
* @throws IOException
*/
private void write() throws IOException {
socketChannel.write(outBuffer);
if (outBuffer.remaining() > 0) {
return;
}
state = READING;
key.interestOps(SelectionKey.OP_READ);
}
/**
* 处理read
*
* @throws IOException
*/
private void read() throws IOException {
readBuffer.clear();
int numRead;
try {
// 读取数据
numRead = socketChannel.read(readBuffer);
} catch (Exception e) {
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
socketChannel.close();
key.cancel();
return;
}
// 处理数据
process(numRead);
}
/**
* 处理数据
*
* @param numRead
*/
private void process(int numRead) {
byte[] dataCopy = new byte[numRead];
System.arraycopy(readBuffer.array(), 0, dataCopy, 0, numRead);
System.out.println(new String(dataCopy));
outBuffer = ByteBuffer.wrap(dataCopy);
state = SENDING;
// 设置Key的interest set为监听该连接上的write事件
key.interestOps(SelectionKey.OP_WRITE);
}
@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
write();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package sampleNio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
/**
* @author jason
*
*/
public class NioClient implements Runnable {
private InetAddress hostAddress;
private int port;
private Selector selector;
private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
private ByteBuffer outBuffer = ByteBuffer.wrap("nice to meet you"
.getBytes());
public NioClient(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
initSelector();
}
public static void main(String[] args) {
try {
NioClient client = new NioClient(
InetAddress.getByName("localhost"), 9090);
new Thread(client).start();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
selector.select();
Iterator<?> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isConnectable()) {
finishConnection(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void initSelector() throws IOException {
// 创建一个selector
selector = SelectorProvider.provider().openSelector();
// 打开SocketChannel
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞
socketChannel.configureBlocking(false);
// 连接指定IP和端口的地址
socketChannel
.connect(new InetSocketAddress(this.hostAddress, this.port));
// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听服务端已建立连接的事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
private void finishConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
try {
// 判断连接是否建立成功,不成功会抛异常
socketChannel.finishConnect();
} catch (IOException e) {
key.cancel();
return;
}
// 设置Key的interest set为OP_WRITE事件
key.interestOps(SelectionKey.OP_WRITE);
}
/**
* 处理read
*
* @param key
* @throws IOException
*/
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
readBuffer.clear();
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (Exception e) {
key.cancel();
socketChannel.close();
return;
}
if (numRead == 1) {
System.out.println("close connection");
socketChannel.close();
key.cancel();
return;
}
// 处理响应
handleResponse(socketChannel, readBuffer.array(), numRead);
}
/**
* 处理响应
*
* @param socketChannel
* @param data
* @param numRead
* @throws IOException
*/
private void handleResponse(SocketChannel socketChannel, byte[] data,
int numRead) throws IOException {
byte[] rspData = new byte[numRead];
System.arraycopy(data, 0, rspData, 0, numRead);
System.out.println(new String(rspData));
socketChannel.close();
socketChannel.keyFor(selector).cancel();
}
/**
* 处理write
*
* @param key
* @throws IOException
*/
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(outBuffer);
if (outBuffer.remaining() > 0) {
return;
}
// 设置Key的interest set为OP_READ事件
key.interestOps(SelectionKey.OP_READ);
}
}
4、 Reactor 的其他实现方式
单线程版本的 Reactor 最大的优势是:不需要做并发控制,简化了实现。缺点是不能充分利用多核 CPU的优势,因为只有一个线程,该线程需要执行所有的操作: accept、 read、decode、 compute、 encode、 send,而其中的 decode、 compute、 encode如果很耗时,则该线程就不能及时的响应其他客户端的请求。
为了解决该问题,可以采用另外两种版本:
4.1 Worker threads:
Reactor所在的线程只需要专心的响应客户端的请求: accept、 read、 send。对数据的具体处理过程则交给另外的线程池。这样可以提高服务端对客户端的响应速度,但同时增加了复杂度,也没有充分利用到多核的优势,因为 reactor只有一个,譬如同一时刻只能 read一个客户端的请求数据。
4.2Multiple reactor threads :
采用多个 reactor ,每个 reactor 都在自己单独的线程里执行。如果是多核,则可以同时响应多个客户端的请求。( Netty 采用的是类似这种方式,boss线程池就是多个mainReactor,worker线程池就是多个subReactor)
5、总结
本文分析了基于 NIO和 Reactor模式的网络服务器设计方案,在后续的 blog中将结合 Netty进一步分析高性能网络服务器的设计。
在前一篇文章中,介绍了基于 BlockingIO +thread-per-connection 的方案,由于该方案为每一个连接分配一个线程,而线程里的大部分操作都是阻塞式的,所以在高并发的情况下,会导致产生大量的线程,线程间的上下文切换会浪费大量的 CPU 时间,而且每个线程是需要占用堆栈空间的,所以服务器能分配的线程数量也是有限的,当客户端的并发访问量达到一定的数量级后,服务器的资源就会耗尽,可伸缩性差。
根据上面的分析,要提高网络服务器的可伸缩性,就必须解决两个问题:
服务端的线程数量不能按照客户端连接数的增大而成线性增长,但又必须能够并发的响应不断增加的客户端请求
线程里的操作逻辑不能是阻塞式的
因此, java1.4 引入了非阻塞式 NIO ( Non-blocking IO ) , 解决了问题 2 ;而采用基于异步事件通知的 reactor 模式则可以仅仅用一个线程来并发的为多个连接服务,这样就解决了问题 1
2、Reactor 模式
2.1 示例
首先举一个生活中的例子来比较 thread-per-connection和 reactor方案
某火车票售票厅,只有 1 个售票窗口工作。两个乘客 a 、 b 先后来购票,由于 a 先到,所以售票窗口先为 a 服务, b 只能排队
thread-per-connection :
乘客 a与售票窗口开始沟通时,就相当于在客户端(乘客 a)与服务端(售票厅)之间建立了一个 connection,服务端为每一个 connection分配一个 thread(售票窗口)。当没有 thread可以分配时,后续的客户端请求(乘客 b)就不能及时响应了,所以 b只能排队。假设存在这种场景,售票窗口的服务员告诉乘客 a票价后,乘客 a准备付款时发现自己忘记了带钱包,所以乘客a打电话给家里人让他们把钱包送过来,但从 a的家步行到售票厅需要 5分钟,于是售票窗口的服务员就一直等着(被阻塞),但又不为乘客 b服务,因为她的做事风格( thread-per-connection)是一定要为一个乘客完完整整服务完后才能接着服务下一位乘客。
这种情况下,乘客 b 肯定会抱怨,而且 5 分钟后, b 的后面也肯定排了很多人,售票厅发现这种情况后,就只能选择再打开一个售票窗口(分配一个 thread )为 b 服务,但 b 后面的人也只能排队。之前那个窗口的服务员一直等着,又不干活,但工资还是照样拿,所以售票厅(服务端)的开销很大。
Reactor
服务员在等待 a 取钱包的过程中,被通知乘客 b 要求服务,所以窗口和 b 建立连接,悲剧的是 b 也没有带钱包,需要家里人送来。此时服务员又被通知 a 的钱包送过来了,所以窗口接着为 a 服务,出票完成后,服务员又被通知 b 的钱包送过来了,所以接着又为 b 服务。这样,售票厅(服务端)的开销就小了,现在只需要一个窗口就可以搞定所有事情。
2.2 Reactor 模式的思想:分而治之 + 事件驱动
分而治之:
一个 connection里发生的完整的网络处理过程一般分为 accept、 read、 decode、compute、 encode、 send这几步。 Reactor将每个步骤映射为一个 task,服务端端的线程执行的最小逻辑单元不再是一次完整的网络处理过程,而是更小的 task,且采用非阻塞的执行方式;
事件驱动:
每个 task对应一个特定的事件,当 task准备就绪时,对应的事件通知就会发出。 Reactor收到事件后,分发给绑定了对应事件的 Handler执行 task。
下图描述了单线程版本的 reactor 模式结构图。
关键概念:
Reactor:负责响应事件,分发给绑定了该事件的 handler执行 task
Handler:绑定了某类事件,负责执行该事件对应的 task。
Acceptor : Handler 的一种,绑定了 connect 事件。它在系统启动的时候就已经绑定了connnect 事件,并注册到 reactor 中。当有客户端发起 connect 请求时, reactor 会收到 accept事件,然后分发给 acceptor 。 acceptor 首先会 accept 新的连接,然后新建一个 handler ,将其与该连接上得 read 事件绑定,并注册到 reactor 中。
2.3 基于 reactor 的网络交互
客户端连接服务器过程
1) 服务器将绑定了 accept事件的 Acceptor注册到 Reactor中,准备 accept新的 connection;
2) 服务器启动 Reactor的事件循环处理功能(注意:该循环会阻塞,直到收到事件)
3) 客户端 connect服务器
4) Reactor响应 accept事件,分发给 Acceptor, Acceptor 确定建立一个新的连接。
5) Acceptor创建一个 handler专门服务该 connection后续的请求;
6) Handler绑定该 connection的 read事件,并将自己注册到 Reactor中
服务器处理客户端请求过程
1) 客户端发送请求
2) 当客户端的请求数据到达服务器时, Reactor响应 read事件,分发给绑定了 read事件的handler(即上面第 6步创建的 handler)
3) Handler执行 task,读取客户端的请求数据(此处是非阻塞读,即如果当前操作会导致当前线程阻塞,当前操作会立即返回,并重复执行第 2、 3步,直到客户端的请求读取完毕。)
4) 解析客户端请求数据
5) 读取文件
6) Handler重新绑定 write事件
7) 当 connection可以开始 write的时候, Reactor响应 write事件,分发给绑定了 write事件的handler
8) Handler 执行 task ,向客户端发送文件(此处是非阻塞写,即如果当前操作会导致当前线程阻塞,当前操作会立即返回,并重复执行第 7 、 8 步,直到文件全部发送完毕。)
注意:上述两个过程都是在服务器的一个线程里完成的,该线程响应所有客户端的请求。譬如服务端在处理客户端 A 的请求时,如果在第 2 步 read 事件还没有就绪(或者在第 3 步读取数据的时候发生阻塞了),则该线程会立即重新回到客户端连接服务器过程的第 2 步(即事件循环处理),等待新的事件通知。如果此时客户端 B 请求连接,则该线程会响应 B 的连接请求,这样就实现了一个线程同时为多个连接服务的效果。
3、 代码示例
3.1 NIO的几个关键概念
Selector:
Reactor里的一个核心组成部分,通过调用 selector.select()方法,可以知道感兴趣的 IO事件里哪些已经 ready,该方法是阻塞的,直到有 IO事件 ready;通过调用 selector.selectedKeys()方法,可以获取到 selectionKey对象,这些对象关联有已经 ready的 IO事件。
SelectionKey:
当 selector注册一个 channel时,会产生一个该对象,譬如SelectionKey selectionKey = channel .register(selector, SelectionKey. OP_ACCEPT );它维护着 channel 、 selector 、 IO 事件、Handler 之间的关系。通过调用 attach 方法,可以绑定一个 handler ,譬如:selectionKey.attach(new Acceptor());
ServerSocketChannel:
类似于 ServerSocket,唯一的区别在于: ServerSocketChannel可以使用 selector,而且可以设置为非阻塞模式。
SocketChannel:
类似于 Socket,唯一的区别在于: SocketChannel可以使用 selector,而且可以设置为非阻塞模式。
ByteBuffer :数据缓冲器,是 NIO 里将数据移入移出 channel 的唯一方式
3.2 code
注:所有代码只用来作为原理的进一步阐述,不能用于生产环境
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
服务端代码如下(单线程版本)
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
/**
* @author jason
*
*/
public class NioServer implements Runnable {
private InetAddress hostAddress;
private int port;
private ServerSocketChannel serverChannel;
private Selector selector;
public NioServer(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
// 初始化selector,绑定服务端监听套接字、感兴趣事件及对应的handler
this.selector = initSelector();
}
public static void main(String[] args) {
try {
// 启动服务器
new Thread(new NioServer(null, 9090)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
// 选择事件已经ready的selectionKey,该方法是阻塞的,只有当至少存在selectionKey,或者wakeup方法被调用,或者当前线程被中断,才会返回
selector.select();
// 循环处理每一个事件
Iterator<SelectionKey> items = selector.selectedKeys()
.iterator();
while (items.hasNext()) {
SelectionKey key = (SelectionKey) items.next();
items.remove();
if (!key.isValid()) {
continue;
}
// 事件处理分发
dispatch(key);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 事件处理分发
*
* @param sk
* 已经ready的selectionKey
*/
private void dispatch(SelectionKey sk) {
// 获取绑定的handler
Runnable r = (Runnable) sk.attachment();
if (r != null) {
r.run();
}
}
/**
* 初始化selector,绑定服务端监听套接字、感兴趣事件及对应的handler
*
* @return
* @throws IOException
*/
private Selector initSelector() throws IOException {
// 创建一个selector
Selector socketSelector = SelectorProvider.provider().openSelector();
// 创建并打开ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverChannel.configureBlocking(false);
// 绑定端口
serverChannel.socket().bind(new InetSocketAddress(hostAddress, port));
// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听客户端连接事件
SelectionKey selectionKey = serverChannel.register(socketSelector,
SelectionKey.OP_ACCEPT);
// 绑定handler
selectionKey.attach(new Acceptor());
return socketSelector;
}
/**
* 处理OP_ACCEPT事件的handler
*
*/
class Acceptor implements Runnable {
@Override
public void run() {
try {
accept();
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept() throws IOException {
System.out.println("connect");
// 建立连接
SocketChannel socketChannel = serverChannel.accept();
System.out.println("connected");
// 设置为非阻塞
socketChannel.configureBlocking(false);
// 创建Handler,专门处理该连接后续发生的OP_READ和OP_WRITE事件
new Handler(selector, socketChannel);
}
}
}
handler代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
* @author jason
*
*/
final class Handler implements Runnable {
final SocketChannel socketChannel;
final SelectionKey key;
static final int MAXIN = 8192, MAXOUT = 11240 * 1024;
ByteBuffer readBuffer = ByteBuffer.allocate(MAXIN);
ByteBuffer outBuffer = ByteBuffer.allocate(MAXOUT);
static final int READING = 0;
static final int SENDING = 1;
int state = READING;
Handler(Selector selector, SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听该连接上得read事件
this.key = socketChannel.register(selector, SelectionKey.OP_READ);
// 绑定handler
this.key.attach(this);
}
/**
* 处理write
*
* @throws IOException
*/
private void write() throws IOException {
socketChannel.write(outBuffer);
if (outBuffer.remaining() > 0) {
return;
}
state = READING;
key.interestOps(SelectionKey.OP_READ);
}
/**
* 处理read
*
* @throws IOException
*/
private void read() throws IOException {
readBuffer.clear();
int numRead;
try {
// 读取数据
numRead = socketChannel.read(readBuffer);
} catch (Exception e) {
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
socketChannel.close();
key.cancel();
return;
}
// 处理数据
process(numRead);
}
/**
* 处理数据
*
* @param numRead
*/
private void process(int numRead) {
byte[] dataCopy = new byte[numRead];
System.arraycopy(readBuffer.array(), 0, dataCopy, 0, numRead);
System.out.println(new String(dataCopy));
outBuffer = ByteBuffer.wrap(dataCopy);
state = SENDING;
// 设置Key的interest set为监听该连接上的write事件
key.interestOps(SelectionKey.OP_WRITE);
}
@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
write();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package sampleNio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
/**
* @author jason
*
*/
public class NioClient implements Runnable {
private InetAddress hostAddress;
private int port;
private Selector selector;
private ByteBuffer readBuffer = ByteBuffer.allocate(8192);
private ByteBuffer outBuffer = ByteBuffer.wrap("nice to meet you"
.getBytes());
public NioClient(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
initSelector();
}
public static void main(String[] args) {
try {
NioClient client = new NioClient(
InetAddress.getByName("localhost"), 9090);
new Thread(client).start();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
selector.select();
Iterator<?> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isConnectable()) {
finishConnection(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void initSelector() throws IOException {
// 创建一个selector
selector = SelectorProvider.provider().openSelector();
// 打开SocketChannel
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞
socketChannel.configureBlocking(false);
// 连接指定IP和端口的地址
socketChannel
.connect(new InetSocketAddress(this.hostAddress, this.port));
// 用selector注册套接字,并返回对应的SelectionKey,同时设置Key的interest set为监听服务端已建立连接的事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
private void finishConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
try {
// 判断连接是否建立成功,不成功会抛异常
socketChannel.finishConnect();
} catch (IOException e) {
key.cancel();
return;
}
// 设置Key的interest set为OP_WRITE事件
key.interestOps(SelectionKey.OP_WRITE);
}
/**
* 处理read
*
* @param key
* @throws IOException
*/
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
readBuffer.clear();
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (Exception e) {
key.cancel();
socketChannel.close();
return;
}
if (numRead == 1) {
System.out.println("close connection");
socketChannel.close();
key.cancel();
return;
}
// 处理响应
handleResponse(socketChannel, readBuffer.array(), numRead);
}
/**
* 处理响应
*
* @param socketChannel
* @param data
* @param numRead
* @throws IOException
*/
private void handleResponse(SocketChannel socketChannel, byte[] data,
int numRead) throws IOException {
byte[] rspData = new byte[numRead];
System.arraycopy(data, 0, rspData, 0, numRead);
System.out.println(new String(rspData));
socketChannel.close();
socketChannel.keyFor(selector).cancel();
}
/**
* 处理write
*
* @param key
* @throws IOException
*/
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(outBuffer);
if (outBuffer.remaining() > 0) {
return;
}
// 设置Key的interest set为OP_READ事件
key.interestOps(SelectionKey.OP_READ);
}
}
4、 Reactor 的其他实现方式
单线程版本的 Reactor 最大的优势是:不需要做并发控制,简化了实现。缺点是不能充分利用多核 CPU的优势,因为只有一个线程,该线程需要执行所有的操作: accept、 read、decode、 compute、 encode、 send,而其中的 decode、 compute、 encode如果很耗时,则该线程就不能及时的响应其他客户端的请求。
为了解决该问题,可以采用另外两种版本:
4.1 Worker threads:
Reactor所在的线程只需要专心的响应客户端的请求: accept、 read、 send。对数据的具体处理过程则交给另外的线程池。这样可以提高服务端对客户端的响应速度,但同时增加了复杂度,也没有充分利用到多核的优势,因为 reactor只有一个,譬如同一时刻只能 read一个客户端的请求数据。
4.2Multiple reactor threads :
采用多个 reactor ,每个 reactor 都在自己单独的线程里执行。如果是多核,则可以同时响应多个客户端的请求。( Netty 采用的是类似这种方式,boss线程池就是多个mainReactor,worker线程池就是多个subReactor)
5、总结
本文分析了基于 NIO和 Reactor模式的网络服务器设计方案,在后续的 blog中将结合 Netty进一步分析高性能网络服务器的设计。
发表评论
-
thrift-client/asynclient
2013-10-20 14:02 712thrift-client/asynclient -
thift-tprocessor
2013-10-20 13:48 742thift 服务端模型之TThreadedSelector ... -
thift-tropocol层
2013-10-20 13:47 591asdf -
thift-transport层
2013-10-20 13:46 2221TTransport=>TIOStreamTrans ... -
thift 服务端模型之TThreadedSelectorServer分析-server层
2013-10-20 12:50 9273线程模型: 参数: public static c ... -
maven学习
2013-10-13 21:40 6711.使用archetype生成简 ... -
java 日志发展
2013-10-13 21:32 688日志使用http://www.iteye.com/topic ... -
NIO 终结者
2013-09-30 16:03 949传统IO请求等待主要在 ... -
文本和二进制
2013-09-02 16:39 1373摘自Serv-U网站: When usin ... -
新浪微博授权过程
2013-08-18 22:00 68971.你的网站首先要去新浪去认证审核,这样对应新浪微博开发平台来 ... -
jdk代理,cgib代理和spring后处理bean的关系
2013-08-18 14:11 2031最近项目中使用InstantiationAwareBean ... -
基于事件的 NIO 多线程服务器 附件为高性能网络通讯
2013-07-28 14:53 1164基于事件的 NIO 多线程服务器 ... -
压力测试
2013-07-28 14:02 651压力测试条件 1.并发 ... -
dbcp&commons-pool
2013-07-25 15:31 890DBCP主要是为jdbc提供连 ... -
Java版的各种Thrift server实现的比较
2013-07-14 08:53 4411Thrift Java Servers ComparedTh ... -
IO - 同步,异步,阻塞,非阻塞 转
2013-07-11 20:45 600当你发现自己最受欢迎 ... -
java 进阶技术方向的好书
2013-06-11 11:51 6821.java分布式应用基础和实践 http ... -
application/x-www-form-urlencoded和multipart/form-data
2013-05-31 22:06 1085关于application/x-www-form- ... -
HashMap&ConcurrentHashMap 深度分析(转)
2013-05-31 20:45 981java.util.HashMap是很常见的类,前段时间公 ... -
java中文乱码终结者(转)
2013-05-31 19:04 5128转自:http://blog.csdn.net ...
相关推荐
源码分析可以帮助开发者理解其设计模式,例如Reactor模式,以及如何通过ByteBuf管理网络数据,如何处理I/O事件,以及ChannelHandler的职责等。 第二部分"NIO+Netty5各种RPC架构实战演练",则侧重于将NIO与Netty结合...
3. **Reactor模式**:在NIO中,Reactor模式是一种处理多个客户端连接的并发设计模式。它通过一个线程来监听和接受新的连接,然后分派事件到相应的处理器。相比于传统的多线程模型,Reactor模式可以更有效地管理大量...
设计模式是软件开发中的一种通用解决方案,它们是解决常见问题的标准策略,而非代码的直接复用。"Observer(观察者)"和"Reactor(反应器)"是两种广泛应用于并发和事件驱动编程的设计模式。 Observer模式的核心...
NIO的核心特性在于其非阻塞模式,能够提高服务器的并发性能,尤其适合于长连接和高并发的网络应用,如聊天服务器、游戏服务器等。xSocket是一个基于Java NIO的网络通信框架,它旨在简化开发人员在构建高性能、高可用...
2. **Reactor模式的实现**:如何设计和实现一个高效的Reactor,以及其在Java NIO框架中的应用。 3. **并发与多线程**:在处理I/O事件时如何利用线程池和同步机制来优化性能。 4. **性能优化策略**:如缓冲区大小的...
39_NIO中Scattering与Gathering深度解析 40_Selector源码深入分析 41_NIO网络访问模式分析 42_NIO网络编程实例剖析 43_NIO网络编程深度解析 44_NIO网络客户端编写详解 45_深入探索Java字符集编解码 46_字符集编解码...
41_NIO网络访问模式分析 42_NIO网络编程实例剖析 43_NIO网络编程深度解析 44_NIO网络客户端编写详解 45_深入探索Java字符集编解码 46_字符集编解码全方位解析 47_Netty服务器与客户端编码模式回顾及源码分析准备 48_...
3. **Reactor模式**:Reactor是Java NIO的基础,它是一种基于事件驱动的设计模式,用于高效地处理多个事件源。Reactor模式的核心思想是定义了线程和事件处理器之间的协作机制,其中线程在事件发生时调用相应的事件...
第41讲:NIO网络访问模式分析 第42讲:NIO网络编程实例剖析 第43讲:NIO网络编程深度解析 第44讲:NIO网络客户端编写详解 第45讲:深入探索Java字符集编解码 第46讲:字符集编解码全方位解析 第47讲:Netty...
总结来说,Netty 的高效接收网络连接能力主要得益于其主从 Reactor 模型的设计,以及 Channel 和 ChannelPipeline 的灵活架构。通过将连接的建立与数据处理分离,Netty 能够有效地利用系统资源,处理大量的并发连接...
Java NIO引入了选择器(Selector)的概念,它基于Reactor设计模式,类似于一个观察者。选择器允许程序员注册感兴趣的事件(如接受连接、读取数据、写入数据等)到多个通道(如SocketChannel)。当某个通道有注册的...
总之,Netty通过高效的NIO模型、丰富的编解码器、灵活的线程管理和强大的异常处理能力,为企业级应用提供了稳定、高性能的网络通信解决方案。学习Netty,不仅能提升网络编程技能,也能为开发高效分布式系统打下坚实...
Java游戏服务器框架是一种用于开发高性能、高可伸缩性、分布式多进程的游戏后端系统的技术解决方案。这样的框架设计目标是确保游戏服务器能够处理大量并发玩家,同时提供稳定、低延迟的服务,以支持实时性强的在线...
Netty的架构基于Reactor模式,它使用了高效的工作线程模型来处理网络事件,并且具有高吞吐量、低延迟的特点,非常适合作为游戏服务器、实时通信系统、互联网应用后端等高性能网络应用的基础。 Netty还提供了很多...
Netty是JBoss提供的一个高效的Java NIO(New I/O)开发框架,旨在为开发者提供一个能够快速、简单、可扩展的网络编程解决方案。Netty基于Reactor模型构建,该模型是一种广泛用于异步事件驱动的应用程序的网络编程...
Netty 是一个高度优化、基于Java NIO的网络应用程序框架,专为开发高效的网络服务器和客户端而设计。面试中,Netty 相关的问题通常会涵盖NIO的基础知识,Netty的特点以及其在实际应用中的优势。以下是这些知识点的...
5. **事件驱动编程**:非阻塞网络服务器通常采用事件驱动架构,如Reactor或Proactor模式,通过事件循环监听多个套接字的事件,一旦有事件发生,就进行相应的处理。 6. **网络协议支持**:nionet作为网络库,可能...
2. **多线程编程需求**:由于 NIO 采用 Reactor 模式,因此需要开发者具备一定的多线程编程经验,这对新手来说是个不小的挑战。 3. **可靠性问题**:面对诸如断连重连、网络闪断、半包读写等常见问题时,开发者需要...