`
longgangbai
  • 浏览: 7339144 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ的拦截器插件

阅读更多

ActiveMQ拦截器使用和原理

     ActiveMQ中使用拦截器和过滤器的使用多采用插件的形式实现,继承BrokerFilter实现BrokerPlugin接口类。BrokerFilter实质一个实现Broker接口的类。

publicinterface BrokerPlugin {

    /**

     *Installsthepluginintotheinterceptorchainofthebroker,returningthenew

     *interceptedbrokertouse.

     */

    Broker installPlugin(Broker broker) throws Exception;

}

 

1.       日志拦截器

      日志拦截器是Broker的一个拦截器,默认的日志级别为INFO。你如你想改变日志的级别。这个日志拦截器支持Commons-logLog4j两种日志。

Attribute

Description

Default Value

logAll

Log all Events

false

logMessageEvents

Events related with producing, consuming and dispatching messages

false

logConnectionEvents

Events related to connections and sessions

true

logTransactionEvents

Events related to transaction handling

false

logConsumerEvents

Events related to consuming messages

false

logProducerEvents

Events related to producing messages

false

logInternalEvents

Events normally not of Interest for users like failover, querying internal objects etc

false

默认连接时间日志是开启,其他均未开启。通过activemq开启相关的日志。

    <plugins>

      <!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->

      <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>

    </plugins>

2.       统计拦截器

     ActiveMQ5.3开始,StatisticsPlugin插件被用作检测Broker中统计的插件。注意消息必须包含replyTo的消息头,如果是在JMS那么需要采用jmsReplyTo消息头,否则消息将被统计忽略。ReplyTo消息头包含了你想检查统计的消息。统计消息是一个MapMessage.

      检查Broker的信息,仅仅需要一个名称为ActiveMQ.Statistics.Broker并且有一个replyTo的消息头的Destination。为了检测所有destination,你需要一个名称为ActiveMQ.Statistics.Destination.<destination-name>或者ActiveMQ.Statistics.Destination.<wildcard-expression>并且有一个replyTo的消息头。如果许多Destination匹配相关的模糊匹配表达式,那么统计的消息将被发送到replyToDestination.

    <plugins>

      <!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->

      <statisticsBrokerPlugin/>

</plugins>

Statistics插件发送消息到特殊的目标。

下面是一个统计Broker的消息

           Queue replyTo = session.createTemporaryQueue();

           MessageConsumer consumer = session.createConsumer(replyTo);

 

           String queueName = "ActiveMQ.Statistics.Broker";

           Queue testQueue = session.createQueue(queueName);

           MessageProducer producer = session.createProducer(testQueue);

           Message msg = session.createMessage();

           msg.setJMSReplyTo(replyTo);

           producer.send(msg);

 

           MapMessage reply = (MapMessage) consumer.receive();

           assertNotNull(reply);

 

           for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {

             String name = e.nextElement().toString();

             System.out.println(name + "=" + reply.getObject(name));

        }

查询Destination的统计信息

           Queue replyTo = session.createTemporaryQueue();

           MessageConsumer consumer = session.createConsumer(replyTo);

 

           Queue testQueue = session.createQueue("TEST.FOO");

           MessageProducer producer = session.createProducer(null);

 

           String queueName = "ActiveMQ.Statistics.Destination." + testQueue.getQueueName()

           Queue query = session.createQueue(queueName);

 

           Message msg = session.createMessage();

 

           producer.send(testQueue, msg)

           msg.setJMSReplyTo(replyTo);

           producer.send(query, msg);

           MapMessage reply = (MapMessage) consumer.receive();

           assertNotNull(reply);

           assertTrue(reply.getMapNames().hasMoreElements());

                  

           for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {

               String name = e.nextElement().toString();

               System.err.println(name + "=" + reply.getObject(name));

        }

 

ActiveMQ性能优化

1、目标策略

在节点destinationPolicy配置策略,可以对单个或者所有的主题和队列进行设置,使用流量监控,当消息达到memoryLimit的时候,ActiveMQ会减慢消息的产生甚至阻塞,destinationPolicy的配置如下:

 

 

producerFlowControl表示是否监控流量,默认为true,如果设置为false,消息就会存在磁盘中以防止内存溢出;memoryLimit表示在producerFlowControl=”true”的情况下,消息存储在内存中最大量,当消息达到这个值时,ActiveMQ会减慢消息的产生甚至阻塞。policyEntry的属性参考:http://activemq.apache.org/per-destination-policies.html

当producer发送的持久化消息到达broker之后,broker首先会把它保存在持久存储中。接下来,如果发现当前有活跃的consumer,如果这个consumer消费消息的速度能跟上producer生产消息的速度,那么ActiveMQ会直接把消息传递给broker内部跟这个consumer关联的dispatch queue;如果当前没有活跃的consumer或者consumer消费消息的速度跟不上producer生产消息的速度,那么ActiveMQ会使用Pending Message Cursors保存对消息的引用。在需要的时候,Pending Message Cursors把消息引用传递给broker内部跟这个consumer关联的dispatch queue。以下是两种Pending Message Cursors:

VM Cursor:在内存中保存消息的引用。

File Cursor:首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中。

在缺省情况下,ActiveMQ 会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。

对于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有

vmDurableCursor 和 fileDurableSubscriberCursor;对于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。

Message Cursors的使用参考:http://activemq.apache.org/message-cursors.html

2、存储设置

设置消息在内存、磁盘中存储的大小,配置如下:

 

 

memoryUsage表示ActiveMQ使用的内存,这个值要大于等于destinationPolicy中设置的所有队列的内存之和。

storeUsage表示持久化存储文件的大小。

tempUsage表示非持久化消息存储的临时内存大小。

分享到:
评论

相关推荐

    Active MQ 插件开发与部署

    - `BrokerFilter`:这是最基础的插件接口,它扩展了`BrokerService`,提供了拦截消息处理的能力。 - `MessageBroker`:插件通常会实现此接口,以便控制消息的生命周期。 - `NetworkConnector`、`Transport...

    ActiveMQ-Artemis .pdf

    19. **操作拦截、数据工具和Maven插件**:介绍了操作拦截以及如何使用数据工具和Maven插件进行项目管理和构建。 20. **单元测试、故障排除和性能调优**:解释了如何进行单元测试、故障排除以及性能调优。 21. **...

    activemq-parent-5.5.1-source-release

    9. **插件体系**:ActiveMQ拥有强大的插件体系,开发者可以根据需要定制各种功能,如拦截器、审计日志等。 源代码分析可以从以下几个方面入手: 1. **模块结构**:了解项目的目录结构,找到主要的组件和模块,例如...

    ActiveMQ5.4.3源码

    6. **扩展与定制**: 通过阅读源码,开发者可以根据需求自定义插件、拦截器或适配器,增强ActiveMQ的功能。 通过深入研究ActiveMQ 5.4.3的源码,开发者不仅能掌握消息中间件的工作机制,还能提升对JMS规范的理解,...

    ActiveMQ in Action

    作者还探讨了消息重发和死信队列、拦截器插件、Apache Camel路由引擎框架的使用等高级主题。 对于需要高度定制消息服务的开发者,书中还介绍了客户端的高级选项,如排他性消息消费者、消息群组、ActiveMQ流和使用二...

    一线大厂金三银四面试资料.zip

    4. MyBatis的插件机制:自定义拦截器实现SQL日志、性能分析等功能。 5. MyBatis与Spring的整合:Spring的SqlSessionFactoryBean、@Transactional注解的应用。 此外,"Java架构面试专题寒冬(含答案)和学习笔记"这...

    jfinal-1.1.3-src.zip

    1. **AOP(面向切面编程)**:JFinal通过拦截器(Interceptor)实现了AOP,使得开发者可以在不修改原有代码的情况下,对业务逻辑进行扩展和增强,提高了代码的复用性和可维护性。 2. **Model映射**:JFinal的Model...

    全新JAVAEE大神完美就业实战课程 超150G巨制课程轻松实战JAVAEE课程 就业部分.txt

    day05_CRM权限拦截器_SSH纯注解整合 day06_Easyui&列表展示 10-Oracle数据库(学习4天) Oracle_day01,安装_函数查询and条件查询 Oracle_day02,多表查询_子查询_集合运算 Oracle_day03,DDL,DML,视图,PLSQL编程 ...

    Apache Camel USER GUIDE Version 2.3.0

    7. **Interceptors (拦截器)**:拦截器可以在消息处理之前或之后执行某些操作,例如日志记录或安全检查。 8. **Errors and Exception Handling (错误与异常处理)**:Apache Camel 提供了一套完整的机制来处理运行时...

    Maven+spring+ibatis+struts2.0+MQ+Memcached 项目构建

    它提供了拦截器、结果类型、动态方法调用等功能,使得动作控制更加灵活,同时支持AJAX和RESTful风格的请求。 5. MQ(Message Queue):消息队列是一种中间件,用于在分布式系统中解耦组件间的通信。在本项目中,...

    SpringCloud组件详细介绍word文档

    - **可扩展性强**:通过自定义拦截器等方式扩展功能。 ##### 4. **Hystrix** Hystrix 是一个延迟和容错库,主要用于隔离远程系统、服务和第三方库的访问点,防止级联故障并在复杂分布式系统中实现弹性。它通过...

    JAVA常用技术下载

    相比于1.x版本,Struts2更加成熟,提供了更为丰富的功能集,如拦截器、动态方法调用等,同时支持更多的配置选项和插件。对于初学者而言,Struts2的学习曲线可能略高,但其强大的功能和社区支持使其成为许多大型项目...

    Spring in Action(第2版)中文版

    11.3.4使用注释声明拦截器 11.4小结 第12章访问企业服务 12.1从jndi中获取对象 12.1.1使用传统的jndi 12.1.2注入jndi对象 12.1.3在spring2中注入jndi对象 12.2发送电子邮件 12.2.1配置邮件发送器 12.2.2...

    Spring in Action(第二版 中文高清版).part2

    11.3.4 使用注释声明拦截器 11.4 小结 第12章 访问企业服务 12.1 从JNDI中获取对象 12.1.1 使用传统的JNDI 12.1.2 注入JNDI对象 12.1.3 在Spring 2中注入JNDI对象 12.2 发送电子邮件 12.2.1 配置邮件发送...

    Spring in Action(第二版 中文高清版).part1

    11.3.4 使用注释声明拦截器 11.4 小结 第12章 访问企业服务 12.1 从JNDI中获取对象 12.1.1 使用传统的JNDI 12.1.2 注入JNDI对象 12.1.3 在Spring 2中注入JNDI对象 12.2 发送电子邮件 12.2.1 配置邮件发送...

    Dubbo视频教程--高级篇--第24节--简易版支付系统介绍

    - **pay-common-web**: 公共Web工程,包含Web层的通用代码,如拦截器、过滤器、控制器等。 - **pay-api-merchant**: 商户API工程,提供商户端对接支付平台时需要使用的接口。 #### 3. 服务接口定义 在简易版支付...

    Java面试框架高频问题2019

    - Struts2是基于拦截器的,而SpringMVC是基于前端控制器模式。 - SpringMVC使用MVC设计模式,而Struts2则不是。 **问题六:SpringMVC怎么样设定重定向和转发的?** - 使用`return "redirect:/url"`实现重定向。 - ...

Global site tag (gtag.js) - Google Analytics