`
wuyuhou
  • 浏览: 13908 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

简单的SEDA框架实现

阅读更多
/**
 * 段容器接口定义
 *
 * @author wuyuhou
 *
 */
public interface IStagedContainer {
		
	//取得标识
	String getId();
	
	//发送事件
	void sendEvent(IEvent e);

	// 启动
	void start();
	
	// 停止
	void stop();
}

/**
 * 事件接口
 *
 * @author yourname (mailto:yourname@primeton.com)
 */
public interface IEvent {
	
	String getId();
	
	<T> T getData();
	
	void setData(Object data);
	
	Throwable getException();
	
	 void setException(Throwable exception);
}

/**
 * 事件处理接口
 *
 * @author yourname (mailto:yourname@primeton.com)
 */
public interface IEventHandler {
	
	/**
	 * 事件处理,原则上不允许抛出异常
	 * 
	 * @param event
	 */
	void handleEvent(IEvent event);
}

/**
 * 事件处理回调接口
 *
 * @author wuyuhou
 *
 */
public interface IEventCallback {
	/**
	 * 回掉处理
	 * 
	 * @param event
	 */
	void callback(IEvent event);
}

/**
 * 事件路由器
 *
 * @author wuyuhou
 *
 */
public interface IEventRouter {
	
	/**
	 * 路由处理
	 * 
	 * @param event
	 */
	void route(IEvent event);
}

/**
 * 段容器实现
 *
 * @author yourname (mailto:yourname@primeton.com)
 */
public class StagedContainer implements IStagedContainer {
	
	private static final ILogger log = DebugLoggerFactory.getLogger(StagedContainer.class);
	
	//唯一标识
	private String id = null;
	
	//执行器(线程池管理)
	private Executor executor = null;
	
	//事件队列(可以持久化实现)
	private IQueue<IEvent> queue = null;
	
	//事件处理者
	private IEventHandler eventHandler = null;
	
	//事件路由处理
	private IEventRouter eventRouter = null;
	
	//事件处理主线程
	private EventHandelMainThread eventHandelMainThread = null;	
	
	// 空闲间隔时间,默认一秒
	private int idleTime = 1000;

	private boolean isStarted = false;
	
	/**
	 * 
	 * 构造方法
	 *
	 */
	public StagedContainer(String id) {
		if (id == null || id.trim().length() == 0) {
			throw new IllegalArgumentException("StagedContainerId is null!");
		}
		this.id = id;
	}
	
	public String getId() {
		return id;
	} 

	protected Executor getExecutor() {		
		return executor;
	}

	protected IQueue<IEvent> getQueue() {
		return queue;
	}
	
	protected void setExecutor(Executor executor) {
		if (executor == null) {
			throw new IllegalArgumentException("executor is null!");
		}
		this.executor = executor;
	}

	protected void setQueue(IQueue<IEvent> queue) {
		if (queue == null) {
			throw new IllegalArgumentException("queue is null!");
		}
		this.queue = queue;
	}

	public IEventHandler getEventHandler() {
		return eventHandler;
	}

	public void setEventHandler(IEventHandler eventHandler) {
		if (eventHandler == null) {
			throw new IllegalArgumentException("eventHandler is null!");
		}
		this.eventHandler = eventHandler;
	}
	
	public IEventRouter getEventRouter() {
		return eventRouter;
	}

	public void setEventRouter(IEventRouter eventRouter) {
		if (eventRouter == null) {
			throw new IllegalArgumentException("eventRouter is null!");
		}
		this.eventRouter = eventRouter;
	}

	public int getIdleTime() {
		return idleTime;
	}

	public void setIdleTime(int idleTime) {
		if (idleTime <= 0) {
			throw new IllegalArgumentException("IdleTime is not less than zero!");
		}
		this.idleTime = idleTime;
	}

	// 发送事件
	public void sendEvent(IEvent e) {
		if (!isStarted) {
			throw new IllegalStateException("StagedContianer has not yet started!");
		}
		if (e == null) {
			throw new IllegalArgumentException("event is null!");
		}
		getQueue().offer(e);
	}
	
	public void start() {
		if (queue == null) {
			//默认没有持久化
			queue = new PersistenceQueue<IEvent>(3000, new DirPersistence<IEvent>("d:/test/queue"));
		}
		
		queue.start();
		
		if (executor == null) {
			//默认是10个线程的定长线程池
			executor = Executors.newFixedThreadPool(10, new ThreadFactoryWithName("StagedContainer:" + getId()));
		}
		
		eventHandelMainThread = new EventHandelMainThread(getId(), getQueue(), getExecutor(), 
				new IEventHandler() {
					public void handleEvent(IEvent event) {
						String eventId = event.getId();
						//取消事件处理
						if (CancelEventCache.containCancelEvent(eventId)) {
							CancelEventCache.removeCancelEvent(eventId);
							log.warn("Event[{0}] is cancel!", new Object[]{eventId});
							return;
						}
						
						try {
							//事件处理
							IEventHandler eventHandler = getEventHandler();
							if (eventHandler != null) {
								eventHandler.handleEvent(event);
							}
						} finally {
							//路由处理
							IEventRouter eventRouter = getEventRouter();
							if (eventRouter != null) {
								eventRouter.route(event);
							}
						}						
					}
			
		}, idleTime);
		eventHandelMainThread.start();
		isStarted = true;     
	}
	
	public void stop() {
		if (executor != null) {
			//停止线程执行
			if (executor instanceof ExecutorService) {
				ExecutorService es = (ExecutorService) executor;
				try {
					es.shutdownNow();
				} catch (Exception e) {
					try {
						es.shutdown();
					}catch (Exception ignore) {
						
					}
				}
			}
		}
		eventHandelMainThread.shutdownThread();
		
		if (queue != null) {
			queue.stop();
		}
		
		executor = null;	
		eventHandelMainThread = null;
		queue = null;
		isStarted = false; 
	}
	
	// 事件处理主线程
	static class EventHandelMainThread extends Thread {
		
		private boolean isShutdown = false;
		
		private IQueue<IEvent> eventQueue = null;
		private Executor executor = null;
		private IEventHandler eventHandler = null;
		private int idleTime;
		
		public EventHandelMainThread(String name, IQueue<IEvent> eventQueue, Executor executor, IEventHandler eventHandler, int idleTime) {
			super(name);
			this.eventQueue = eventQueue;
			this.executor = executor;
			this.eventHandler = eventHandler;
			this.idleTime = idleTime;
		}
		
		@Override
		public void run() {
			while (!isShutdown) {				
				final IEvent event = eventQueue.poll();
				//如果队列里没有事件
				if (event == null) {
					try {
						Thread.sleep(idleTime);
					} catch (InterruptedException e) {
					}
					continue;
				}
				executor.execute(new Runnable(){
					public void run() {
						eventHandler.handleEvent(event);
					}						
				});
			}
		}
		
		//关闭主线程
		public void shutdownThread() {
			isShutdown = true;
			this.interrupt();
		}		
	}
}

/**
 * 取消的事件缓存
 *
 * @author yourname (mailto:yourname@primeton.com)
 */
public class CancelEventCache {
	private static Object OBJECT = new Object();
	private static ConcurrentHashMap<String, Object> eventMap = new ConcurrentHashMap<String, Object>();	
	
	public static void addCancelEvent(String eventId) {
		if (eventId == null || eventId.trim().length() == 0) {
			return;
		}
		eventMap.put(eventId, OBJECT);
	}
	
	public static void removeCancelEvent(String eventId) {
		if (eventId == null || eventId.trim().length() == 0) {
			return;
		}
		eventMap.remove(eventId);
	}
	
	public static boolean containCancelEvent(String eventId) {
		if (eventId == null || eventId.trim().length() == 0) {
			return false;
		}
		return eventMap.containsKey(eventId);
	}
}

/**
 * Callback管理
 *
 * @author yourname (mailto:yourname@primeton.com)
 */
public class EventCallbackManager {
	private static ConcurrentHashMap<String, IEventCallback> conMap = new ConcurrentHashMap<String, IEventCallback>();	
	
	public static IEventCallback getEventCallback(String eventId) {
		if (eventId == null || eventId.trim().length() == 0) {
			return null;
		}
		IEventCallback callback = conMap.get(eventId);
		conMap.remove(eventId);
		return callback;
	}
	
	public static void register(String eventId, IEventCallback callback) {
		if (eventId == null || eventId.trim().length() == 0 || callback == null) {
			return;
		}
		conMap.put(eventId, callback);
	}
	
	public static void clear() {
		conMap.clear();
	}
}

/**
 * 可以指定名称的线程工厂
 *
 * @author yourname (mailto:yourname@primeton.com)
 */
public class ThreadFactoryWithName implements ThreadFactory {
	
	static final AtomicInteger poolNumber = new AtomicInteger(1);

	final ThreadGroup group;

	final AtomicInteger threadNumber = new AtomicInteger(1);

	final String namePrefix;
	
	final boolean isDaemon;

	public ThreadFactoryWithName(String name) {
		this(name, false);
	}
	
	public ThreadFactoryWithName(String name, boolean isDaemon) {
		SecurityManager s = System.getSecurityManager();
		group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
		namePrefix = name == null ? "Seda-Default" : name + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
		this.isDaemon = isDaemon;
	}

	public Thread newThread(Runnable r) {
		Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());		
		t.setDaemon(isDaemon);

		if (t.getPriority() != Thread.NORM_PRIORITY) {
			t.setPriority(Thread.NORM_PRIORITY);
		}
		return t;
	}
}
分享到:
评论
3 楼 wuyuhou 2012-04-12  
yuanyu5237 写道
楼主您好,请教一下,代码中的IQueue在哪儿?

在上一篇博客中
2 楼 yuanyu5237 2012-01-17  
找到了,呵呵,多谢楼主,这恐怕是我在网上找到的唯一一个比较简单可学习的seda框架代码
1 楼 yuanyu5237 2012-01-17  
楼主您好,请教一下,代码中的IQueue在哪儿?

相关推荐

    基于SEDA的异步框架设计与实现

    分析SEDA框架的源码可以帮助我们深入理解其实现细节,包括事件的调度机制、阶段间的通信方式以及并发控制策略的具体实现。 8. **工具支持**: 在实际开发中,可能会用到一些工具来辅助构建和调试基于SEDA的应用,...

    SEDA的企业服务总线的设计与实现

    SEDA模型的核心思想是将复杂的并发操作分解为一系列简单的、可串行化的阶段,每个阶段负责处理特定的任务,并通过事件触发机制来推进整个处理流程。 #### 三、SEDA模型应用于ESB的设计 ##### 3.1 需求分析 在高...

    基于SEDA的企业服务总线的设计与实现

    ### 基于SEDA的企业服务总线的设计与实现 #### 概述 本文主要探讨了如何通过采用阶段事件驱动架构(SEDA)来优化企业服务总线(ESB)的性能,特别是在高并发请求场景下。随着面向服务架构(SOA)在企业级软件开发...

    seda-release

    **SEDA(Staged Event Driven Architecture)框架详解** SEDA,全称为Staged Event Driven Architecture,是一种在高性能计算...对于需要处理大规模并发请求的现代互联网服务来说,理解和应用SEDA框架是非常有价值的。

    如何利用SEDA提高系统的性能和稳定性 开发可掌控高并发服经验分享:非阻塞+异步化+队列 共19页.pptx

    而Twitter的Finagle框架进一步发展了这一概念,通过提供灵活的、可扩展的服务架构,实现了更高效的并发处理。 SEDA的核心思想是将服务分解为多个独立的阶段,并在这些阶段之间使用队列作为缓冲。这样做的好处包括:...

    MuleEsb开源框架简介

    6. **高度可伸缩性**:基于SEDA(Staged Event-Driven Architecture)模型,实现高性能和高可扩展性。 7. **基于EIP的路由机制**:采用企业集成模式(EIP),实现复杂的消息路由和处理。 #### 二、Mule ESB的整体...

    aggregate-framework:Aggregate Framework是为方便开发人员运用DDD和CQRS思想来构建复杂的、可扩展的Java企业应用系统而提供的Java技术框架。该框架提供了Aggregate、Repository、Domain Event等构建块的实现;使用DomainEvent,借助于内建的Disruptor组件,AggregateFramework可使开发人员方便的实现高性能SEDA架构。此外,该框架支持与Spring集成,提供使用 annotation的方式让开发人员方便地

    该框架提供了最核心的构建块的实现,比如Aggregate、Repository和Event。此外,该框架支持与Spring集成,提供使用annotation的方式让开发人员方便地注册事件及定义事件处理,使用Spring事务管理器管理事务时,支持...

    Wake:Wake是一个事件驱动的框架,基于SEDA,Click,Akka和Rx的思想

    唤醒Wake是一个事件驱动的框架,基于SEDA,Click,Akka和Rx的思想。 从某种意义上说,它是通用的,旨在支持计算密集型应用程序以及高性能网络,存储和旧版I / O系统。 我们实现了Wake以支持高性能,可扩展的分析处理...

    一种基于Netty框架的网络应用服务器设计方法

    该框架利用Java NIO来实现Reactor模式,这种模式通过同步等待多个I/O事件的发生,然后通过多路复用机制将这些事件分发给相应的处理线程,从而实现了高效的I/O操作。 - **Reactor模型**:Netty采用的Reactor模型能够...

    MuleEsb开源框架简介.pdf

    Mule ESB的处理模型基于SEDA(Staged Event-Driven Architecture),这使得它具有高度的可伸缩性,适合处理大规模的并发事件。同时,Mule ESB拥有强大的基于Enterprise Integration Patterns(EIP)的路由机制,能够...

    (源码)基于C++的MiniOB数据库系统.zip

    4. 基础功能实现包括INI文件配置解析、IO操作、互斥锁管理、字符串处理、日志系统、MD5加密、随机数生成、正则表达式匹配、指标报告、内存池管理、路径处理、PID文件管理、进程管理、信号处理、SEDA框架、定时器管理...

    ESB解决方案-mule分享.docx

    目前市场上有多种 ESB 产品,包括 Oracle 的商业 ESB 产品、Progress 的商业 ESB 产品、TIBCO 的商业 ESB 产品、Mule 的开源 ESB 框架、WSO2 的开源 ESB 框架等。这些产品都提供了基本的 ESB 功能,包括消息传递、...

    ESB学习总结

    1. 它是面向服务的框架实现,与操作系统和编程语言无关。 2. 使用XML作为通信标准,支持Web服务标准。 3. 支持多种消息传递模式,如同步、异步、点对点和发布-订阅。 4. 包含适配器来集成传统系统。 5. 提供服务编制...

    mule esb 的简单介绍

    7. **基于SEDA(Staged Event-Driven Architecture)的处理模型**:这使得Mule能够处理高并发和大规模的事务,确保系统的可伸缩性。 8. **基于EIP(Enterprise Integration Patterns)的事件路由**:Mule ESB利用...

    ESB学习总结宣贯.pdf

    Mule是一个开源的ESB框架,它采用SEDA(分级事件驱动架构)模式,将请求处理过程划分为多个阶段,实现资源的有效分配和异步通信。Mule ESB提供了一个服务注册在总线上的模式,服务之间相互独立,只关注自身处理的...

    Spring Boot

    1. **外围方案缺失**:Spring Boot本身并不包含服务注册与发现、负载均衡等功能,这些通常需要借助其他工具或框架(如Eureka、Zuul等)来实现。 2. **安全策略不完善**:对于安全性要求较高的场景,可能需要额外的...

    openfire综合介绍

    - **线程模型**:MINA提供了灵活的线程模型,可以根据应用场景的需求选择单线程模式、线程池模式或多个线程池模式(SEDA)。 - **SSL支持**:MINA内置了SSL/TLS/StartTLS支持,简化了安全通信的实现过程。 - **管理...

    MULE IN ACTION

    Mule的企业集成模式(EIP)库允许开发者通过简单的组件组合来实现复杂的企业集成。 在部署方面,Mule支持热部署和热重载,这意味着可以不中断服务而更新配置和部署新代码。日志记录和异常处理也是Mule框架的重要组成...

    openfire二次开发资料整理

    - **线程模型**:支持单线程、线程池及基于阶段驱动的多线程池(SEDA),可根据实际应用场景选择最适合的模式。 - **安全支持**:内置SSL/TLS/StartTLS支持,简化安全通信的实现过程。 - **测试友好**:提供模拟对象...

Global site tag (gtag.js) - Google Analytics