`
winnie825
  • 浏览: 120203 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ“连接池”使用

    博客分类:
  • JMS
阅读更多

1. ActiveMQ的连接池

ActiveMQ提供了PoolConnectionFactory、PoolConnection等实现连接池功能,连接池是供对connection、session、producer的“池”,PoolConnectionFactory的类注释说明的原因:

 

<b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
just created at startup and left active, handling incoming messages as they come. When a consumer is
complete, it is best to close it rather than return it to a pool for later reuse: this is because,
even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
where they'll get held until the consumer is active again.

If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html

 1)首先connection、session、producer的创建会消耗大量系统资源;

 2)其次consumer有自己的机制,第二段中url中对一个问题进行了说明,其中讲到consumer的一个特性:consumer会获取全部的消息,接收消息的多少可以根据初始化缓存的大小设置,因此在大量消息发送到消费者时,消费者使用类似统一获取、统一消费的方式处理,“池”没有存在的价值。

 

2. 实现原理

 

PoolConnectionFactory

 

  • “池”的存放

 

PoolConnection被一个Map对象存放,ConnectionKey作为该map的key,LinkedList作为连接存放的列表,也就是说获取连接时,首先会根据ConnectionKey获取对应的“小连接池”,再从“小连接池”LinkedList中去连接。

  • 工厂类的参数值

maxmunActive:session的最大活跃值,该参数会通过ConnectionPool的createSession方法如入到Session工程类中(apache-common-pool的GenericObjectPoolFactory);

 

maxConnections:Map的value,LinkedList的大小,即ConnectionPool数量;

 

idleTimeout:线程超时时间,最后使用时间+idleTimeout<当前时间,连接关闭;

 

expiryTimeout:回收时间,连接创建时间+expiryTimeout<当前时间,连接关闭;

 

 

  • 连接创建流程

1)判断工厂是否stop,如果stop,输出日志并返回null;

2)获取ConnectionKey;

3)根据ConnectionKey获取LinkedList;

4)如果LinkedList的大小==maxConnections,获取LinkedList的第一个连接;

5)校验连接,检验失败:a、创建新连接Connection b、通过连接创建ConnectionPool c、将ConnectionPool注入工厂;

6)将ConnectionPool放入LinkedList中;

 

其中创建ConnectionPool设计SessionPool的初始化。

 

3. 样例代码

 

  • 创建连接
public class MQPoolUtil {

    private static PooledConnection conn;
    
    public static void init() {
        String url = "failover:(tcp://192.168.174.250:61616)?initialReconnectDelay=1000&timeout=3000&startupMaxReconnectAttempts=2";
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        try {
            PooledConnectionFactory poolFactory = new PooledConnectionFactory(factory);
            conn = (PooledConnection) poolFactory.createConnection();
            conn.start();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
    public static void destroy(){
        try {
            if(conn != null) {
                conn.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    
    public static PooledConnection getConn() {
    
        return conn;
    }

    
    public static void setConn(PooledConnection conn) {
    
        MQPoolUtil.conn = conn;
    }
}

 

  • 创建session及producer
public class MQProducer extends Thread {

    public void run() {

        String topic = "MQ.TEST";
        Session session = null;
        MessageProducer producer = null;
        try {
            session = MQPoolUtil.getConn().createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createTopic(topic);
            
            System.out.println("session info ->" +session);

            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage message = session.createTextMessage("hello message!");
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 

当执行多次后,从System.out.println中可以看出,sessionid可能是重复的,即缓存中获取了session。

分享到:
评论
1 楼 Java_zai 2016-02-05  
[quo     te][b][/b]

相关推荐

    自己实现的ActiveMQ连接池和新版本ActiveMQ自带的连接池,封装好的工具类,可直接使用

    本资源提供的内容是关于ActiveMQ的连接池实现,分为两部分:一是作者自己实现的ActiveMQ连接池,二是新版本ActiveMQ自带的连接池。连接池是一种资源管理技术,通过复用已建立的数据库连接或网络连接,减少创建和销毁...

    ActiveMQ连接池完整封装实例工具类

    本文将详细介绍如何实现一个ActiveMQ连接池的完整封装实例工具类,并探讨其背后的设计思想。 首先,我们需要了解JMS(Java Message Service)接口,它是Java平台中用于创建、发送、接收和读取消息的标准API。...

    Spring Boot ActiveMQ连接池配置过程解析

    Spring Boot ActiveMQ连接池配置过程解析 Spring Boot ActiveMQ连接池配置过程解析 在本文中,我们将详细介绍Spring Boot ActiveMQ连接池配置过程解析。该配置过程主要解决了Spring Boot ActiveMQ连接池中的频繁...

    ActiveMQ整合Spring使用连接池

    接下来,配置ActiveMQ连接工厂。在Spring的XML配置文件中,你可以定义一个`ConnectionFactory` bean,这将使用ActiveMQ的TCP连接工厂。示例配置如下: ```xml &lt;bean id="connectionFactory" class="org.apache....

    ActiveMQ相关jar包--使用Connection连接池

    以下是使用ActiveMQ连接池的基本步骤: 1. 引入依赖:确保在项目中添加了ActiveMQ和Commons Pool的正确jar包。从描述中可以看出,jar包的版本选择非常重要,因为不同的版本可能有不同的API和特性。通常,最新稳定...

    SpringBoot集成ActiveMQ实例详解.docx

    如果需要使用连接池,还需要引入`activemq-pool`依赖: ```xml &lt;groupId&gt;org.apache.activemq &lt;artifactId&gt;activemq-pool ``` 接着,在`application.properties`中进行配置。Spring Boot提供了两种配置方式:...

    ActiveMQ开发规范及方案

    连接池使用 连接池是指在应用程序中预先创建的一组连接,以便在需要时可以快速地获取连接。使用连接池可以提高系统的性能和可靠性。ActiveMQ提供了多种连接池实现,例如Apache Commons Pool、C3P0等。 消费者监控 ...

    activemq-pool.jar.zip

    同时,它也可以与其他Java应用程序集成,比如Spring框架,通过Spring的配置方式来管理和使用ActiveMQ连接池。 在下载`activemq-pool.jar.zip`包后,开发者需要将`activemq-pool.jar`添加到项目的类路径中,并根据...

    activemq-pool源码

    在ActiveMQ中,`activemq-pool`模块扮演着关键角色,它为ActiveMQ客户端提供连接池服务,有效管理和复用JMS(Java Message Service)连接,从而提高系统性能并减少资源消耗。本文将深入探讨`activemq-pool`的源代码...

    activemq-pool-5.4.1.jar.zip

    在ActiveMQ的生态系统中,ActiveMQ Pool是一个关键组件,它提供了一种管理JMS连接池的机制,以优化资源使用和提高系统性能。本文将深入探讨ActiveMQ Pool 5.4.1版本及其包含的JAR文件。 首先,ActiveMQ Pool是一个...

    ActiveMQ高并发处理方案

    本文将详细介绍ActiveMQ在高并发环境下的优化策略,包括异常处理、连接池使用、消费者公平调度以及系统整体扩展等方面。 #### 二、高并发发送消息异常及其解决 ##### 现象描述 当使用多个线程(如10个)以一定频率...

    activeMQ使用JDBC所需要的jar包

    总结来说,使用ActiveMQ的JDBC持久化需要合适的jar包支持,包括ActiveMQ自身的JDBC相关库、数据库驱动和连接池库。正确配置这些组件并设置好数据源,才能确保ActiveMQ能有效地利用JDBC进行消息持久化。

    WebSocket协议接收ActiveMQ

    同时,为了优化性能,可能还需要考虑连接池和负载均衡策略。 总之,WebSocket与ActiveMQ的结合提供了高效、实时的消息传递能力,是构建现代Web应用程序和微服务架构的重要工具。通过理解和掌握WebSocket协议以及...

    Spring Boot整合ActiveMQ

    Spring Boot 整合 ActiveMQ 的过程涉及到多个技术栈的集成,包括前端模板引擎Thymeleaf、数据库连接池Druid、事务管理以及消息队列的使用。以下将详细阐述这些知识点。 1. **Spring Boot**: Spring Boot 是由 ...

    activemq-5.15+mysqljdbc配置.zip

    在这里,Durid可能是被用来作为ActiveMQ连接MySQL的连接池。同时,MySQL的JDBC驱动程序JAR(如`mysql-connector-java.jar`)也是必需的,因为它允许ActiveMQ通过Java Database Connectivity (JDBC) API与MySQL数据库...

    activemq-pool-5.4.2.jar.zip

    ActiveMQ Pool 提供了一种管理 JMS 连接的机制,通过连接池技术优化了资源的使用和管理。在高并发场景下,连接池能够避免频繁创建和销毁连接,从而显著提升系统性能。5.4.2 版本是该组件的一个稳定发行版,包含了...

    activemq activeMq笔记

    需要注意的是,使用消息连接池需要引入额外的依赖 `activemq-pool.jar`。 #### 存储数据库配置 ActiveMQ 支持多种持久化方式,包括 KahaDB 和 LevelDB 等。如果需要将消息存储到关系型数据库中,则需要进行相应的...

    编码实现MQ连接池实现JMS消息发送连接管理

    对于MQ,使用连接池能降低延迟,提高系统吞吐量,并且更有利于资源的管理和监控。 在给定的“MQPoolService”项目中,我们可以看到如何实现MQ连接池的具体步骤: 1. **选择JMS实现**:首先,你需要选择一个JMS提供...

    ActiveMQ问题解决记录

    配置文件可能包含了ActiveMQ的设置,如连接池大小、存储配置、网络拓扑等,这些都可能影响到ActiveMQ的性能和稳定性。如果包含自定义插件,那么问题可能与这些插件的兼容性或配置有关。 解决ActiveMQ问题通常涉及...

    activemq入门实例,有源代码

    8. **连接池与网络拓扑**:ActiveMQ支持连接池,优化资源利用。在网络拓扑方面,可以设置多个broker形成集群,提高可用性和负载均衡。 9. **消息选择器**:消费者可以使用消息选择器只接收满足特定条件的消息,这样...

Global site tag (gtag.js) - Google Analytics