概述
最近研究了ESB,重点研究了Mule的实现。现在分享一下学习的结果,也备自己查看。
Mule的简介就不多说了,有社区版和企业版两个版本,我们自己研究就是用社区版了,重点关注的是Mule的控制台实现。
前期准备
本文介绍知识需要基于对Mule和ESB有基本的认知,关于Mule的基本实现,以及源码的解读不在本文范畴,如果有需要,另外开一篇博客介绍。
目标
在准备将Mule移植到我们的项目上时,发现了一个问题,就是控制台的实现,具体的是对每个服务的起停操作,流量统计/清空,log记录等功能实现。
其中log功能可以使用Mule自带的Log组件实现,给特殊的功能可以使用filter来实现,本文不再赘述。
流量统计不管是写表还是记在文件中,或者开一片内存区域都可以实现,本文不再赘述。
下文重点实现对具体的服务控制的实现。
实现步骤
一 服务是什么
Mule中对服务的定义最小单元是FlowConstruct,Mule中可以对FlowConstruct进行编排,形成一套完整的服务。所以如果我们需要对具体的服务进行控制,就需要定位到具体的哪个FlowConstruct。
在看FlowConstruct之前,我们先回顾一下Mule的几大对象:
1 FlowConstruct: 就是我们说的最小的服务定义单元
2 Model:在 FlowConstruct之前,定义最小的服务模块,现在已经很少用
3 Agent:代理,用来代理服务
4 Connector:各种协议的连接对象抽象,是我们服务定义的底层实现者
5 QueueManager:内置的通知管理器,备用JMS
6 Stoppable/Startable类:其他实现这两个接口的资源信息
FlowConstruct是一个接口,我们来看看他的接口定义:
public interface FlowConstruct extends NamedObject, LifecycleStateEnabled
{
/**
* @return The exception listener that will be used to handle exceptions that may be thrown at different
* points during the message flow defined by this construct.
*/
MessagingExceptionHandler getExceptionListener();
/**
* @return The statistics holder used by this flow construct to keep track of its activity.
*/
FlowConstructStatistics getStatistics();
/**
* @return This implementation of {@link MessageInfoMapping} used to control how Important message
* information is pulled from the current message.
*/
MessageInfoMapping getMessageInfoMapping();
/**
* @return This muleContext that this flow construct belongs to and runs in the context of.
*/
MuleContext getMuleContext();
}
既然是接口,我们操作的肯定是他的实现类,他有哪些实现类呢?(怎么传图片到这里?)
直接说了,我们常用的是实现类Flow:
public Flow extends AbstractPipeline implements MessageProcessor, StageNameSourceProvider, DynamicPipeline
public abstract class AbstractPipeline extends AbstractFlowConstruct implements Pipeline
public abstract class AbstractFlowConstruct implements FlowConstruct, Lifecycle, AnnotatedObject
public interface FlowConstruct extends NamedObject, LifecycleStateEnabled
不知道这样看官能否了解这个继承结构。
二 误区1 从服务启动入手
想要启动或者停止某个单独的服务,第一反应是想到,看看框架是怎么在启动的时候加载这些服务的,看明白启动的步骤和逻辑,应该就能知道怎么实现个别服务的启动和停止了。
服务启动逻辑/停止逻辑
MuleContextLifecycleManager muleContext.getRegistry().fireLifecycle(phaseName);
MuleRegistryHelper fireLifecycle(String phase);
DefaultRegistryBroker(AbstractRegistryBroker fireLifecycle(String phase));
RegistryBrokerLifecycleManager(RegistryLifecycleManager)
fireLifecycle(String destinationPhase);
invokePhase(String phase, Object object, LifecycleCallback callback);
内部类:RegistryLifecycleCallback onTransition()
Collection<?> targetsObj = getLifecycleObject().lookupObjectsForLifecycle(lo.getType());
这里的lookupObjectsForLifecycle(lo.getType())方法会把Mule注册的所有类型都遍历一边,根据上面类聚的几大类对象,根据类型找到以后再次遍历。
最后会在查找到的所有资源中循环调用 phase.applyLifecycle(o);方法。通过这一连串的代码跳转,相信看官也跟我一样云里雾里的找不清方向,我具体解释一下Stop的逻辑,start的逻辑跟他是一样的,得益于状态模式。
Stop逻辑
首先查找到的是FlowConstruct类型的对象:
Stop的时候,实际上调用的是AbstractFlowConstruct.stop();方法,具体的stop方法被其子类AbstractPipeline复写了。
1 首先stop message source:
在DefaultInboundEndpoint. stop()中, getConnector().unregisterListener(this, flowConstruct);,即已经将对象中的connector上的监听解除了。具体方法在 AbstractConnector.unregisterListener()中。
2 Stop MessageProcessor:
从上面的结构中可以看出来,processor有很多个,只要实现了Stoppable接口的,逐个调用Stop()方法。
3 Stop MessageProcessorPollingMessageReceiver,基本没有polling的。
4 回到pipeline 类中,调用stopIfStoppable(pipeline):
实际上是stop InterceptingChainLifecycleWrapper。同样从上面的对象中可以看到,wrapper有很多个,只要实现stoppable接口的都会调用,重点关注CxfInboundMessageProcessor对象的stop,因为他代理了org.apache.cxf.endpoint.ServerImpl,实际上是调用server的stop方法。至此,CXF实际上已经停止了,对于其他类型的协议,到这里也应该就停止了。
5 回到pipeline 类中,调用doStop方法:可惜是个空方法,我操!
6 回到外围基类时,调用DefaultMessagingExceptionStrategy的stop。结束后完成所有stop操作。
其次查找到connector类型的对象:
根据前面的分析,在停止Flow的时候,实际上已经停了相应的connector,那这里的停止是做什么事情呢?
同样,先来看看connector对象的结构,从分析flow 中可以看出,这个对象异常复杂,因为他需要封装所有的连接协议,我们看看他的类继承关系图吧:(不好意思,不会贴图)
正式停止connector
停止从AbstractConnector.stop()中开始,在onTransition回调函数中:
1 shutdownScheduler():
调用ScheduledThreadPoolExecutor.stop to Disable new tasks from being submitted。等一段时间后,彻底关闭ScheduleExecutor shutdownNow。
2 doStop():
到子类HttpConnector中HttpConnectionManager. Dispose();清除socket数据,并且调用MuleWorkManager.dispose();
MuleWorkManager 中关闭ThreadPoolExecutor.shutdown(); to Disable new tasks from being submitted,然后等一段时间后彻底关闭Executor:workExecutorService.shutdownNow()。
3 disposeWorkManagers:
清除dispatcherWorkManager,requesterWorkManager,receiverWorkManager中的所有workManager。
4 清除transport中的Dispatchers和Requesters
DefaultConfigurableKeyedObjectPool和GenericKeyedObjectPool
接着查找所有实现stoppable接口的资源:
前面已经提到很多资源使用stoppable接口停止了,比如MessageProcessor,InterceptingChainLifecycleWrapper等,在最后,Mule会把所以资源统一梳理一遍,再做一次Dispose。
1 NotificationLifecycleObject
DefaultInboundEndpoint:在flow中已经stop过endpoint了吗?答曰,正解。在stop message source的时候已经通过endpoint找到connector stop了所有的endpoint 监听了。看看这里做的是什么操作。直接忽略了,我操!
DefaultProcessingTimeWatcher:清除守护线程?watcherThread:ProcessingTimeChecker
申明:忽略了两个选项,一个是MessageQ,还有一个是modle(已经基本不用了):
至此,stop的操作就结束了,这个stop不是将将框架停止,不是MuleServer的stop,只是停止了所有的服务,没有调用框架的Dispose方法,所以属于轻量的停止。
重启所有的服务,基本上是按照stop的流程,将所有的资源重新启动一遍,这里就可以看出来状态模式的好处,基本 不用太多的特殊处理。
问题:可否实现单个服务的起停?
单个服务是什么?FlowConstruct。与之对应的是connector,如果能做到将FlowConstruct和对应的connector起停了,是否可以实现对单个服务的起停?从Mule的框架来看是不具备这样的设计的,他的所有通知都是基于所有的类型。
结论:看了这么多代码,其实发现还是不能实现从FlowConstruct找到对应的所有资源,比如connector,还有workManager等等。
三 误区2 从registry入手
先解释一下registry的定义:registry是注册管理器,Mule用来注册所有的资源,Mule在context中通过registry broker来维护spring registry,transient registry,guice registry。
spring registry的功能是维护通过spring注入的所有实例;
transient registry的功能是维护默认的处理函数链表_muleContextProcessor,_muleExpressionEvaluatorProcessor,_muleExpressionEnricherProcessor,_muleLifecycleStateInjectorProcessor,_muleLifecycleManager
还有mule配置注入的connector,endpoint等对象,对象在创建时会主动注册其监听的事件。
guice registry维护通过guice框架注入的对象。
mule访问registry中具体对象的流程为:
muleContext->muleRegistry(muleRegistryHelper)-->DefaultRegistryBroker-->registry array--> registry --> object。
目前在spring启动的方式下,就包括了spring 和transient 两个registry。
在框架refresh之后,调用firelifecycle,进入RegistryLifecycleManager,对启动阶段的模块调用start进行初始化操作。
从这里分析,我们是否可以从registry入手来移除注册的服务呢?这个看似可行的逻辑却犯了原则行的错误。
很想把registryBroker和几个manager的类图贴上来,很清晰的表达了几个对象之间的关系,很抱歉,大家自己去看类图,这里大家看一下框架是怎么实现注册的。
public void registerObject(String key, Object value, Object metadata) throws RegistrationException
{
Iterator it = getRegistries().iterator();
Registry reg;
while (it.hasNext())
{
reg = (Registry) it.next();//第一次取到的是TransientRegistry
if (!reg.isReadOnly())
{
reg.registerObject(key, value, metadata);
break;
}
}
}
最终的结果就final Object previousObject = registry.put(key, object);
对于注册机制的几点疑问:
1 注册transformer时默认将所有transformer都注册了,一共31个。是否可以简化之?
2 注册registerObjects时默认注册了所有的Parser,约20个,是否可以简化之?
根据上面解释的,spring启动的具体的对象都是注册在spring registry中,问题就处在:
TransientRegistry底层就是维护一个map,可以通过查找的方式unregistry。
SpringRegistry底层是通过application查找维护的bean对象的。
但是只读的,不可以通过lookup的方式查找之后去unregistry,他妈的是只读的!
总结:好了,到这里可以看出来这条路是走不下去的,根本问题是registry注册的是框架上的注册,而我们要起停的服务,不是从框架上将其移除,只是为了暂停运行而已,说白了,可能就是把connector关了,相关资源释放了,然后可以重启就OK了!
四 stopMessageProcessing的启发
Mule ESB的消息在流服务中传递时如果发生异常,Mule ESB服务器会将异常信息输出到本地日志文件中,如果希望捕获该异常进行处理,可以在Flow中编排异常策。
在异常策略节点内可编排一个出口端点,Mule ESB将把异常错误信息以数据传给该出口端点(数据类型为org.mule.message.ExceptionMessage)。缺省的异常处理策略节点具有以下两个属性:
1 enableNotifications 默认为true,是否触发异常处理策略
2
stopMessageProcessing 默认为flase,是否停止flow服务,如果为true则流程服务将被停止,服务器重启前不能再相应请求。
我操啊,这个正是我想要的结果,这个尼玛的踏破铁鞋无觅处啊!来看看源码中的实现:
protected void stopFlow(FlowConstruct flow)
{
if (flow instanceof Stoppable)
{
logger.info("Stopping flow '" + flow.getName() + "' due to exception");
try
{
((Lifecycle) flow).stop();
}
catch (MuleException e)
{
logger.error("Unable to stop flow '" + flow.getName() + "'", e);
}
}
else
{
logger.warn("Flow is not stoppable");
}
}
这样就简单了,不用细说了,自己把代码挖过来,直接实现了这个效果。
Mule中有很多这样风格的代码:
if (flow [color=red]instanceof Stoppable[/color])
{
try
{
([color=red](Lifecycle)[/color] flow).stop();
}
catch (MuleException e)
{
...
}
}
值得学习一下。
五 总结
至此把mule的源码翻了一遍,比较奇怪的是,为什么Mule不提供一个这样的控制台效果的功能,另外对于Mule的源码还需要继续挖,还有不少有疑问的地方,今天就不写了,已经很长了。
学会插图片了,以后再改吧!
- 大小: 47.3 KB
分享到:
相关推荐
《深入解析Mule ESB源码》 Mule ESB(Enterprise Service Bus,企业服务总线)是一款开源的集成平台,旨在简化企业级应用之间的数据交互。本文将围绕Mule ESB的源码进行深入探讨,揭示其核心设计理念与工作原理。 ...
Mule是一个基于Java的集成平台,提供了一个灵活、可扩展的架构来集成各种应用程序和系统。在本文档中,我们将详细介绍如何从Mule的源代码中编译出Eclipse项目,并将其发布到服务器上。 Mule源码编译 首先,我们...
2. **流处理**:Mule ESB基于“数据流”的设计理念,通过定义数据在各个组件之间的流动路径,实现数据的高效处理。 3. **预建连接器**:Mule提供了大量预建的连接器,可以轻松连接到各种外部系统,如数据库、Web服务...
随着对Mule ESB的深入学习,你可以利用它的强大功能,实现更复杂的企业级集成方案,如数据转换、错误处理、安全策略等,以满足各种业务需求。Mule ESB的灵活性和易用性使其成为集成解决方案的理想选择,尤其适合处理...
Mule 是一个基于ESB架构理念的消息平台。Mule 的核心是一个基于SEDA的服务容器,该容器管理被称为通用消息对象(Universal Message Objects /UMO)的服务对象,而这些对象都是POJO。所有UMO和其他应用之间的通信都是...
《Mule ESB Cookbook随书源码》是一个与Mule ESB相关的实践指南,它包含了大量实例代码,旨在帮助读者深入理解和应用Mule ESB这一开源企业服务总线(Enterprise Service Bus)。Mule ESB是业界广泛采用的ESB解决方案...
"Mule实现路由分发"这一主题涉及到Mule如何处理消息的传递和定向,这在分布式系统和微服务架构中是至关重要的。 路由分发是Mule的核心功能之一,它确保消息被正确地发送到目标系统或服务。通过Java代码实现路由分发...
1. **Mule ESB简介**:Mule ESB是一个基于Java的轻量级服务总线,它提供了一种简单的方法来连接各种系统、应用和云服务。其设计目标是简化企业应用之间的数据交换和流程整合,支持多种协议和数据格式。 2. **核心...
这将有助于你在开发过程中解决更复杂的问题,或者利用Mule2的强大功能实现自定义需求。 在进行源代码编译时,不断学习和实践是关键。你可以参考Mule2的官方文档、社区论坛和博客(如文中提到的ITEYE博客)来获取更...
在IT行业中,构建高效、可扩展的企业级应用是至关重要的,而Mule ESB(企业服务总线)和Apache CXF则是实现这一目标的两大关键工具。本文将深入探讨如何利用Mule服务总线代理Apache CXF服务源码,帮助开发者更好地...
Mule基于Enterprise Service Bus(ESB)架构思想。ESB的主要特性是通过扮演一个中转系统的角色,允许不同的应用系统交互,中转系统在内网或Internet上的应用系统间搬运数据。 目前市场上有一些商业的ESB实现。尽管...
在Mule项目中,需要配置stdioconnector,以便在控制台中输入和输出信息。同时,需要配置vmconnector,以便在虚拟机中执行Mule项目。 在部署Mule项目时,需要将项目打包成一个zip文件,并将其部署到Mule服务器上。在...
在IT行业中,Mule ESB(企业服务总线)是一个流行的开源集成平台,用于构建连接应用程序和服务的系统。本文将详细介绍如何在Apache Tomcat服务器上部署Mule项目,这是一个常见的需求,因为Tomcat是一个广泛使用的轻...
Mule ESB 是一个轻量级的基于java的企业服务总线和集成平台, 使得开发人员可以快速,简单的连接多个应用, 使得它们可以交换数据。 Mule ESB 容易集成现有异构系统,包括:JMS, Web Services, JDBC, HTTP, 等. ESB...
Mule ESB基于Java,但它不是传统的Java EE应用服务器,而是采用了更轻量级的Java运行时环境,如Java Management Extensions (JMX) 和Java Naming and Directory Interface (JNDI)。这种设计使得Mule可以在较低的硬件...
1. **MULE ESB架构**:MULE ESB基于事件驱动的架构,允许异步处理和高吞吐量。它支持多种协议和数据格式,如HTTP、JMS、FTP等,以及XML、JSON等数据交换格式。 2. **AnyPoint Studio**:这是Mule应用的主要开发工具...
**ESB(企业服务总线)** ...总之,"esb-mule系统设计"的学习资料涵盖了ESB的基本概念、MULE ESB的特性和使用、SOA架构的优势,以及MULE与SPRING的整合。通过深入学习和实践,你将能够构建高效的企业服务集成解决方案。