/**
* 段容器接口定义
*
* @author wuyuhou
*
*/
public interface IStagedContainer {
//取得标识
String getId();
//发送事件
void sendEvent(IEvent e);
// 启动
void start();
// 停止
void stop();
}
/**
* 事件接口
*
* @author yourname (mailto:yourname@primeton.com)
*/
public interface IEvent {
String getId();
<T> T getData();
void setData(Object data);
Throwable getException();
void setException(Throwable exception);
}
/**
* 事件处理接口
*
* @author yourname (mailto:yourname@primeton.com)
*/
public interface IEventHandler {
/**
* 事件处理,原则上不允许抛出异常
*
* @param event
*/
void handleEvent(IEvent event);
}
/**
* 事件处理回调接口
*
* @author wuyuhou
*
*/
public interface IEventCallback {
/**
* 回掉处理
*
* @param event
*/
void callback(IEvent event);
}
/**
* 事件路由器
*
* @author wuyuhou
*
*/
public interface IEventRouter {
/**
* 路由处理
*
* @param event
*/
void route(IEvent event);
}
/**
* 段容器实现
*
* @author yourname (mailto:yourname@primeton.com)
*/
public class StagedContainer implements IStagedContainer {
private static final ILogger log = DebugLoggerFactory.getLogger(StagedContainer.class);
//唯一标识
private String id = null;
//执行器(线程池管理)
private Executor executor = null;
//事件队列(可以持久化实现)
private IQueue<IEvent> queue = null;
//事件处理者
private IEventHandler eventHandler = null;
//事件路由处理
private IEventRouter eventRouter = null;
//事件处理主线程
private EventHandelMainThread eventHandelMainThread = null;
// 空闲间隔时间,默认一秒
private int idleTime = 1000;
private boolean isStarted = false;
/**
*
* 构造方法
*
*/
public StagedContainer(String id) {
if (id == null || id.trim().length() == 0) {
throw new IllegalArgumentException("StagedContainerId is null!");
}
this.id = id;
}
public String getId() {
return id;
}
protected Executor getExecutor() {
return executor;
}
protected IQueue<IEvent> getQueue() {
return queue;
}
protected void setExecutor(Executor executor) {
if (executor == null) {
throw new IllegalArgumentException("executor is null!");
}
this.executor = executor;
}
protected void setQueue(IQueue<IEvent> queue) {
if (queue == null) {
throw new IllegalArgumentException("queue is null!");
}
this.queue = queue;
}
public IEventHandler getEventHandler() {
return eventHandler;
}
public void setEventHandler(IEventHandler eventHandler) {
if (eventHandler == null) {
throw new IllegalArgumentException("eventHandler is null!");
}
this.eventHandler = eventHandler;
}
public IEventRouter getEventRouter() {
return eventRouter;
}
public void setEventRouter(IEventRouter eventRouter) {
if (eventRouter == null) {
throw new IllegalArgumentException("eventRouter is null!");
}
this.eventRouter = eventRouter;
}
public int getIdleTime() {
return idleTime;
}
public void setIdleTime(int idleTime) {
if (idleTime <= 0) {
throw new IllegalArgumentException("IdleTime is not less than zero!");
}
this.idleTime = idleTime;
}
// 发送事件
public void sendEvent(IEvent e) {
if (!isStarted) {
throw new IllegalStateException("StagedContianer has not yet started!");
}
if (e == null) {
throw new IllegalArgumentException("event is null!");
}
getQueue().offer(e);
}
public void start() {
if (queue == null) {
//默认没有持久化
queue = new PersistenceQueue<IEvent>(3000, new DirPersistence<IEvent>("d:/test/queue"));
}
queue.start();
if (executor == null) {
//默认是10个线程的定长线程池
executor = Executors.newFixedThreadPool(10, new ThreadFactoryWithName("StagedContainer:" + getId()));
}
eventHandelMainThread = new EventHandelMainThread(getId(), getQueue(), getExecutor(),
new IEventHandler() {
public void handleEvent(IEvent event) {
String eventId = event.getId();
//取消事件处理
if (CancelEventCache.containCancelEvent(eventId)) {
CancelEventCache.removeCancelEvent(eventId);
log.warn("Event[{0}] is cancel!", new Object[]{eventId});
return;
}
try {
//事件处理
IEventHandler eventHandler = getEventHandler();
if (eventHandler != null) {
eventHandler.handleEvent(event);
}
} finally {
//路由处理
IEventRouter eventRouter = getEventRouter();
if (eventRouter != null) {
eventRouter.route(event);
}
}
}
}, idleTime);
eventHandelMainThread.start();
isStarted = true;
}
public void stop() {
if (executor != null) {
//停止线程执行
if (executor instanceof ExecutorService) {
ExecutorService es = (ExecutorService) executor;
try {
es.shutdownNow();
} catch (Exception e) {
try {
es.shutdown();
}catch (Exception ignore) {
}
}
}
}
eventHandelMainThread.shutdownThread();
if (queue != null) {
queue.stop();
}
executor = null;
eventHandelMainThread = null;
queue = null;
isStarted = false;
}
// 事件处理主线程
static class EventHandelMainThread extends Thread {
private boolean isShutdown = false;
private IQueue<IEvent> eventQueue = null;
private Executor executor = null;
private IEventHandler eventHandler = null;
private int idleTime;
public EventHandelMainThread(String name, IQueue<IEvent> eventQueue, Executor executor, IEventHandler eventHandler, int idleTime) {
super(name);
this.eventQueue = eventQueue;
this.executor = executor;
this.eventHandler = eventHandler;
this.idleTime = idleTime;
}
@Override
public void run() {
while (!isShutdown) {
final IEvent event = eventQueue.poll();
//如果队列里没有事件
if (event == null) {
try {
Thread.sleep(idleTime);
} catch (InterruptedException e) {
}
continue;
}
executor.execute(new Runnable(){
public void run() {
eventHandler.handleEvent(event);
}
});
}
}
//关闭主线程
public void shutdownThread() {
isShutdown = true;
this.interrupt();
}
}
}
/**
* 取消的事件缓存
*
* @author yourname (mailto:yourname@primeton.com)
*/
public class CancelEventCache {
private static Object OBJECT = new Object();
private static ConcurrentHashMap<String, Object> eventMap = new ConcurrentHashMap<String, Object>();
public static void addCancelEvent(String eventId) {
if (eventId == null || eventId.trim().length() == 0) {
return;
}
eventMap.put(eventId, OBJECT);
}
public static void removeCancelEvent(String eventId) {
if (eventId == null || eventId.trim().length() == 0) {
return;
}
eventMap.remove(eventId);
}
public static boolean containCancelEvent(String eventId) {
if (eventId == null || eventId.trim().length() == 0) {
return false;
}
return eventMap.containsKey(eventId);
}
}
/**
* Callback管理
*
* @author yourname (mailto:yourname@primeton.com)
*/
public class EventCallbackManager {
private static ConcurrentHashMap<String, IEventCallback> conMap = new ConcurrentHashMap<String, IEventCallback>();
public static IEventCallback getEventCallback(String eventId) {
if (eventId == null || eventId.trim().length() == 0) {
return null;
}
IEventCallback callback = conMap.get(eventId);
conMap.remove(eventId);
return callback;
}
public static void register(String eventId, IEventCallback callback) {
if (eventId == null || eventId.trim().length() == 0 || callback == null) {
return;
}
conMap.put(eventId, callback);
}
public static void clear() {
conMap.clear();
}
}
/**
* 可以指定名称的线程工厂
*
* @author yourname (mailto:yourname@primeton.com)
*/
public class ThreadFactoryWithName implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
final boolean isDaemon;
public ThreadFactoryWithName(String name) {
this(name, false);
}
public ThreadFactoryWithName(String name, boolean isDaemon) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name == null ? "Seda-Default" : name + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
this.isDaemon = isDaemon;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(isDaemon);
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
分享到:
相关推荐
分析SEDA框架的源码可以帮助我们深入理解其实现细节,包括事件的调度机制、阶段间的通信方式以及并发控制策略的具体实现。 8. **工具支持**: 在实际开发中,可能会用到一些工具来辅助构建和调试基于SEDA的应用,...
SEDA模型的核心思想是将复杂的并发操作分解为一系列简单的、可串行化的阶段,每个阶段负责处理特定的任务,并通过事件触发机制来推进整个处理流程。 #### 三、SEDA模型应用于ESB的设计 ##### 3.1 需求分析 在高...
### 基于SEDA的企业服务总线的设计与实现 #### 概述 本文主要探讨了如何通过采用阶段事件驱动架构(SEDA)来优化企业服务总线(ESB)的性能,特别是在高并发请求场景下。随着面向服务架构(SOA)在企业级软件开发...
**SEDA(Staged Event Driven Architecture)框架详解** SEDA,全称为Staged Event Driven Architecture,是一种在高性能计算...对于需要处理大规模并发请求的现代互联网服务来说,理解和应用SEDA框架是非常有价值的。
而Twitter的Finagle框架进一步发展了这一概念,通过提供灵活的、可扩展的服务架构,实现了更高效的并发处理。 SEDA的核心思想是将服务分解为多个独立的阶段,并在这些阶段之间使用队列作为缓冲。这样做的好处包括:...
6. **高度可伸缩性**:基于SEDA(Staged Event-Driven Architecture)模型,实现高性能和高可扩展性。 7. **基于EIP的路由机制**:采用企业集成模式(EIP),实现复杂的消息路由和处理。 #### 二、Mule ESB的整体...
该框架提供了最核心的构建块的实现,比如Aggregate、Repository和Event。此外,该框架支持与Spring集成,提供使用annotation的方式让开发人员方便地注册事件及定义事件处理,使用Spring事务管理器管理事务时,支持...
唤醒Wake是一个事件驱动的框架,基于SEDA,Click,Akka和Rx的思想。 从某种意义上说,它是通用的,旨在支持计算密集型应用程序以及高性能网络,存储和旧版I / O系统。 我们实现了Wake以支持高性能,可扩展的分析处理...
该框架利用Java NIO来实现Reactor模式,这种模式通过同步等待多个I/O事件的发生,然后通过多路复用机制将这些事件分发给相应的处理线程,从而实现了高效的I/O操作。 - **Reactor模型**:Netty采用的Reactor模型能够...
Mule ESB的处理模型基于SEDA(Staged Event-Driven Architecture),这使得它具有高度的可伸缩性,适合处理大规模的并发事件。同时,Mule ESB拥有强大的基于Enterprise Integration Patterns(EIP)的路由机制,能够...
4. 基础功能实现包括INI文件配置解析、IO操作、互斥锁管理、字符串处理、日志系统、MD5加密、随机数生成、正则表达式匹配、指标报告、内存池管理、路径处理、PID文件管理、进程管理、信号处理、SEDA框架、定时器管理...
目前市场上有多种 ESB 产品,包括 Oracle 的商业 ESB 产品、Progress 的商业 ESB 产品、TIBCO 的商业 ESB 产品、Mule 的开源 ESB 框架、WSO2 的开源 ESB 框架等。这些产品都提供了基本的 ESB 功能,包括消息传递、...
1. 它是面向服务的框架实现,与操作系统和编程语言无关。 2. 使用XML作为通信标准,支持Web服务标准。 3. 支持多种消息传递模式,如同步、异步、点对点和发布-订阅。 4. 包含适配器来集成传统系统。 5. 提供服务编制...
7. **基于SEDA(Staged Event-Driven Architecture)的处理模型**:这使得Mule能够处理高并发和大规模的事务,确保系统的可伸缩性。 8. **基于EIP(Enterprise Integration Patterns)的事件路由**:Mule ESB利用...
Mule是一个开源的ESB框架,它采用SEDA(分级事件驱动架构)模式,将请求处理过程划分为多个阶段,实现资源的有效分配和异步通信。Mule ESB提供了一个服务注册在总线上的模式,服务之间相互独立,只关注自身处理的...
1. **外围方案缺失**:Spring Boot本身并不包含服务注册与发现、负载均衡等功能,这些通常需要借助其他工具或框架(如Eureka、Zuul等)来实现。 2. **安全策略不完善**:对于安全性要求较高的场景,可能需要额外的...
- **线程模型**:MINA提供了灵活的线程模型,可以根据应用场景的需求选择单线程模式、线程池模式或多个线程池模式(SEDA)。 - **SSL支持**:MINA内置了SSL/TLS/StartTLS支持,简化了安全通信的实现过程。 - **管理...
Mule的企业集成模式(EIP)库允许开发者通过简单的组件组合来实现复杂的企业集成。 在部署方面,Mule支持热部署和热重载,这意味着可以不中断服务而更新配置和部署新代码。日志记录和异常处理也是Mule框架的重要组成...
- **线程模型**:支持单线程、线程池及基于阶段驱动的多线程池(SEDA),可根据实际应用场景选择最适合的模式。 - **安全支持**:内置SSL/TLS/StartTLS支持,简化安全通信的实现过程。 - **测试友好**:提供模拟对象...