配置完了持久化之后,我们就可以使用代码来发送和接收ActiveMQ中的消息了,我这里配置的持久化是KahaDB。
需要导入的jar包:
一段发送消息的代码:
执行了上面的发送方法之后,在ActiveMQ的监视控制可以看到有一个test队列,并且有一条消息,如图:
点击队列名test,然后点击消息ID即可查看消息内容,如图:
如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失。
一段接收消息的代码:
执行了上面的接收方法之后,在ActiveMQ的监视控制可以看到test队列的消息已经被消费了,如图:
这里的代码只是测试用,在正式开发中一般与Spring结合使用jmsTemplate来发送消息,现实JMS的MessageListener来监听消息。
在ActiveMQ中Session和Connection是一种重要的资源,在数据库中,针对重要的资源如Connection我们采用数据库连接池,在ActiveMQ中有一个可选的组件为activmq-pool组件,用于处理关于Session和Connection,管理的方式采用池的原理即对象池。
在activemq-pool中资源管理器类为ActiveMQResourceManager其中PooledConnectionFactory中定义Pool的管理的基本方法。其中在此类中定义默认的一些信息如下:
private ConnectionFactory connectionFactory; private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>(); private ObjectPoolFactory poolFactory; private int maximumActive = 500; private int maxConnections = 1; private int idleTimeout = 30 * 1000; private AtomicBoolean stopped = new AtomicBoolean(false); private long expiryTimeout = 0l; /* Sets the maximum number of active sessions per connection */ public void setMaximumActive(int maximumActive) { this.maximumActive = maximumActive; } /** * @return the maxConnections */ public int getMaxConnections() { return maxConnections; } /** * @param maxConnections the maxConnections to set */ public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; } protected ObjectPoolFactory createPoolFactory() { return new GenericObjectPoolFactory(null, maximumActive); } public int getIdleTimeout() { return idleTimeout; } public void setIdleTimeout(int idleTimeout) { this.idleTimeout = idleTimeout; } /** * allow connections to expire, irrespective of load or idle time. This is useful with failover * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery * * @param expiryTimeout non zero in milliseconds */ public void setExpiryTimeout(long expiryTimeout) { this.expiryTimeout = expiryTimeout; } public long getExpiryTimeout() { return expiryTimeout; }
其中默认的Connecton连接数为1:
默认的每一个Connecton的创建的Session数量为500个。
默认是实现的PooledConnectionFactory类如下:
AmqJNDIPooledConnectionFactory
JcaPooledConnectionFactory
XaPooledConnectionFactory
PooledConnectionFactory
其中关于Session的管理的资源池使用如下类:
public class SessionPool implements PoolableObjectFactory
获取Session是从ConnectonPool中获取代码如下:
public Session createSession(boolean transacted, int ackMode) throws JMSException { SessionKey key = new SessionKey(transacted, ackMode); SessionPool pool = cache.get(key); if (pool == null) { pool = createSessionPool(key); cache.put(key, pool); } PooledSession session = pool.borrowSession(); return session; }
使用实例如下:
package easyway.app.activemq.demo.acknow; import java.io.IOException; import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.pool.PooledConnection; import org.apache.activemq.pool.PooledConnectionFactory; import org.apache.activemq.transport.TransportListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * ActiveMQ 中连接池的使用 * * @author longgangbai * */ public class ActiveMQConnectionPool { private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionPool.class); private BrokerService broker; private ActiveMQConnectionFactory factory; private PooledConnectionFactory pooledFactory; public void testEviction() throws Exception { broker = new BrokerService(); broker.setPersistent(false); broker.addConnector("tcp://localhost:61619"); broker.start(); factory = new ActiveMQConnectionFactory("tcp://localhost:61619?closeAsync=false"); pooledFactory = new PooledConnectionFactory(factory); PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); ActiveMQConnection amqC = connection.getConnection(); final CountDownLatch gotExceptionEvent = new CountDownLatch(1); amqC.addTransportListener(new TransportListener() { public void onCommand(Object command) { } public void onException(IOException error) { // we know connection is dead... // listeners are fired async gotExceptionEvent.countDown(); } public void transportInterupted() { } public void transportResumed() { } }); sendMessage(connection); Connection connection2 = pooledFactory.createConnection(); sendMessage(connection2); } /** * 发送消息的方法 * @param connection * @throws JMSException */ private void sendMessage(Connection connection) throws JMSException { //获取会话信息 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO")); producer.send(session.createTextMessage("Test")); session.close(); } public static void main(String[] args) throws Exception { ActiveMQConnectionPool test=new ActiveMQConnectionPool(); test.testEviction(); } }
在ActiveMQResourceManage的配置如下:
/** * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager * in a way that will allow the transaction manager to correctly recover XA transactions. * * For example, it can be used the following way: * <pre> * <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> * <property name="brokerURL" value="tcp://localhost:61616" /> * </bean> * * <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean"> * <property name="maxConnections" value="8" /> * <property name="transactionManager" ref="transactionManager" /> * <property name="connectionFactory" ref="activemqConnectionFactory" /> * <property name="resourceName" value="activemq.broker" /> * </bean> * * <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource"> * <property name="transactionManager" ref="transactionManager" /> * <property name="connectionFactory" ref="activemqConnectionFactory" /> * <property name="resourceName" value="activemq.broker" /> * </bean> * </pre> */
相关推荐
在这个场景中,我们将关注如何利用ActiveMQ的连接池功能,以优化资源管理和提高性能。 首先,理解`Connection`在ActiveMQ中的角色是至关重要的。一个`Connection`代表到ActiveMQ服务器的物理连接,它可以创建多个`...
在这个案例中,你将学习如何创建和运行一个简单的ActiveMQ服务端和客户端应用,理解消息的发送和接收过程,以及如何利用ActiveMQ提供的工具进行监控和管理。通过深入学习和实践,你可以更好地掌握ActiveMQ在实际项目...
8. **管理工具**:ActiveMQ提供了一个Web控制台(webconsole),可以通过浏览器访问,进行管理操作,如查看和管理消息、监控性能等。 9. **安全性**:ActiveMQ支持基于角色的访问控制(RBAC)、SSL加密以及JAAS认证...
在分布式系统中,消息队列(Message Queue)作为解耦组件和异步处理的重要工具,Apache ActiveMQ 是一款广泛使用的开源消息中间件。本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在...
- 默认情况下,ActiveMQ的Web管理控制台监听在localhost的8161端口,可通过浏览器访问`http://localhost:8161/admin`进行监控和管理。 2. **创建生产者** - 创建一个Java项目,引入ActiveMQ的JMS客户端库,例如...
ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统中的异步通信和解耦。在本文中,...
在Java和JavaScript环境中,ActiveMQ能够作为一个桥梁,允许不同应用程序之间进行异步通信。下面将详细探讨如何使用Java和JavaScript与ActiveMQ交互,并实现应用实例。 首先,我们要理解ActiveMQ的基本概念。消息...
在Maven工程中集成ActiveMQ,可以方便地管理和使用消息队列,实现应用间的异步通信。本文将详细介绍如何在Maven项目中配置和使用ActiveMQ。 1. **Maven依赖添加** 在`pom.xml`文件中,我们需要添加ActiveMQ的相关...
总结,基于Maven的ActiveMQ简单实例展示了如何利用Maven管理依赖,以及如何使用Java API与ActiveMQ交互进行消息发送和接收。这个实例可以帮助初学者快速理解和入门ActiveMQ的基本用法,为进一步探索分布式系统中的...
启动后,ActiveMQ会提供一个管理界面,可以通过Web浏览器访问,以监控和管理消息队列。 在Java项目中集成ActiveMQ,你需要添加Apache ActiveMQ的JAR依赖。如果你使用的是Maven,可以在pom.xml文件中添加对应的依赖...
本篇文章将深入探讨在ActiveMQ中实现JMS的点对点(Point-to-Point)消息模型,并讨论如何在不同的项目中设置发送者和接收者。 首先,让我们理解点对点消息模型的基本概念。在点对点模型中,消息从一个生产者(发送...
在 Java 开发环境中,ActiveMQ 提供了丰富的 API 和工具,使得开发者能够方便地在服务端和客户端之间发送和接收消息。 ### 1. ActiveMQ 服务端配置与启动 首先,我们需要在服务器端安装并配置 ActiveMQ。通常,...
- **会话(Session)**:用于创建消息生产者、消费者和管理事务。 - **消息生产者(MessageProducer)**:发送消息到队列或主题。 - **消息消费者(MessageConsumer)**:接收消息。 - **消息(Message)**:...
6. 每隔一秒钟发送一个消息,总共发送三次,然后提交 Session 事务,关闭 Session 和 Connection。 接收代码: 1. 同样创建 ConnectionFactory 和 Connection。 2. 创建 Session,这里使用 AUTO_ACKNOWLEDGE 模式,...
这些代码可能包括配置ActiveMQ服务器连接的细节、创建和管理ConnectionFactory、Connection、Session、Producer和Consumer的方法,以及发送和接收消息的逻辑。通过分析这些代码,你可以深入理解ActiveMQ在实际应用中...
6. **丰富的管理工具**:ActiveMQ提供了Web控制台和命令行工具,方便管理和监控消息队列。 **ActiveMQ工具类的使用** 在Java中,我们可以使用ActiveMQ提供的API来创建和操作消息代理。例如,我们可以创建一个`...
在实验三“消息中间件应用开发”中,我们可以基于上述步骤创建一个简单的ActiveMQ测试环境,通过编写生产者和消费者程序,理解消息中间件的工作原理以及ActiveMQ的基本用法。在实际项目中,这些基础将有助于构建更...
ActiveMQ中文手册是一个关于ActiveMQ的详细手册,旨在帮助开发者更好地理解和使用ActiveMQ。下面是从手册中提炼出的相关知识点: 1. JMS基本构件 * 连接工厂:客户用来创建连接的对象,例如ActiveMQ提供的...
通过这个例子,开发者可以了解到如何在Web应用程序中使用ActiveMQ JMS进行消息通信,包括创建连接工厂、建立连接、创建会话、定义目的地、发送和接收消息等核心操作。这个例子对理解和实践ActiveMQ在Web环境中的应用...
为了在C#项目中使用ActiveMQ,开发者需要依赖Apache.NMS和Apache.NMS.ActiveMQ这两个库。这两个bin包是专门为.NET平台设计的,允许.NET开发者无缝地集成ActiveMQ的功能。 Apache.NMS(Net Messaging System)是...