`
longgangbai
  • 浏览: 7309330 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

关于ActiveMQ中Session和Connection资源的管理

阅读更多

      

配置完了持久化之后,我们就可以使用代码来发送和接收ActiveMQ中的消息了,我这里配置的持久化是KahaDB。
需要导入的jar包:
activemq所需的jar包
一段发送消息的代码:

执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:

点击队列名test,然后点击消息ID即可查看消息内容,如图:

如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
一段接收消息的代码:

执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:

这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。

 

       在ActiveMQ中Session和Connection是一种重要的资源,在数据库中,针对重要的资源如Connection我们采用数据库连接池,在ActiveMQ中有一个可选的组件为activmq-pool组件,用于处理关于Session和Connection,管理的方式采用池的原理即对象池。

      在activemq-pool中资源管理器类为ActiveMQResourceManager其中PooledConnectionFactory中定义Pool的管理的基本方法。其中在此类中定义默认的一些信息如下:

     private ConnectionFactory connectionFactory;
    private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
    private ObjectPoolFactory poolFactory;
    private int maximumActive = 500;
    private int maxConnections = 1;
    private int idleTimeout = 30 * 1000;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private long expiryTimeout = 0l;
     /* Sets the maximum number of active sessions per connection
     */
    public void setMaximumActive(int maximumActive) {
        this.maximumActive = maximumActive;
    }

    /**
     * @return the maxConnections
     */
    public int getMaxConnections() {
        return maxConnections;
    }

    /**
     * @param maxConnections the maxConnections to set
     */
    public void setMaxConnections(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    protected ObjectPoolFactory createPoolFactory() {
        return new GenericObjectPoolFactory(null, maximumActive);
    }

    public int getIdleTimeout() {
        return idleTimeout;
    }

    public void setIdleTimeout(int idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /**
     * allow connections to expire, irrespective of load or idle time. This is useful with failover
     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
     * 
     * @param expiryTimeout non zero in milliseconds
     */
    public void setExpiryTimeout(long expiryTimeout) {
        this.expiryTimeout = expiryTimeout;   
    }
    
    public long getExpiryTimeout() {
        return expiryTimeout;
    }

 

  其中默认的Connecton连接数为1:

       默认的每一个Connecton的创建的Session数量为500个。

     默认是实现的PooledConnectionFactory类如下:

AmqJNDIPooledConnectionFactory

JcaPooledConnectionFactory

XaPooledConnectionFactory

PooledConnectionFactory

 其中关于Session的管理的资源池使用如下类:

public class SessionPool implements PoolableObjectFactory

 

获取Session是从ConnectonPool中获取代码如下:

    public Session createSession(boolean transacted, int ackMode) throws JMSException {
        SessionKey key = new SessionKey(transacted, ackMode);
        SessionPool pool = cache.get(key);
        if (pool == null) {
            pool = createSessionPool(key);
            cache.put(key, pool);
        }
        PooledSession session = pool.borrowSession();
        return session;
    }

 使用实例如下:

package easyway.app.activemq.demo.acknow;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.transport.TransportListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ActiveMQ 中连接池的使用
 * 
 * @author longgangbai
 *
 */
public class ActiveMQConnectionPool  {
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionPool.class);
    private BrokerService broker;
    private ActiveMQConnectionFactory factory;
    private PooledConnectionFactory pooledFactory;

    public void testEviction() throws Exception {
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:61619");
        broker.start();
        factory = new ActiveMQConnectionFactory("tcp://localhost:61619?closeAsync=false");
        pooledFactory = new PooledConnectionFactory(factory);
        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
        ActiveMQConnection amqC = connection.getConnection();
        final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
        amqC.addTransportListener(new TransportListener() {
            public void onCommand(Object command) {
            }
            public void onException(IOException error) {
                // we know connection is dead...
                // listeners are fired async
                gotExceptionEvent.countDown();
            }
            public void transportInterupted() {
            }
            public void transportResumed() {
            }
        });
        
        sendMessage(connection);
        Connection connection2 = pooledFactory.createConnection();
        sendMessage(connection2);
    }


   /**
    * 发送消息的方法
    * @param connection
    * @throws JMSException
    */
    private void sendMessage(Connection connection) throws JMSException {
    	//获取会话信息
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO"));
        producer.send(session.createTextMessage("Test"));
        session.close();
    }
    
    
	public static void main(String[] args) throws Exception {
		ActiveMQConnectionPool test=new ActiveMQConnectionPool();
		test.testEviction();
	}


}

 

 

在ActiveMQResourceManage的配置如下:

/**
 * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
 * in a way that will allow the transaction manager to correctly recover XA transactions.
 *
 * For example, it can be used the following way:
 * <pre>
 *   <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 *      <property name="brokerURL" value="tcp://localhost:61616" />
 *   </bean>
 *
 *   <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
 *       <property name="maxConnections" value="8" />
 *       <property name="transactionManager" ref="transactionManager" />
 *       <property name="connectionFactory" ref="activemqConnectionFactory" />
 *       <property name="resourceName" value="activemq.broker" />
 *   </bean>
 *
 *   <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
 *         <property name="transactionManager" ref="transactionManager" />
 *         <property name="connectionFactory" ref="activemqConnectionFactory" />
 *         <property name="resourceName" value="activemq.broker" />
 *   </bean>
 * </pre>
 */

 

分享到:
评论
1 楼 鸿志永不止步 2017-03-09  
受益了,多谢!

相关推荐

    ActiveMQ相关jar包--使用Connection连接池

    在这个场景中,我们将关注如何利用ActiveMQ的连接池功能,以优化资源管理和提高性能。 首先,理解`Connection`在ActiveMQ中的角色是至关重要的。一个`Connection`代表到ActiveMQ服务器的物理连接,它可以创建多个`...

    activeMQ推送服务端和客户端完整案例

    在这个案例中,你将学习如何创建和运行一个简单的ActiveMQ服务端和客户端应用,理解消息的发送和接收过程,以及如何利用ActiveMQ提供的工具进行监控和管理。通过深入学习和实践,你可以更好地掌握ActiveMQ在实际项目...

    ActiveMQ客户端

    8. **管理工具**:ActiveMQ提供了一个Web控制台(webconsole),可以通过浏览器访问,进行管理操作,如查看和管理消息、监控性能等。 9. **安全性**:ActiveMQ支持基于角色的访问控制(RBAC)、SSL加密以及JAAS认证...

    ActiveMQ中Topic持久化Demo

    在分布式系统中,消息队列(Message Queue)作为解耦组件和异步处理的重要工具,Apache ActiveMQ 是一款广泛使用的开源消息中间件。本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在...

    activeMQ简单入门案例

    - 默认情况下,ActiveMQ的Web管理控制台监听在localhost的8161端口,可通过浏览器访问`http://localhost:8161/admin`进行监控和管理。 2. **创建生产者** - 创建一个Java项目,引入ActiveMQ的JMS客户端库,例如...

    activemq 入门示例代码

    ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统中的异步通信和解耦。在本文中,...

    ActiveMQ基于Java和JavaScript的应用实例

    在Java和JavaScript环境中,ActiveMQ能够作为一个桥梁,允许不同应用程序之间进行异步通信。下面将详细探讨如何使用Java和JavaScript与ActiveMQ交互,并实现应用实例。 首先,我们要理解ActiveMQ的基本概念。消息...

    activemq在maven工程

    在Maven工程中集成ActiveMQ,可以方便地管理和使用消息队列,实现应用间的异步通信。本文将详细介绍如何在Maven项目中配置和使用ActiveMQ。 1. **Maven依赖添加** 在`pom.xml`文件中,我们需要添加ActiveMQ的相关...

    基于Maven的ActiveMQ的简单实例

    总结,基于Maven的ActiveMQ简单实例展示了如何利用Maven管理依赖,以及如何使用Java API与ActiveMQ交互进行消息发送和接收。这个实例可以帮助初学者快速理解和入门ActiveMQ的基本用法,为进一步探索分布式系统中的...

    ActiveMQ实现例子

    启动后,ActiveMQ会提供一个管理界面,可以通过Web浏览器访问,以监控和管理消息队列。 在Java项目中集成ActiveMQ,你需要添加Apache ActiveMQ的JAR依赖。如果你使用的是Maven,可以在pom.xml文件中添加对应的依赖...

    activeMQ点对点map消息

    本篇文章将深入探讨在ActiveMQ中实现JMS的点对点(Point-to-Point)消息模型,并讨论如何在不同的项目中设置发送者和接收者。 首先,让我们理解点对点消息模型的基本概念。在点对点模型中,消息从一个生产者(发送...

    activeMQ 服务端客户端 java代码

    在 Java 开发环境中,ActiveMQ 提供了丰富的 API 和工具,使得开发者能够方便地在服务端和客户端之间发送和接收消息。 ### 1. ActiveMQ 服务端配置与启动 首先,我们需要在服务器端安装并配置 ActiveMQ。通常,...

    JMS事例,ActiveMQ服务器

    - **会话(Session)**:用于创建消息生产者、消费者和管理事务。 - **消息生产者(MessageProducer)**:发送消息到队列或主题。 - **消息消费者(MessageConsumer)**:接收消息。 - **消息(Message)**:...

    使用ActiveMQ示例.pdf

    6. 每隔一秒钟发送一个消息,总共发送三次,然后提交 Session 事务,关闭 Session 和 Connection。 接收代码: 1. 同样创建 ConnectionFactory 和 Connection。 2. 创建 Session,这里使用 AUTO_ACKNOWLEDGE 模式,...

    activeMQ生产者和消费者代码

    这些代码可能包括配置ActiveMQ服务器连接的细节、创建和管理ConnectionFactory、Connection、Session、Producer和Consumer的方法,以及发送和接收消息的逻辑。通过分析这些代码,你可以深入理解ActiveMQ在实际应用中...

    JMS之ActiveMQ工具类+使用例子.zip

    6. **丰富的管理工具**:ActiveMQ提供了Web控制台和命令行工具,方便管理和监控消息队列。 **ActiveMQ工具类的使用** 在Java中,我们可以使用ActiveMQ提供的API来创建和操作消息代理。例如,我们可以创建一个`...

    ActiveMQ简单Demo案例

    通过这个简单的Demo,你可以进一步学习如何配置ActiveMQ服务器,管理队列和主题,以及处理更复杂的消息传递场景,如事务性消息、持久化消息、消息优先级等。 总的来说,ActiveMQ作为一个强大的消息中间件,可以帮助...

    ActiveMQ测试Demo

    在实验三“消息中间件应用开发”中,我们可以基于上述步骤创建一个简单的ActiveMQ测试环境,通过编写生产者和消费者程序,理解消息中间件的工作原理以及ActiveMQ的基本用法。在实际项目中,这些基础将有助于构建更...

    C#客户端开发ActiveMq请下载Apache.NMS和Apache.NMS.ActiveMQ两个bin包

    为了在C#项目中使用ActiveMQ,开发者需要依赖Apache.NMS和Apache.NMS.ActiveMQ这两个库。这两个bin包是专门为.NET平台设计的,允许.NET开发者无缝地集成ActiveMQ的功能。 Apache.NMS(Net Messaging System)是...

    ActiveMQ中文手册

    ActiveMQ中文手册是一个关于ActiveMQ的详细手册,旨在帮助开发者更好地理解和使用ActiveMQ。下面是从手册中提炼出的相关知识点: 1. JMS基本构件 * 连接工厂:客户用来创建连接的对象,例如ActiveMQ提供的...

Global site tag (gtag.js) - Google Analytics