本文的目的是在 broker 端实现消息的路由分发,通俗点讲就是,根据消息的特征将消息分发到不同的 queue 或者 topic 上。要实现消息路由,最简单的方式是在 activemq 提供的 xml 配置文件下面构建路由规则。
所使用的版本:
- ActiveMQ 5.6.0
- Camel 2.9.2
在 ActiveMQ 的每个发行版的 conf 目录下包含了很多的示例 xml 配置文件,它们是对 ActiveMQ 重要 feature 的配置举例,比如使用jdbc作持久化机制时针对不同数据库的配置、如何进行权限配置、如何配置 network broker 等等。在研究ActiveMQ的某项特征时,参考下这里面的配置会节省很多时间。本文将要实现的功能用到了 conf 目录下的 activemq.xml 和 camel.xml 两个文件。
activemq.xml 是启动 broker 时缺省的配置文件。如果要使用 camel,我们首先需要将 activemq.xml 中的 “import camel.xml” 的那行注释去掉。
<import resource="camel.xml"/>
camel.xml 中的 camelContext 中包含了一个简单的路由规则:
<route> <description>Example Camel Route</description> <from uri="activemq:example.A"/> <to uri="activemq:example.B"/> </route>
其中 uri 的格式为 activemq:[queue:|topic:]destinationName 在不指定 queue/topic 的情况下,默认为 queue。
它实现的功能很简单:所有发送到队列 example.A 上的消息都将转发到队列 example.B。
camel允许我们在activemq的broker端根据Message的特征实现路由分发。为此,我们需要一种可以描述路由规则的语言,而camel支持很多这样的语言(http://camel.apache.org/languages.html)。我选用了 Bean language,这种语言允许我们通过在bean中定义的过滤方法(返回类型为boolean)对消息进行过滤。并且,可以用spring下bean的定义方式,在xml文件下注册这个bean。
import javax.jms.JMSException; import org.apache.camel.Body; import org.apache.camel.Header; public class CamelBean { public boolean isEven(@Header("JMSType")String jmsType) throws JMSException { return "even".equals(jmsType); } public boolean isOdd(@Header("JMSType")String jmsType) throws JMSException { return "odd".equals(jmsType); } public boolean matchBody(@Body String body) { if (body.contains("aaa")) return true; return false; } }
我在CamelBean中示例了三种过滤方法。 isEven和isOdd都是通过消息头的JMSType作判断的,而matchBody方法则是检查实际的消息体中是否包含“aaa”关键字。
为了使用这个bean,我需要将该类打到jar包里,然后放到activemq主目录的lib路径下面。
下一步要做的就是在camel.xml文件下注册这个bean。
<bean id="camelBean" class="com.example.activemq.CamelBean"></bean>
最后,也是最关键的部分,即将这个bean与camel的路由机制关联。为此我们需要在 route下面使用 filter。配置如下:
<route> <description>Example Camel Route</description> <from uri="activemq:example.A"/> <filter> <method ref="camelBean" method="isEven" /> <to uri="activemq:example.even"/> </filter> <filter> <method ref="camelBean" method="isOdd" /> <to uri="activemq:example.odd"/> </filter> <filter> <method ref="camelBean" method="matchBody" /> <to uri="activemq:example.aaa"/> </filter> </route>
这段配置实现的功能是:
检查消息头的JMSType值,如果是“even”就将消息的一个副本发送到example.even队列(odd,情况同理);如果检查消息内容发现“aaa”关键字,则发送一个消息副本到example.aaa队列。注意,同一份消息是可能会被分发到多个队列的。
这种配置下,每个消息到达broker后,broker端要做多次检查,次数取决于filter的个数。因此,如果filter很多,而且broker接收到的消息量很大的话,性能上会有一定的影响。
附:创建并发送消息的部分示例代码:
private void sendMessage() { try { Queue queue = session.createQueue("example.A"); final MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage("aaabbbbb"); if (ai.incrementAndGet() %2 == 0) { message.setJMSType("even"); } else { message.setJMSType("odd"); } producer.send(message); producer.close(); // session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
相关推荐
在这个项目中,源代码可能会有Spring的配置文件(如applicationContext.xml),定义Camel路由的XML或Java配置,以及实现JMS消费者和生产者的类。 7. **target**:Maven构建的输出目录,包含编译后的class文件、打包...
这个Demo是系列的第二部分,它建立在上一部分的基础之上,深入介绍了如何实际操作这些组件以实现消息传递。 **ActiveMQ**: ActiveMQ是Apache出品的一个开源、跨平台的消息中间件,它支持多种协议如OpenWire、AMQP、...
ActiveMQ路由配置是Apache ActiveMQ项目中的一种重要配置方式,它依赖另一个Apache项目Camel。ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 使用Camel Choice进行配置...
- **消息持久化**:了解ActiveMQ如何实现消息的持久化,确保在服务器故障后能够恢复未处理的消息。 - **监控和调试**:掌握如何监控Camel路由的运行状态,以及使用日志和Camel的调试工具进行问题排查。 通过深入...
消息队列组件选用Apache ActiveMQ,用于存储和提供主题消息,能够实现消息的解耦生产和消费,提高系统的并发能力和扩展性。消息队列组件还具有缓存作用,可以在系统负载过高时“削峰填谷”,保证消息处理的连续性。 ...
Apache Camel 框架实现的 JMS 路由是指使用 Apache Camel 框架来实现 Java 消息服务(JMS)的路由。JMS 是一个 Java API,用于在 Java 应用程序之间进行消息传递。Apache Camel 框架是一个开源的集成框架,提供了一...
- **消息路由**:通过集成Apache Camel等工具,ActiveMQ支持复杂的路由规则,使得消息可以根据特定条件被路由到不同的目的地。 - **事务支持**:支持XA事务,确保消息处理的一致性和完整性。 - **集群支持**:支持多...
5. **测试和调试** - 编写测试用例验证Camel路由和ActiveMQ的正确性,确保消息的正确发送和接收。 **应用示例:** 假设我们有一个需求,即从一个REST API接收数据,然后将这些数据发送到ActiveMQ队列,供其他系统...
1. **ActiveMQ Camel Tomcat 示例**:此示例展示了如何在运行于Apache Tomcat的应用程序中嵌入Apache ActiveMQ和Camel,提供了一个将消息系统与Web应用结合的例子。 2. **Aggregate 示例**:展示的是Camel 2.3中的...
Camel 框架的核心是一个路由引擎,它允许你定义自己的路由规则,决定接受哪些消息,做出决定如何处理,发送这些消息给其他目标。Camel 用这种集成语言允许你定义复杂的路由规则。Camel 的基本原则之一是不会假设任何...
8. **事件驱动架构**:Apache Camel可以通过事件驱动的方式工作,结合消息队列(如ActiveMQ)实现解耦和扩展性。 9. **API Gateway**:Camel可以被用作API网关,对外提供统一的接口,内部处理复杂的路由逻辑,实现...
例如,开发者可以通过Spring管理Camel路由的生命周期,使用Spring的事务管理来控制消息处理的事务性。 此外,Camel提供了多种测试工具,开发者可以在隔离的测试环境中验证路由的逻辑。这包括模拟组件、处理跟踪等...
8. **扩展性**:ActiveMQ可以与其他Apache项目如Camel和CXF集成,实现更复杂的消息路由和转换功能。 9. **跨语言支持**:ActiveMQ的多协议支持使得它能够与各种编程语言如Python、Ruby、C#等进行无缝集成。 10. **...
- Camel:Apache Camel是一个企业级集成库,可以与ActiveMQ配合实现复杂的消息路由和转换。 7. **最佳实践与案例分析** - 消息确认:理解如何正确配置消息确认机制,防止消息丢失。 - 性能调优:探讨如何调整...
9. **camel-core-1.2.0.jar**:Apache Camel的核心库,它是一个集成框架,用于构建复杂的消息路由和转换逻辑。 10. **spring-2.0.6.jar**:Spring框架的核心库,提供依赖注入、AOP、事务管理等功能,是Java EE应用...
Apache Camel是一个基于规则路由和中介引擎,提供企业集成模式的Java对象(POJO)的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则。领域特定语言意味着Apache Camel支持你...
3. **Fuse ESB**:Fuse ESB是基于OSGi框架的轻量级ESB,它集成了多个Apache项目,如Camel(路由引擎)、 CXF(Web服务框架)和Karaf(OSGi容器)。Fuse ESB提供了服务的部署、管理和监控功能,帮助企业构建灵活、可...