`
Mojarra
  • 浏览: 130792 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

Reactor Pattern (一)

阅读更多

Reactor Pattern 是一种为处理服务请求并发提交到一个或者多个服务处理程序的事件设计模式,当请求抵达后,服务处理程序使用多路分配策略,然后同步地派发这些请求至相关的请求处理程序。

 

结构

从结构上说, Reactor 设计模式具有 4 个要素。

资源 ,系统可以提供输出或者被输入

同步事件多路分配器 ,适用事件循环方式阻止所有的资源,当可以在一个无锁资源上启动一个同步操作时,同步事件多路分配器发送这个资源到适配器。

适配器 ,处理请求处理程序注册和注销,从同步事件多路分配器派发资源给相关的请求处理程序。

请求处理程序 ,应用定义的实际处理请求的程序和相关的资源。

优势

从定义上讲,所有的 reactor 系统是单线程模式,但是它也可以应用到多线程环境之中。

这种设计模式完全分割了应用业务代码和 reactor 模式的实现代码,这意味着业务组件可以模块化划分,可重用。同时,因为同步调用请求处理程序,当没有向系统中增加复杂多线程时, reactor 设计模式允许简化粗粒度并发。

限制

相对于别的过程化设计模式,因为逆向控制, Reactor 模式非常难与调试。除此外,只通过同步的方式调用请求处理程序, reactor 模式限制了最大并发量,特别是在 SMP 硬件上。 Reactor 的可伸缩性不但受到同步调用请求处理程序的限制,还受到多路分配器的限制。早些版本的 Unix select poll 调用都有一个最大的描述符,当这个描述符设置过大的时候会带来性能的问题。最近,更具有伸缩性的这类系统已经被设计出来,如 Solaris /dev/poll, Linux epoll, 基于 BSD 系统的 kqueue/kevent ,这些系统实现了高描述符情况下的高性能。

Reactor 模式与 Java 网络多线程

Reactor模式的设计目的是为了解决服务器环境下多线程的问题,直JDK1.4后,Java积极推广NIO和Selector模式的Server和Comsumer网络编程模式,但是Selector模式的编程没有传统的Stream模式显得优雅,这里,先用传统的Stream网络IO编程方式来实现一个reactor模式。

依据前面的描述,照猫画虎,设计Dispatcher,Demishplexer, RequestHander分别对应派发器,多路分发器和请求处理程序,至于资源,在这个例子中就抽象成一个数组,数组中的内容代表处理request时需要用到的资源,在实际的应用中,资源可能是一个数据连接,RPC连接等。在这个模式中,资源一旦被初始化,任何代码不可以改变数组中的内容,也即不改变resource。多路分发器根据当前数组中的元素决定是否分发“资源”给Dispatcher。另外还有一个requester来模拟客户端请求任务。

 

为了简单起见,本例中直接在分发器中初始化资源。分发器有两个重要的方法,accept方法接受request并且判断是否有资源给dispatcher,returnResource方法是在任务处理程序执行完后归还资源给分发器。

 

class DemultiPlexer {
   // initialize resources   
 private ArrayBlockingQueue<Integer> resources = new ArrayBlockingQueue<Integer>(2);
	{
		resources.add(1);
		resources.add(2);
	}

	private ArrayList<Integer> requests = new ArrayList<Integer>(5);

	/**
	 * 
	 * @param s
	 *            the starting time of get resource action
	 * @param timeout
	 *            , limit the request resource action within a time period in
	 *            second
	 * @return a integer represent a resource requesting
	 */
	public synchronized Integer getResource(long s, int timeout) {
		for (;;) {
			if ((System.currentTimeMillis() - s) / 1000 > timeout) {
				throw new RuntimeException("time out to get resource");
			} else if (resources.size() > 0) {
				return resources.poll();
			}
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public void returnResource(Integer i) {
		synchronized (resources) {
			resources.add(i);
		}
	}

	/**
	 * accept a request and create a new dispatcher to assign the resource to a
	 * request handler.
	 * 
	 * @param Request
	 */
	public void accept(Integer requestId) {
		requests.add(requestId);
		Integer rid = getResource(System.currentTimeMillis(), 5);
		Dispatcher d = new Dispatcher(this, requestId, rid);
		d.createRequestHandler().start();
	}
}
 

Dispatcher拿到资源后,负责创建新的任务处理程序,这里是直接创建,在实际应用中,任务处理程序可能会被放到线程池中去执行。

 

class Dispatcher {
	private Integer requestId;
	private Integer resourceId;
	private DemultiPlexer demultiPlexer;

	public Dispatcher(DemultiPlexer demultiPlexer, Integer requestId, Integer resourceId) {
		super();
		this.requestId = requestId;
		this.resourceId = resourceId;
		this.demultiPlexer = demultiPlexer;
	}

	/**
	 * create a request handler when demultiPlexer has an ideal resource.
	 * 
	 * @return RequestHandler
	 */
	public RequestHandler createRequestHandler() {
		return new RequestHandler(this, requestId, resourceId);
	}

	/**
	 * when request handler complete task, free the resource assigned.
	 */
	public synchronized void freeResource(Integer i) {
		demultiPlexer.returnResource(i);
	}
       
// getters & setters ......

}
 

RequestHandler才是真正的任务执行者。执行完后,需要归还资源,以便其他的任务执行者能请求道资源。

 

class RequestHandler extends Thread {
	private Integer tid;
	private Integer rid;
	private Dispatcher dispatcher;

	public RequestHandler(Dispatcher dispatcher, Integer tid, Integer rid) {
		this.tid = tid;
		this.rid = rid;
		this.dispatcher = dispatcher;
	}

	@Override
	public void run() {
		System.out.println("the request No.[" + tid + "] is handling the requesting with resource [" + rid + "]");
		try {
			sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("the request No.[" + tid + "] processed the requesting with resource [" + rid + "]");

		// free resource after request was handled
		dispatcher.freeResource(rid);

		System.out.println("return resource: " + rid);
	}
}

 

Requester类是请求程序,该类的run方法在连上socket后发生一个自身的标识符requestid, 在main函数中启动10个requester线程,来模拟多用户并发访问的情形。

 

class Requester extends Thread {

	private String address = "localhost";

	private int port = 1220;

	private int requestID;

	@Override
	public void run() {
		try {
			Socket socket = new Socket(address, port);
			socket.getOutputStream().write(requestID);
			sleep(5000);
			socket.close();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("No." + requestID + " was sent...");
	}

	public static void main(String args[]) {
		for (int i=0; i< 10 ; i++){
			new Requester(i).start();
		}
	}

}

 

 有了这几个类后,再启动server端来处理这些请求就可以构成一个完整的reactor模式的代码。这个类启动serversocket读取requester发送过来的requestid,交给分发器去处理,serversocket闲置30s后自动关闭,另外这个类还注册了一个shutdownhook,在程序异常终止时,确保serversocket被关闭。

 

public class ReactorPattern {

	DemultiPlexer demultiPlexer = new DemultiPlexer();

	public static void main(String args[]) throws InterruptedException, IOException {
		ReactorPattern reactorPattern = new ReactorPattern();
		ServerSocket server = new ServerSocket(1220);
		Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(server));

		long s = System.currentTimeMillis();
		for (;;) {
			if (System.currentTimeMillis() - s > 30000)
				break;
			Socket socket = server.accept();
			int i = socket.getInputStream().read();
			if (i != -1) {
				reactorPattern.getDemultiPlexer().accept(i);
			}			 
		}
		server.close();
	}
	public DemultiPlexer getDemultiPlexer() {
		return demultiPlexer;
	}
	
}
 

辅助类ShutdownHookThread,

class ShutdownHookThread extends Thread {
	private ServerSocket server;

	public ShutdownHookThread(ServerSocket server) {
		this.server = server;
	}

	@Override
	public void run() {
		try {
			server.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			server = null;
		}
	}
}

 

先执行ReactorPattern类,再执行Requester类。输出如下:

 

the request No.[7] is handling the requesting with resource [1]
the request No.[8] is handling the requesting with resource [2]
the request No.[7] processed the requesting with resource [1]
return resource: 1
the request No.[8] processed the requesting with resource [2]
return resource: 2
the request No.[5] is handling the requesting with resource [1]
the request No.[1] is handling the requesting with resource [2]
the request No.[5] processed the requesting with resource [1]
the request No.[1] processed the requesting with resource [2]
return resource: 1
return resource: 2
the request No.[9] is handling the requesting with resource [1]
the request No.[4] is handling the requesting with resource [2]
the request No.[4] processed the requesting with resource [2]
the request No.[9] processed the requesting with resource [1]
return resource: 1
return resource: 2
the request No.[0] is handling the requesting with resource [1]
the request No.[3] is handling the requesting with resource [2]
the request No.[0] processed the requesting with resource [1]
the request No.[3] processed the requesting with resource [2]
return resource: 1
return resource: 2
the request No.[2] is handling the requesting with resource [1]
the request No.[6] is handling the requesting with resource [2]
the request No.[2] processed the requesting with resource [1]
return resource: 1
the request No.[6] processed the requesting with resource [2]
return resource: 2
 

 

小结

 

Reactor模式在分发资源和归还资源的时候是同步的,在处理请求的时候,是异步的,实际使用中,真正的业务代码会被放到请求处理程序之中。

 

附件中有完整的代码共大家学习研究。

 

[原创内容,版权所有,如果转载,请注明出处]

分享到:
评论

相关推荐

    Reactor+指南中文版_2.01

    Reactor提供了一种避免这种阻塞的方式,通过使用发布-订阅模式(Publish-Subscribe pattern)或响应式编程模型,使得多个订阅者可以并发地处理事件,而无需锁定。 Reactor的几个关键组件包括: - **Flux**:用于处理...

    reactor-siemens.pdf

    "Reactor: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events." In _Pattern Languages of Program Design_, edited by Jim Coplien and Douglas C. Schmidt, ISBN...

    reactor-siemens

    Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients

    Network Pattern

    Reactor模式是一种事件驱动的设计模式,广泛应用于多线程和网络编程。它的核心思想是通过一个中心调度器(Reactor)来分发接收到的事件到相应的处理器。当网络事件(如套接字的读写就绪)发生时,Reactor会调用预先...

    reactor-io-pattern

    这就是io设计模式---reactor Reactor 是事件驱动的,并使用 os api(select、poll、epoll、kqueue、iocp 等)来调度 socket 事件... time_server.cpp 和 time_client.cpp 只是一个例子。 其他文件是实现React器文件。

    Spring 5 Design Pattern

    Spring 5是Spring框架的第五个主要版本,提供了对响应式编程模型WebFlux的支持,这一特性是通过与Project Reactor的集成实现的。Spring 5设计模式这本图书,作者是Dinesh Rajput,涵盖了在高效开发Spring 5应用程序...

    基于Java NIO反应器模式设计与实现

    反应器设计模式(Reactor Pattern)是事件驱动架构中的一种反应式编程模式,通常用于高并发的场景中。它主要用于处理多个事件的异步分发。在反应器模式中,有一个或多个输入源(例如,连接、数据流)和相应的事件...

    Scalable IO in Java -- Doug Lea

    本文档基于Doug Lea的作品,深入探讨了如何通过事件驱动处理以及反应器模式(Reactor pattern)来构建高效的IO处理系统。 首先,我们需要了解什么是可扩展网络服务。可扩展网络服务是指能够根据服务负载的变化,...

    A Tutorial Introduction to the ADAPTIVE Communication Environment (ACE)

    **Reactor** 是ACE中的核心设计模式之一,用于处理事件驱动的网络应用。 - **Reactor 组件 (REACTOR COMPONENTS)**:包括事件处理器、事件分发器等组件。 - **事件处理器 (Event Handlers)**:负责监听和处理网络...

    非阻塞式网络服务器 nio.pdf recator

    ##### Reactor Pattern 反应器模式 - **基本版本**:反应器模式的基本版本通常包含一个单一的线程负责监听和分发事件。 - **多线程版本**:为了进一步提高性能,可以采用多个线程的形式,其中主线程负责监听事件,...

    Scalable IO in Java.pdf

    文档接着介绍了“反应器模式”(Reactor pattern),这是一种广泛用于实现事件驱动模型的架构模式。它包括了基本版本以及多线程版本和其他变体。反应器模式的核心思想是有一个等待事件发生的调度器(通常称为“反应...

    Scalable IO in Java

    3. 反应器模式(Reactor pattern):这是一种用于构建可扩展IO服务的设计模式,它包括基本版本和多线程版本,以及其它变体。反应器模式中,一个或多个输入源被监视,当输入源准备好进行IO操作时,事件会被分发到相应...

    78程序员练级攻略(2018):异步IO模型和lock-Free编程1

    Reactor模式是处理大量并发连接的关键,它通过事件驱动的方式实现了非阻塞I/O,Understanding Reactor Pattern系列文章是深入理解这一模式的好资源。 Lock-Free编程是另一个关键领域,特别是在高并发和高性能系统中...

    高性能网络通讯

    - **Reactor Pattern**:为了解决上述问题,阿里巴巴采用了Reactor模式。在这种模式下,通过一个或多个线程监听所有连接上的I/O事件,并将这些事件分发给相应的处理程序,从而显著提高了系统的吞吐量和响应速度。 #...

    DougLeaNio.pdf

    内容涉及事件驱动处理(Event-driven processing)和反应器模式(Reactor pattern),以及多线程版本和其他变种。 2. **非阻塞IO API** - Java NIO的非阻塞操作 - 介绍了非阻塞IO的API,重点在于非阻塞模式下如何...

    Java Nio实现React堆线程模型-netty首要知识

    了解了React堆线程模型后,我们来看看`reactor-pattern-master`这个压缩包可能包含的内容。根据名称猜测,这可能是一个关于Reactor模式的示例项目或者教程资料。其中可能包括: - 代码示例:展示如何在Java中实现...

    ACE-tutorial

    - **Active Objects Pattern**: 一种设计模式,用于简化并发程序的设计。 - **定义**: Active Object模式允许对象异步执行操作,从而避免了同步调用带来的阻塞问题。 - **实现**: 通过使用`ACE_Task`类来实现...

    Node.js Design Patterns Second Edition[July 2016]

    It covers the Node.js ecosystem and its philosophy, a short introduction to Node.js version 6, ES2015, and the reactor pattern. Chapter 2, Node.js Essential Patterns, introduces the first steps ...

Global site tag (gtag.js) - Google Analytics