`
Mojarra
  • 浏览: 130785 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

Reactor Pattern (二)

阅读更多

 

 

JDK1.4 后, Sun 积极推广 New IO ,其中 non-blocking 是新的 socket 编程模式,大幅度提高了服务器端 socket 并发处理能力, Selector, SocketChannel SelectionKey 这三个类配合使用,可以构成一个比较经典的 Reactor 模式。其中 Selector 类只筛选 socket 通讯中感兴趣的事件,如接受连接,读取数据,写数据等。 SocketChannel 是真正的 socket 通讯的实现者,负责读传到服务器端 socket 数据和往客户端 socket 写数据。 SelectionKey 是对 socket 通讯事件的封装。在实行一个 non-blocking IO reactor 模式过程中,我仍然按照 reactor 模式的定义,设计多路复用分发器,分配器,任务处理程序,并模拟并发请求客户端。

 

 

多路复用分发器

DemultiPlexer 继承了 Thread 类,在 signal 方法中,调用 selector.select() 函数,该函数等待一个 SocketChannel 中的事件出现才返回。在 signal 方法中,对两种事件感兴趣,一个是 OP_ACCEPT, 一个是 OP_READ ,当收到 OP_ACCEPT 事件时,把 SelectionKey 中的 channel 中转化成 ServerSocketChannel ,并且掉中 ServerSocketChannel.accpet() 方法,返回一个 SocketChannel ,然后在设置成 non-blocking IO 模式,最后注册 OP_READ 事件。当收到一个 OP_READ 事件时,想 Dispatcher 注册这个事件,让 Dispatcher 去启动一个线程去处理该事件,处理完成后,并把该事件从 Selector 中撤销。

 

class DemultiPlexer extends Thread {
	private Selector selector;
	private Dispatcher dispatcher;

	@Override
	public void run() {
		for (;;) {
			try {
				signal();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	private void signal() throws IOException, InterruptedException {
		int c = selector.select();
		if (c <= 0) {
			return;
		}
		Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
		while (keys.hasNext()) {
			SelectionKey key = keys.next();
			keys.remove();
			if (key.isValid()) {
				if (key.isAcceptable()) {
					accept(key);
				} else if (key.isReadable()) {
					dispatcher.register(key);
				}
			}
		}
	}

	private void accept(SelectionKey key) throws IOException {
		ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
		SocketChannel channel = ssc.accept();
		channel.configureBlocking(false);
		channel.register(selector, SelectionKey.OP_READ);
//		key.cancel();
	}

	public DemultiPlexer(Selector selector, Dispatcher dispatcher) {
		this.selector = selector;
		this.dispatcher = dispatcher;
	}
}
 

 

分配器

分配器的主要职责是负责监视 SelectionKey 队列之中已经被选取的 OP_READ 事件,如果队列中无任何事件,分配器线程处于中断状态,一旦有新的事件被注册到分配器中,分配器线程会被唤醒,去处理这个事件。为了说明 Reactor 模式,本例中,设计了只要有事件被注册,分配器就新建一个请求处理线程去读取数据这种简单的资源调度机制。在实际应用中,线程、数据库连接,内存等等都是资源,在有限的情况下,分配这些资源都会比这个模型复杂的多。

 

class Dispatcher extends Thread {
	 
	private static LinkedBlockingQueue<SelectionKey> queue = new LinkedBlockingQueue<SelectionKey>();

	public void run() {
		for (;;) {
			try {
				dispatch();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				
                            e.printStackTrace();                            
			}
		}
	}

	public void dispatch() throws IOException, InterruptedException {
		synchronized (queue) {
			if (queue.size() <= 0) {
				queue.wait();
			} else {
                                //拿出对列中的第一个事件,创建一个新的任务处理程序
                                SelectionKey key = queue.poll();  
				new RequestHandler(key).start();
			}
		}
	}
      /**
       * 注册新的事件,并唤醒当前线程。
       */  
      public void register(SelectionKey key) throws IOException {
		synchronized (queue) {
			queue.add(key);
			queue.notify();
		}		 
	}

	 
}

 

 

请求处理程序

请求处理程序是真正的业务处理代码。本例中主要负责读取客户端 socket 写入的数据,然后把当前的 SelectionKey 取消,让下一次 select 操作忽略掉当前的 SelectionKey. 这个类中使用了 Charset 辅助类来完成把读取的字节反编成一个可读的字符串。

 

class RequestHandler extends Thread {
	private SelectionKey selectionKey;
	private static Charset utf8 = Charset.forName("utf-8");

	@Override
	public void run() {
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		ByteBuffer buff = ByteBuffer.allocate(128);
		try {  
                        // make sure the socketchannel is connected. 
			if (!socketChannel.isConnected())
				socketChannel.finishConnect();
                        

                        // read the data and print it after decode.
			while (socketChannel.read(buff) > 0) {
				buff.flip();
				System.out.println(utf8.decode(buff));
				buff.clear();
			}
			
			//remove the key for next selection operation
			selectionKey.cancel();			 
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public RequestHandler(SelectionKey selectionKey) {
		this.selectionKey = selectionKey;
	}
}

 

 

请求客户端

一个请求客户端连上服务器端后,往服务器端发送 ”Hello non-blocking IO reactor Pattern” 字符串。在 non blocking IO socket 编程中,客户端的 SocketChannel 也使用 Selector ,并注册感兴趣的事件。本例中注册 OP_CONNECT 事件,然后确保 SocketChannel 是连接上服务器端的,再向服务器端发送一个字符串后,请求客户端的任务完成。 main 函数中,模拟了 50 个并发的客户端。

 

class Request extends Thread {
	private int port = 9000;

	@Override
	public void run() {
		Charset utf8 = Charset.forName("utf-8");
		try {
			SocketChannel channel = SocketChannel.open(new InetSocketAddress(port));
			channel.configureBlocking(false);			 
			Selector selector = Selector.open();
			channel.register(selector, SelectionKey.OP_CONNECT);
			if (!channel.isConnected())
				channel.finishConnect();
			channel.write(utf8.encode("Hello non-blocking IO reactor pattern.."));
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public static void main(String args[]) {
		for (int i = 0; i < 50; i++)
			new Request().start();
	}
}
 

最后启动一个服务器端,来运行这个non-blocking IO的服务器端。在main函数中,首先open一个selector和ServerSocketChannel,配置为non-blocking IO模式,绑定服务端口9000并注册OP_ACCEPT。最后启动多路复用分发器线程和分配器线程。

 

public class ReactorPattern {
	public static void main(String args[]) throws IOException {
		Selector sel = Selector.open();
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(9000));
		ssc.register(sel, SelectionKey.OP_ACCEPT);
		Dispatcher disp = new Dispatcher();
		disp.start();
		new DemultiPlexer(sel, disp).start();
	}
}

 小结

在non-blocking IO模式中,DemultiPlexer类是担任了筛选事件和注册事件的工作,Selector.select()采用的是blocking方式,调用select方法,都必须等到其有返回值为至,但select方法本身是线程安全的,因此DemultiPlexer类在运行环境下可以存在多个线程,配合多个Dispacher线程一起工作。如

 

 // initialize selector and register OP_ACCEPT
......

Dispatcher disp1 = new Dispatcher();
disp1.start();
new DemultiPlexer(sel, disp1).start();


Dispatcher disp2 = new Dispatcher();
disp2.start();
new DemultiPlexer(sel, disp2).start();

 

预告

Proactor模式是Reactor模式的兄弟,两者有很多相似之处,Proactor模式有何特点,相对于Reactor模式,它又有什么优点呢,如果您感兴趣,请继续关注我的博客。

 

 

[原创内容,版权所有,如有转载,请注明出处,如果您发现文中有什么错误请指出,不胜感激]

0
0
分享到:
评论
1 楼 Mojarra 2011-11-10  
ps:
Non-blocking模式中,Selector注册一个事件后,对这个事件做了适当的处理之后,必须在下一次select操作之前必须调用SelectionKey.cancel()方法。这一个特性限制了Selector在Reactor模式中的应用,试想一下,启动一个任务处理线程后,多路事件分发器在事件轮循中并不知道这个线程处理此事件是处于何种状态。这个事件到底有没有被处理完成,多路事件分发器是不清楚的,无论是SelectionKey,还是SocketChannel,都没有一个合适的方法知道它所代表的事件已经被处理。如果检测到一个key后,立即调用这个key的cancel方法,会发生一些意象不到的效果。

一个比较笨拙的办法是在Dispatcher中维护一个正在处理的事件的HashMap,在一个新的事件被注册到分配器时,首先检查这个事件的hash是否已存在于当前的hashmap中,如果存在则不启动新的任务处理线程,否则启动。事件处理完成后,从hashmap中删除此事件的hash。

还有一种方法是OP_ACCEPT事件从selector轮循中剥离出去,Selector只放在socketChannel中使用,ServerSocketChannel仍使用类似面向线程的socket编程方法,在ServerSocketChannel收到一个接入的连接时,启动一个线程,让socketChannel在线程中注册感兴趣的事件以及移除该事件。这样会导致一个任务处理线程开一个Selector实例,系统的开销增加。

这两种方法中,前者的开销小,编程复杂度低,整体上优于第二个方法。

相关推荐

    Reactor Pattern (一)

    **Reactor模式(一)** Reactor模式是一种事件驱动的设计模式,它主要用于处理并发I/O操作,通过将I/O事件的处理与事件处理程序解耦,实现高效的异步处理。在高并发环境下,Reactor模式可以显著提升系统性能,因为...

    Reactor+指南中文版_2.01

    Reactor提供了一种避免这种阻塞的方式,通过使用发布-订阅模式(Publish-Subscribe pattern)或响应式编程模型,使得多个订阅者可以并发地处理事件,而无需锁定。 Reactor的几个关键组件包括: - **Flux**:用于处理...

    reactor-siemens.pdf

    "Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events." In _Pattern Languages of Program Design_, edited by Jim Coplien and Douglas C. Schmidt, ISBN...

    reactor-siemens

    Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients

    Network Pattern

    常见的网络模式有Reactor和Proactor两种,它们在并发处理和异步操作中扮演着关键角色。 **Reactor模式** Reactor模式是一种事件驱动的设计模式,广泛应用于多线程和网络编程。它的核心思想是通过一个中心调度器...

    reactor-io-pattern

    这就是io设计模式---reactor Reactor 是事件驱动的,并使用 os api(select、poll、epoll、kqueue、iocp 等)来调度 socket 事件。 编程只需要向Reactor注册事件源socket发生的读、写或except以及具体的事件处理...

    Spring 5 Design Pattern

    Spring 5是Spring框架的第五个主要版本,提供了对响应式编程模型WebFlux的支持,这一特性是通过与Project Reactor的集成实现的。Spring 5设计模式这本图书,作者是Dinesh Rajput,涵盖了在高效开发Spring 5应用程序...

    Node.js Design Patterns Second Edition[July 2016]

    It covers the Node.js ecosystem and its philosophy, a short introduction to Node.js version 6, ES2015, and the reactor pattern. Chapter 2, Node.js Essential Patterns, introduces the first steps ...

    78程序员练级攻略(2018):异步IO模型和lock-Free编程1

    Reactor模式是处理大量并发连接的关键,它通过事件驱动的方式实现了非阻塞I/O,Understanding Reactor Pattern系列文章是深入理解这一模式的好资源。 Lock-Free编程是另一个关键领域,特别是在高并发和高性能系统中...

    基于Java NIO反应器模式设计与实现

    反应器设计模式(Reactor Pattern)是事件驱动架构中的一种反应式编程模式,通常用于高并发的场景中。它主要用于处理多个事件的异步分发。在反应器模式中,有一个或多个输入源(例如,连接、数据流)和相应的事件...

    非阻塞式网络服务器 nio.pdf recator

    ##### Reactor Pattern 反应器模式 - **基本版本**:反应器模式的基本版本通常包含一个单一的线程负责监听和分发事件。 - **多线程版本**:为了进一步提高性能,可以采用多个线程的形式,其中主线程负责监听事件,...

    Scalable IO in Java -- Doug Lea

    本文档基于Doug Lea的作品,深入探讨了如何通过事件驱动处理以及反应器模式(Reactor pattern)来构建高效的IO处理系统。 首先,我们需要了解什么是可扩展网络服务。可扩展网络服务是指能够根据服务负载的变化,...

    DougLeaNio.pdf

    内容涉及事件驱动处理(Event-driven processing)和反应器模式(Reactor pattern),以及多线程版本和其他变种。 2. **非阻塞IO API** - Java NIO的非阻塞操作 - 介绍了非阻塞IO的API,重点在于非阻塞模式下如何...

    高性能网络通讯

    - **Reactor Pattern**:为了解决上述问题,阿里巴巴采用了Reactor模式。在这种模式下,通过一个或多个线程监听所有连接上的I/O事件,并将这些事件分发给相应的处理程序,从而显著提高了系统的吞吐量和响应速度。 #...

    A Tutorial Introduction to the ADAPTIVE Communication Environment (ACE)

    #### 二、IPC SAP(Interprocess Communication Software Abstraction Package) **IPC SAP** 是ACE框架中的一个关键组成部分,主要用于进程间通信。它通过一系列的类来支持不同的通信方式,比如流式通信、数据报...

    Java Nio实现React堆线程模型-netty首要知识

    了解了React堆线程模型后,我们来看看`reactor-pattern-master`这个压缩包可能包含的内容。根据名称猜测,这可能是一个关于Reactor模式的示例项目或者教程资料。其中可能包括: - 代码示例:展示如何在Java中实现...

    Scalable IO in Java.pdf

    文档接着介绍了“反应器模式”(Reactor pattern),这是一种广泛用于实现事件驱动模型的架构模式。它包括了基本版本以及多线程版本和其他变体。反应器模式的核心思想是有一个等待事件发生的调度器(通常称为“反应...

    Scalable IO in Java

    3. 反应器模式(Reactor pattern):这是一种用于构建可扩展IO服务的设计模式,它包括基本版本和多线程版本,以及其它变体。反应器模式中,一个或多个输入源被监视,当输入源准备好进行IO操作时,事件会被分发到相应...

    Packt.Reactive.Programming.in.Kotlin

    Chapter 1, A Short Introduction to Reactive Programming, helps you understand the context, thinking pattern, and principles of reactive programming. Chapter 2, Functional Programming with Kotlin and ...

Global site tag (gtag.js) - Google Analytics