- 浏览: 142087 次
- 性别:
- 来自: 北京
文章分类
最新评论
阅读原文请点击:http://click.aliyun.com/m/22420/
摘要: 目录 线程体系结构 反应堆模式 组件架构 接收器 分配器 分配器级别事件处理器 应用程序级别事件处理器 总结 参考资料 如果你被要求去写一个高可扩展性的基于JAVA的服务器,你很快就会决定使用JAVA NIO包。
目录
线程体系结构
反应堆模式
组件架构
接收器
分配器
分配器级别事件处理器
应用程序级别事件处理器
总结
参考资料
如果你被要求去写一个高可扩展性的基于JAVA的服务器,你很快就会决定使用JAVA NIO包。为了让服务器跑起来,你可能会花很多时间阅读博客和教程来了解线程同步需要NIO SELECTOR类以及处理一些常见的陷阱。本文描述了一个面向连接基于NIO的服务器的基本架构。本文会先看一下一个首选的线程模型然后讨论服务器的一些基本组件。
Threading Architecture线程体系结构
第一种也是最直观的方式去实现一个多线程的服务器是每个连接一个线程的方式。这是JAVA1.4以前的解决方案,由于老版本的JAVA缺少非阻塞的I/O支持。每个连接一个线程的方法分配一个独家的工作线程给每个连接。在处理循环中,工作线程等待新进入的数据,处理这个请求,返回响应数据,然后再调用阻塞socket的read方法。
01
public class Server {
02
private ExecutorService executors = Executors.newFixedThreadPool(10);
03
private boolean isRunning = true;
04
05
public static void main(String... args) throws ... {
06
new Server().launch(Integer.parseInt(args[0]));
07
}
08
09
public void launch(int port) throws ... {
10
ServerSocket sso = new ServerSocket(port);
11
while (isRunning) {
12
Socket s = sso.accept();
13
executors.execute(new Worker(s));
14
}
15
}
16
17
private class Worker implements Runnable {
18
private LineNumberReader in = null;
19
...
20
21
Worker(Socket s) throws ... {
22
in = new LineNumberReader(new InputStreamReader(...));
23
out = ...
24
}
25
26
public void run() {
27
while (isRunning) {
28
try {
29
// blocking read of a request (line)
30
String request = in.readLine();
31
32
// processing the request
33
...
34
String response = ...
35
36
// return the response
37
out.write(resonse);
38
out.flush();
39
} catch (Exception e ) {
40
...
41
}
42
}
43
in.close();
44
...
45
}
46
}
47
}
在同时发生的客户端连接和多个同步工作线程之间通常有一个单对单的关系。因为每个连接都有一个相关联的服务端等待线程,因此可以有很好的响应时间。然而,高负载需要更多的同步运行的线程,这些限制了可扩展性。尤其是,长时间存活的连接像持久化的HTTP连接导致大量的同步工作线程存在,有浪费时间等待新的客户端请求的趋势。此外,成百上千的同步线程会浪费大量的栈空间。注意,举例来说,Solaris/Sparc默认的JAVA栈空间是512KB.
如果server不得不处理大量同时发生的客户端,并且能容忍慢,无反应的客户端,就需要一种供替代的线程架构。每个事件一个线程的方式通过一种非常高效地方式实现了这样的需求。工作线程和连接独立,仅被用来处理特定的事件。举例来说,如果一个数据接收事件发生了,一个工作线程将会用来处理特定于应用程序的编码和服务任务(或至少启动这些任务)。任务一结束,工作线程就会回到线程池中。这种方式需要无阻塞的处理socket的I/O。调用socket的read或write方法需要时无阻塞的。此外,一个事件系统是必须的;它会发信号表明是否有新数据,轮流发起socket的read方法。这种方式移除了等待线程和工作线程之间的一对一关系。这样一个事件驱动的I/0系统的设计将会在反应堆模式中描述。
The Reactor Pattern反应堆模式
反应堆模式,如图1所示,把事件的检测例如准备就绪读或者准备就绪接受数据和事件的处理分离。如果一个准备就绪的事件发生了,专用工作线程内的一个事件处理器就会被通知去执行适当的处理。
Figure1
Figure 1. A NIO-based Reactor pattern implementation
连接通道需要先在Selector类中注册才能参与事件的架构。这可以通过调用regisster()方法来实现。虽然这个方法是SocketChannel的一部分,这个通道将会在Selector中注册,没有其它的方法。
1
...
2
SocketChannel channel = serverChannel.accept();
3
channel.configureBlocking(false);
4
5
// register the connection
6
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
7
...
为了检测新的事件,Selector类提供了请求已注册的通道就绪事件的能力。通过调用select方法 ,Selector收集已注册通道的就绪事件。这个方法的调用会阻塞,直到至少一个事件已经发生。在这种情况下,方法返回了自上次调用之后就绪的I/O操作的连接数。所选的连接可以通过调用Selector的selectedkey方法来检测。这个方法返回一个Selectionkey对象集合,里面存放了IO事件的状态和连接通道的引用。
一个Selector存在于Dispatcher中。这是一个单线程的活动类围饶着Selector类。Dispatcher类的职责是检测事件然后分发消费事件的处理给EventHandler类。在这个分发循环中,Dispatcher类调用Selector类的select方法等待新的事件。如果至少一个事件发生了,这个方法就返回,每个事件相关的通道可以通过调用selectedkeys方法获得。
01
...
02
while (isRunning) {
03
// blocking call, to wait for new readiness events
04
int eventCount = selector.select();
05
06
// get the events
07
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
08
while (it.hasNext()) {
09
SelectionKey key = it.next();
10
it.remove();
11
12
// readable event?
13
if (key.isValid() && key.isReadable()) {
14
eventHandler.onReadableEvent(key.channel());
15
}
16
17
// writable event?
18
if (key.isValid() && key.isWritable()) {
19
key.interestOps(SelectionKey.OP_READ); // reset to read only
20
eventHandler.onWriteableEvent(key.channel());
21
}
22
...
23
}
24
...
25
}
基于一个事件,类似于就绪读或就绪写,EventHandler会被Dispatcher调用来处理这个事件。EventHandler解码请求数据,处理必须的服务活动,编码响应数据。由于工作线程没有被强制去浪费时间等待新的请求然后建立一个连接,这种方式的可扩展性和吞吐量理论上只限制于系统资源像CPU和内存。这既便是说,响应时间将没有每个连接一个线程的方式快,由于参与线程间的切换和同步。事件驱动方法的挑战因此是最少化同步和优化线程管理,以致于这些影响可以被忽略。
Component Architecture组件架构
大多数具有高可扩展性的JAVA服务器都是建立在反应堆模式上的。这样做,反应堆模式中的类将会被增强,因为需要额外的类来连接管理,缓冲区管理,以及负载均衡。这个服用器的入口类是一个Acceptor。这个安排如图2所示。
Figure2
Figure 2. Major components of a connection-oriented server
Acceptor接收器
一个服务器每个新的客户端连接将会被单个Acceptor所接收,Acceptor与服务器的端口绑定。接收器是一个单线程的活动类。由于Acceptor仅负责处理历时非常短的客户端连接请求,经常只要用阻塞I/0模式实现Acceptor就足够了。Acceptor通过调用Serversocketchannel的阻塞accept方法来处理新请求。新请求将会注册到Dispatcher,这之后,请求就可以参与到事件处理中了。
由于一个Dispatcher的可扩展性非常有限,通常都会使用一个小的Dispatchers的池。这个限制当中的一个原因是特定的操作系统实现的Selector。大多数的操作系统一对一的映射SocketChannel和文件处理。取决于具体的系统,每个Selector的最大文件处理数的限制也是不同的。
01
class Acceptor implements Runnable {
02
...
03
void init() {
04
ServerSocketChannel serverChannel = ServerSocketChannel.open();
05
serverChannel.configureBlocking(true);
06
serverChannel.socket().bind(new InetSocketAddress(serverPort));
07
}
08
09
public void run() {
10
while (isRunning) {
11
try {
12
SocketChannel channel = serverChannel.accept();
13
14
Connection con = new Connection(channel, appHandler);
15
dispatcherPool.nextDispatcher().register(con);
16
} catch (...) {
17
...
18
}
19
}
20
}
21
}
在示例代码中,一个连接对象持有SocketChannel和应用级别的事件处理器。我们将会在下面描述这些类。
Dispatcher分配器
通过调用Dispatcher的register方法,SocketChannel将会注册到相关的Selector上。这里就是问题的来源。Selector在内部使用key集合来管理注册的通道。这意味着每次注册一个通道,一个相关连的SelectionKey会被创建并被加入到Selector的注册key集合。同时,并发的分发线程可以调用Selector的select方法,也会访问这个key集合。由于key集合是非线程安全的,一个非同步的Acceptor上下文注册会导致死锁和竞争。这个可以通过实现selector guard object idiom来解决,它允许暂时的挂起分配线程。参考”“http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf”> How to Build a Scalable Multiplexed Server with NIO” (PDF)来查看这个方法的解释。
01
class Dispatcher implements Runnable {
02
private Object guard = new Object();
03
…
04
05
void register(Connection con) {
06
// retrieve the guard lock and wake up the dispatcher thread
07
// to register the connection's channel
08
synchronized (guard) {
09
selector.wakeup();
10
con.getChannel().register(selector, SelectionKey.OP_READ, con);
11
}
12
13
// notify the application EventHandler about the new connection
14
…
15
}
16
17
void announceWriteNeed(Connection con) {
18
SelectionKey key = con.getChannel().keyFor(selector);
19
synchronized (guard) {
20
selector.wakeup();
21
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
22
}
23
}
24
25
public void run() {
26
while (isRunning) {
27
synchronized (guard) {
28
// suspend the dispatcher thead if guard is locked
29
}
30
int eventCount = selector.select();
31
32
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
33
while (it.hasNext()) {
34
SelectionKey key = it.next();
35
it.remove();
36
37
// read event?
38
if (key.isValid() && key.isReadable()) {
39
Connection con = (Connection) key.attachment();
40
disptacherEventHandler.onReadableEvent(con);
41
}
42
43
// write event?
44
…
45
}
46
}
47
}
48
}
在这个连接注册之后,Selector监听这个连接的就绪事件。如果一个事件发生了,通过传递相关的连接,这个Dispatcher的事件处理类的合适的回调方法将会被调用。
分配器级别事件处理器
处理一个就绪读事件的第一个行为是调用通道的读方法。与流接口相反,通道接口需要忽略读缓冲接口。通常会使用直接分配的ByteBuffer。直接缓冲区存在于本地内存,绕过JAVA堆内存。通过使用直接缓冲,socket的IO操作不再需要创建内部中间缓冲器。
通常情况下,读请求会被非常快的执行。Socket的读操作通常只是把一份接收到的数据从内核内存空间拷贝到读缓冲区,这个数据会存在于用户控制的内存空间。这些接收的数据将会被添加到连接的线程安全的读队列作进一步的处理。基于I/O操作的结果,特定于应用程序的任务会被执行。这些任务会被分配的应用级别的事件处理器处理。这类处理器通常被称为工作线程。
01
class DispatcherEventHandler {
02
...
03
04
void onReadableEvent(final Connection con) {
05
// get the received data
06
ByteBuffer readBuffer = allocateMemory();
07
con.getChannel().read(readBuffer);
08
ByteBuffer data = extractReadAndRecycleRenaming(readBuffer);
09
10
// append it to read queue
11
con.getReadQueue().add(data);
12
...
13
14
// perform further operations (encode, process, decode)
15
// by a worker thread
16
if (con.getReadQueue().getSize() > 0) {
17
workerPool.execute(new Runnable() {
18
public void run() {
19
synchronized (con) {
20
con.getAppHandler().onData(con);
21
}
22
}
23
});
24
}
25
}
26
27
void onWriteableEvent(Connection con) {
28
ByteBuffer[] data = con.getWriteQueue().drain();
29
con.getChannel().write(data); // write the data
30
...
31
32
if (con.getWriteQueue().isEmpty()) {
33
if (con.isClosed()) {
34
dispatcher.deregister(con);
35
}
36
37
} else {
38
// there is remaining data to write
39
dispatcher.announceWriteNeed(con);
40
}
41
}
42
}
在特定于应用程序的任务中,数据会被编码,服务会被执行,数据会被写入。在写数据的时候,要被发送的数据会加入到写队列,然后调用Dispatcher类的announceWriteNeed方法。这个方法让Selector开始监听就绪读事件。如果这种事件发生,分配器级别的事件处理器就会执行onWriteableEvent方法。这从通道的写队列获取数据然后执行必要的写I/O操作。试图直接写数据,通过这种方法,将会导致死锁和竞争。
应用级别事件处理器
与分配器事件处理器相比,特定于应用的事件处理器监听高级别的面向连接的事件,例如建立连接,数据接收或者是关闭连接。具体的事件处理设计是NIO服务器框架像SEDA,MINA还有emberIO之间最大的不同。这些框架通常实现了多级的架构,这样事件处理链就可以使用。它允许增加像SSLHandler或DelayerWriteHandler之类可以拦截请求/响应处理的处理器。下面的例子展示了一个基于xSocket框架的应用级别的处理器。xScoket框架支持不同的处理器接口,这些接口里面定义了需要被实现的特定于应用的回调方法代码。
01
class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
02
private static final String DELIMITER = ...
03
private Mailbox mailbox = ...
04
05
public static void main(String... args) throws ... {
06
new MultithreadedServer(110, new POP3ProtocolHandler()).run();
07
}
08
09
public boolean onConnect(INonBlockingConnection con) throws ... {
10
if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
11
con.setWriteTransferRate(5); // reduce transfer: 5byte/sec
12
}
13
14
con.write("+OK My POP3-Server" + DELIMITER);
15
return true;
16
}
17
18
public boolean onData(INonBlockingConnection con) throws ... {
19
String request = con.readStringByDelimiter(DELIMITER);
20
21
if (request.startsWith("QUIT")) {
22
mailbox.close();
23
con.write("+OK POP3 server signing off" + DELIMITER);
24
con.close();
25
26
} else if (request.startsWith("USER")) {
27
this.user = request.substring(4, request.length());
28
con.write("+OK enter password" + DELIMITER);
29
30
} else if (request.startsWith("PASS")) {
31
String pwd = request.substring(4, request.length());
32
boolean isAuthenticated = authenticator.check(user, pwd);
33
if (isAuthenticated) {
34
mailbox = MailBox.openAndLock(user);
35
con.write("+OK mailbox locked and ready" + DELIMITER);
36
} else {
37
...
38
}
39
} else if (...) {
40
...
41
}
42
return true;
43
}
44
}
为了更简便的访问底层的读写队列,Connection对象提供了一些便利的面向流和通道的读写方法。
通过关闭连接,底层实现初始化一个可写事件往返的刷新写队列。连接会在遗留的数据被写完之后终止。除了这样一个控制终端,连接还能因为其它的原因关闭。例如,硬件故障可能导致基于TCP的连接中断。这样的情况只有在socket上执行读写操作或空闲超时的时候检测到。大多数的NIO框架提供一个内置的程序来处理这些不受控制的中断。
Conclusion总结
一个事件驱动的非阻塞架构是实现高效,高扩展性和高稳定性服务器的一个基本的层。其中的挑战就是最小化线程同步开销和优化连接和缓冲区的管理。这会是编程中最困难的部分。
但是没有必要重复发明轮子。一些框架像xSocket,emberIO,SEDA或MINA都抽象了低层次的事件处理和线程管理来简化创建高可扩展性的服务器。以上大部分的框架都支持SSL和UDP,本文中未提及这两点。
Resources参考资料
Scalable IO in Java” (PDF) describes event-driven processing by using Java NIO
“Tricks and Tips with NIO, Part 2: Why SelectionKey.attach() Is Evil” describes how a memory leak occurs by a unwary use of the
SelectionKey’s attach
“Pico Threads: Lightweight Threads in Java” shows the problems with
large-scale threaded programming and event-based techniques.
A Reactor pattern description by Douglas C. Schmidt (PDF)
Unix Network Programming: The Sockets Networking API gives a good overview about network programming in general, and gives a good impression what happens behind the Java I/O operations on the operating-system level.
xSocket is a LGPL NIO-based library to build network applications. Most example code of this article has been written based on xSocket.
作者:
Gregor Roth works as a software architect at United Internet group, a leading European Internet Service Provider to which GMX, 1&1, and Web.de belong. His areas of interest include software and system architecture, enterprise architecture management, object-oriented design, distributed computing, and development methodologies.
阅读原文请点击:http://click.aliyun.com/m/22420/
摘要: 目录 线程体系结构 反应堆模式 组件架构 接收器 分配器 分配器级别事件处理器 应用程序级别事件处理器 总结 参考资料 如果你被要求去写一个高可扩展性的基于JAVA的服务器,你很快就会决定使用JAVA NIO包。
目录
线程体系结构
反应堆模式
组件架构
接收器
分配器
分配器级别事件处理器
应用程序级别事件处理器
总结
参考资料
如果你被要求去写一个高可扩展性的基于JAVA的服务器,你很快就会决定使用JAVA NIO包。为了让服务器跑起来,你可能会花很多时间阅读博客和教程来了解线程同步需要NIO SELECTOR类以及处理一些常见的陷阱。本文描述了一个面向连接基于NIO的服务器的基本架构。本文会先看一下一个首选的线程模型然后讨论服务器的一些基本组件。
Threading Architecture线程体系结构
第一种也是最直观的方式去实现一个多线程的服务器是每个连接一个线程的方式。这是JAVA1.4以前的解决方案,由于老版本的JAVA缺少非阻塞的I/O支持。每个连接一个线程的方法分配一个独家的工作线程给每个连接。在处理循环中,工作线程等待新进入的数据,处理这个请求,返回响应数据,然后再调用阻塞socket的read方法。
01
public class Server {
02
private ExecutorService executors = Executors.newFixedThreadPool(10);
03
private boolean isRunning = true;
04
05
public static void main(String... args) throws ... {
06
new Server().launch(Integer.parseInt(args[0]));
07
}
08
09
public void launch(int port) throws ... {
10
ServerSocket sso = new ServerSocket(port);
11
while (isRunning) {
12
Socket s = sso.accept();
13
executors.execute(new Worker(s));
14
}
15
}
16
17
private class Worker implements Runnable {
18
private LineNumberReader in = null;
19
...
20
21
Worker(Socket s) throws ... {
22
in = new LineNumberReader(new InputStreamReader(...));
23
out = ...
24
}
25
26
public void run() {
27
while (isRunning) {
28
try {
29
// blocking read of a request (line)
30
String request = in.readLine();
31
32
// processing the request
33
...
34
String response = ...
35
36
// return the response
37
out.write(resonse);
38
out.flush();
39
} catch (Exception e ) {
40
...
41
}
42
}
43
in.close();
44
...
45
}
46
}
47
}
在同时发生的客户端连接和多个同步工作线程之间通常有一个单对单的关系。因为每个连接都有一个相关联的服务端等待线程,因此可以有很好的响应时间。然而,高负载需要更多的同步运行的线程,这些限制了可扩展性。尤其是,长时间存活的连接像持久化的HTTP连接导致大量的同步工作线程存在,有浪费时间等待新的客户端请求的趋势。此外,成百上千的同步线程会浪费大量的栈空间。注意,举例来说,Solaris/Sparc默认的JAVA栈空间是512KB.
如果server不得不处理大量同时发生的客户端,并且能容忍慢,无反应的客户端,就需要一种供替代的线程架构。每个事件一个线程的方式通过一种非常高效地方式实现了这样的需求。工作线程和连接独立,仅被用来处理特定的事件。举例来说,如果一个数据接收事件发生了,一个工作线程将会用来处理特定于应用程序的编码和服务任务(或至少启动这些任务)。任务一结束,工作线程就会回到线程池中。这种方式需要无阻塞的处理socket的I/O。调用socket的read或write方法需要时无阻塞的。此外,一个事件系统是必须的;它会发信号表明是否有新数据,轮流发起socket的read方法。这种方式移除了等待线程和工作线程之间的一对一关系。这样一个事件驱动的I/0系统的设计将会在反应堆模式中描述。
The Reactor Pattern反应堆模式
反应堆模式,如图1所示,把事件的检测例如准备就绪读或者准备就绪接受数据和事件的处理分离。如果一个准备就绪的事件发生了,专用工作线程内的一个事件处理器就会被通知去执行适当的处理。
Figure1
Figure 1. A NIO-based Reactor pattern implementation
连接通道需要先在Selector类中注册才能参与事件的架构。这可以通过调用regisster()方法来实现。虽然这个方法是SocketChannel的一部分,这个通道将会在Selector中注册,没有其它的方法。
1
...
2
SocketChannel channel = serverChannel.accept();
3
channel.configureBlocking(false);
4
5
// register the connection
6
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
7
...
为了检测新的事件,Selector类提供了请求已注册的通道就绪事件的能力。通过调用select方法 ,Selector收集已注册通道的就绪事件。这个方法的调用会阻塞,直到至少一个事件已经发生。在这种情况下,方法返回了自上次调用之后就绪的I/O操作的连接数。所选的连接可以通过调用Selector的selectedkey方法来检测。这个方法返回一个Selectionkey对象集合,里面存放了IO事件的状态和连接通道的引用。
一个Selector存在于Dispatcher中。这是一个单线程的活动类围饶着Selector类。Dispatcher类的职责是检测事件然后分发消费事件的处理给EventHandler类。在这个分发循环中,Dispatcher类调用Selector类的select方法等待新的事件。如果至少一个事件发生了,这个方法就返回,每个事件相关的通道可以通过调用selectedkeys方法获得。
01
...
02
while (isRunning) {
03
// blocking call, to wait for new readiness events
04
int eventCount = selector.select();
05
06
// get the events
07
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
08
while (it.hasNext()) {
09
SelectionKey key = it.next();
10
it.remove();
11
12
// readable event?
13
if (key.isValid() && key.isReadable()) {
14
eventHandler.onReadableEvent(key.channel());
15
}
16
17
// writable event?
18
if (key.isValid() && key.isWritable()) {
19
key.interestOps(SelectionKey.OP_READ); // reset to read only
20
eventHandler.onWriteableEvent(key.channel());
21
}
22
...
23
}
24
...
25
}
基于一个事件,类似于就绪读或就绪写,EventHandler会被Dispatcher调用来处理这个事件。EventHandler解码请求数据,处理必须的服务活动,编码响应数据。由于工作线程没有被强制去浪费时间等待新的请求然后建立一个连接,这种方式的可扩展性和吞吐量理论上只限制于系统资源像CPU和内存。这既便是说,响应时间将没有每个连接一个线程的方式快,由于参与线程间的切换和同步。事件驱动方法的挑战因此是最少化同步和优化线程管理,以致于这些影响可以被忽略。
Component Architecture组件架构
大多数具有高可扩展性的JAVA服务器都是建立在反应堆模式上的。这样做,反应堆模式中的类将会被增强,因为需要额外的类来连接管理,缓冲区管理,以及负载均衡。这个服用器的入口类是一个Acceptor。这个安排如图2所示。
Figure2
Figure 2. Major components of a connection-oriented server
Acceptor接收器
一个服务器每个新的客户端连接将会被单个Acceptor所接收,Acceptor与服务器的端口绑定。接收器是一个单线程的活动类。由于Acceptor仅负责处理历时非常短的客户端连接请求,经常只要用阻塞I/0模式实现Acceptor就足够了。Acceptor通过调用Serversocketchannel的阻塞accept方法来处理新请求。新请求将会注册到Dispatcher,这之后,请求就可以参与到事件处理中了。
由于一个Dispatcher的可扩展性非常有限,通常都会使用一个小的Dispatchers的池。这个限制当中的一个原因是特定的操作系统实现的Selector。大多数的操作系统一对一的映射SocketChannel和文件处理。取决于具体的系统,每个Selector的最大文件处理数的限制也是不同的。
01
class Acceptor implements Runnable {
02
...
03
void init() {
04
ServerSocketChannel serverChannel = ServerSocketChannel.open();
05
serverChannel.configureBlocking(true);
06
serverChannel.socket().bind(new InetSocketAddress(serverPort));
07
}
08
09
public void run() {
10
while (isRunning) {
11
try {
12
SocketChannel channel = serverChannel.accept();
13
14
Connection con = new Connection(channel, appHandler);
15
dispatcherPool.nextDispatcher().register(con);
16
} catch (...) {
17
...
18
}
19
}
20
}
21
}
在示例代码中,一个连接对象持有SocketChannel和应用级别的事件处理器。我们将会在下面描述这些类。
Dispatcher分配器
通过调用Dispatcher的register方法,SocketChannel将会注册到相关的Selector上。这里就是问题的来源。Selector在内部使用key集合来管理注册的通道。这意味着每次注册一个通道,一个相关连的SelectionKey会被创建并被加入到Selector的注册key集合。同时,并发的分发线程可以调用Selector的select方法,也会访问这个key集合。由于key集合是非线程安全的,一个非同步的Acceptor上下文注册会导致死锁和竞争。这个可以通过实现selector guard object idiom来解决,它允许暂时的挂起分配线程。参考”“http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf”> How to Build a Scalable Multiplexed Server with NIO” (PDF)来查看这个方法的解释。
01
class Dispatcher implements Runnable {
02
private Object guard = new Object();
03
…
04
05
void register(Connection con) {
06
// retrieve the guard lock and wake up the dispatcher thread
07
// to register the connection's channel
08
synchronized (guard) {
09
selector.wakeup();
10
con.getChannel().register(selector, SelectionKey.OP_READ, con);
11
}
12
13
// notify the application EventHandler about the new connection
14
…
15
}
16
17
void announceWriteNeed(Connection con) {
18
SelectionKey key = con.getChannel().keyFor(selector);
19
synchronized (guard) {
20
selector.wakeup();
21
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
22
}
23
}
24
25
public void run() {
26
while (isRunning) {
27
synchronized (guard) {
28
// suspend the dispatcher thead if guard is locked
29
}
30
int eventCount = selector.select();
31
32
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
33
while (it.hasNext()) {
34
SelectionKey key = it.next();
35
it.remove();
36
37
// read event?
38
if (key.isValid() && key.isReadable()) {
39
Connection con = (Connection) key.attachment();
40
disptacherEventHandler.onReadableEvent(con);
41
}
42
43
// write event?
44
…
45
}
46
}
47
}
48
}
在这个连接注册之后,Selector监听这个连接的就绪事件。如果一个事件发生了,通过传递相关的连接,这个Dispatcher的事件处理类的合适的回调方法将会被调用。
分配器级别事件处理器
处理一个就绪读事件的第一个行为是调用通道的读方法。与流接口相反,通道接口需要忽略读缓冲接口。通常会使用直接分配的ByteBuffer。直接缓冲区存在于本地内存,绕过JAVA堆内存。通过使用直接缓冲,socket的IO操作不再需要创建内部中间缓冲器。
通常情况下,读请求会被非常快的执行。Socket的读操作通常只是把一份接收到的数据从内核内存空间拷贝到读缓冲区,这个数据会存在于用户控制的内存空间。这些接收的数据将会被添加到连接的线程安全的读队列作进一步的处理。基于I/O操作的结果,特定于应用程序的任务会被执行。这些任务会被分配的应用级别的事件处理器处理。这类处理器通常被称为工作线程。
01
class DispatcherEventHandler {
02
...
03
04
void onReadableEvent(final Connection con) {
05
// get the received data
06
ByteBuffer readBuffer = allocateMemory();
07
con.getChannel().read(readBuffer);
08
ByteBuffer data = extractReadAndRecycleRenaming(readBuffer);
09
10
// append it to read queue
11
con.getReadQueue().add(data);
12
...
13
14
// perform further operations (encode, process, decode)
15
// by a worker thread
16
if (con.getReadQueue().getSize() > 0) {
17
workerPool.execute(new Runnable() {
18
public void run() {
19
synchronized (con) {
20
con.getAppHandler().onData(con);
21
}
22
}
23
});
24
}
25
}
26
27
void onWriteableEvent(Connection con) {
28
ByteBuffer[] data = con.getWriteQueue().drain();
29
con.getChannel().write(data); // write the data
30
...
31
32
if (con.getWriteQueue().isEmpty()) {
33
if (con.isClosed()) {
34
dispatcher.deregister(con);
35
}
36
37
} else {
38
// there is remaining data to write
39
dispatcher.announceWriteNeed(con);
40
}
41
}
42
}
在特定于应用程序的任务中,数据会被编码,服务会被执行,数据会被写入。在写数据的时候,要被发送的数据会加入到写队列,然后调用Dispatcher类的announceWriteNeed方法。这个方法让Selector开始监听就绪读事件。如果这种事件发生,分配器级别的事件处理器就会执行onWriteableEvent方法。这从通道的写队列获取数据然后执行必要的写I/O操作。试图直接写数据,通过这种方法,将会导致死锁和竞争。
应用级别事件处理器
与分配器事件处理器相比,特定于应用的事件处理器监听高级别的面向连接的事件,例如建立连接,数据接收或者是关闭连接。具体的事件处理设计是NIO服务器框架像SEDA,MINA还有emberIO之间最大的不同。这些框架通常实现了多级的架构,这样事件处理链就可以使用。它允许增加像SSLHandler或DelayerWriteHandler之类可以拦截请求/响应处理的处理器。下面的例子展示了一个基于xSocket框架的应用级别的处理器。xScoket框架支持不同的处理器接口,这些接口里面定义了需要被实现的特定于应用的回调方法代码。
01
class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
02
private static final String DELIMITER = ...
03
private Mailbox mailbox = ...
04
05
public static void main(String... args) throws ... {
06
new MultithreadedServer(110, new POP3ProtocolHandler()).run();
07
}
08
09
public boolean onConnect(INonBlockingConnection con) throws ... {
10
if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
11
con.setWriteTransferRate(5); // reduce transfer: 5byte/sec
12
}
13
14
con.write("+OK My POP3-Server" + DELIMITER);
15
return true;
16
}
17
18
public boolean onData(INonBlockingConnection con) throws ... {
19
String request = con.readStringByDelimiter(DELIMITER);
20
21
if (request.startsWith("QUIT")) {
22
mailbox.close();
23
con.write("+OK POP3 server signing off" + DELIMITER);
24
con.close();
25
26
} else if (request.startsWith("USER")) {
27
this.user = request.substring(4, request.length());
28
con.write("+OK enter password" + DELIMITER);
29
30
} else if (request.startsWith("PASS")) {
31
String pwd = request.substring(4, request.length());
32
boolean isAuthenticated = authenticator.check(user, pwd);
33
if (isAuthenticated) {
34
mailbox = MailBox.openAndLock(user);
35
con.write("+OK mailbox locked and ready" + DELIMITER);
36
} else {
37
...
38
}
39
} else if (...) {
40
...
41
}
42
return true;
43
}
44
}
为了更简便的访问底层的读写队列,Connection对象提供了一些便利的面向流和通道的读写方法。
通过关闭连接,底层实现初始化一个可写事件往返的刷新写队列。连接会在遗留的数据被写完之后终止。除了这样一个控制终端,连接还能因为其它的原因关闭。例如,硬件故障可能导致基于TCP的连接中断。这样的情况只有在socket上执行读写操作或空闲超时的时候检测到。大多数的NIO框架提供一个内置的程序来处理这些不受控制的中断。
Conclusion总结
一个事件驱动的非阻塞架构是实现高效,高扩展性和高稳定性服务器的一个基本的层。其中的挑战就是最小化线程同步开销和优化连接和缓冲区的管理。这会是编程中最困难的部分。
但是没有必要重复发明轮子。一些框架像xSocket,emberIO,SEDA或MINA都抽象了低层次的事件处理和线程管理来简化创建高可扩展性的服务器。以上大部分的框架都支持SSL和UDP,本文中未提及这两点。
Resources参考资料
Scalable IO in Java” (PDF) describes event-driven processing by using Java NIO
“Tricks and Tips with NIO, Part 2: Why SelectionKey.attach() Is Evil” describes how a memory leak occurs by a unwary use of the
SelectionKey’s attach
“Pico Threads: Lightweight Threads in Java” shows the problems with
large-scale threaded programming and event-based techniques.
A Reactor pattern description by Douglas C. Schmidt (PDF)
Unix Network Programming: The Sockets Networking API gives a good overview about network programming in general, and gives a good impression what happens behind the Java I/O operations on the operating-system level.
xSocket is a LGPL NIO-based library to build network applications. Most example code of this article has been written based on xSocket.
作者:
Gregor Roth works as a software architect at United Internet group, a leading European Internet Service Provider to which GMX, 1&1, and Web.de belong. His areas of interest include software and system architecture, enterprise architecture management, object-oriented design, distributed computing, and development methodologies.
阅读原文请点击:http://click.aliyun.com/m/22420/
相关推荐
Netty 是一个基于 Java NIO 的高性能网络应用框架,它主要设计用于简化网络服务的开发,包括服务器和客户端。Netty 提供了高度抽象的 API,使得开发者能够更轻松地处理网络通信,同时保持高性能和高可扩展性。在本...
1. **高性能**:SOFA-RPC 使用高效的序列化协议,如protobuf或Hessian,减少网络传输的数据量,同时采用异步非阻塞IO模型,提升并发处理能力。 2. **高可扩展性**:该框架支持多种插件机制,例如服务注册与发现、...
Netty是一个基于JAVA NIO类库的异步通信框架,用于创建异步非阻塞、高性能、高可靠性和高可定制性的网络客户端和服务器端。Netty的主要特点包括异步、非阻塞、基于事件驱动的NIO框架,支持多种传输层通信协议,包括...
- **数据库访问**:通过异步方式(非阻塞、异步IO)和中间层DAL,提高数据库访问效率。 - **一致性**:使用复制协议(如两阶段提交、三阶段提交、Paxos协议)保证数据的一致性。 总之,构建高可用性和高扩展性的...
在现代Web服务架构中,Redis、Lua、Nginx和OpenResty是不可或缺的技术组件,它们共同构建了一个高性能、高可扩展的解决方案。让我们深入探讨这些技术的核心特点、整合方式以及性能优化策略。 首先,Redis是一款开源...
该发明的优点在于,通过Netty的异步非阻塞I/O特性,实现了高效的RPC调用,减少了网络通信的等待时间。同时,利用容器管理服务接口与实现的映射,使得服务发现和调用更加便捷,增强了系统的可扩展性和灵活性。此外,...
**MongoDB** 是一个基于分布式文件存储的数据库,常用于构建实时应用程序,因为它的灵活性和高可扩展性。在MERN栈中,MongoDB用于存储和管理数据。 **Express.js** 是一个轻量级的Node.js web应用框架,用于构建...
非阻塞IO (NIO) 提供了异步读取的能力,但写操作仍然是同步的;而异步IO (AIO) 则实现了真正的异步读写,大大提高了系统的并发能力。 **1.2 基于远程调用方式实现系统间通讯** 远程过程调用 (Remote Procedure ...
- **非阻塞IO**:使用事件驱动的非阻塞IO模型,提高了处理效率。 - **单线程模型**:避免了多线程间的上下文切换开销。 **1.3.5 提供API的语言** - **多种语言支持**:支持包括Java、Python、Node.js等多种编程语言...
"ekse-core:Epic Knife 服务器引擎核心"是一款基于Java技术构建的高性能、高可扩展性的服务器引擎。这个核心组件是Epic Knife项目的基础,它为开发者提供了构建复杂分布式系统的能力,尤其适用于需要处理大量并发...
数据层则被优化为一个多租户、横向扩展的存储服务,日志记录和存储分离,提高了性能。 ### 应用场景和客户案例 Aurora适用于需要高性能、高可用性和低延迟的在线交易处理(OLTP)应用,如电子商务、社交网络、游戏...
5. **异步处理**:CompFrame支持异步操作,通过非阻塞IO和线程池管理,提高了系统性能和并发处理能力。 三、适用场景 CompFrame适用于各种类型的应用程序开发,尤其是需要高可扩展性和维护性的大型项目,如企业级...