1 首先到官网下载activeMq http://activemq.apache.org/
下载压缩包,由于是做实验,暂时用简单的windows bin zip包
解压到根目录,在bin下,执行命令,可以用cmd或者双击点击activemq
访问 http://localhost:8161/ 用户名密码默认是admin /admin
这样代表消息服务器启动好了,当然暂时用的是内置服务器jetty
2 下面做个实验
利用IDE工具或者其他工具建2个类,一个是生产者类,一个是消费者类,内部有写注释
package activeMq; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * jms 发送代码 生产者 * @author zxg * @version $Id: Provider.java, v 0.1 2015年6月11日 下午4:42:33 zxg Exp $ */ public class Provider { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection =connectionFactory.createConnection(); connection.start(); /** 1、AUTO_ACKNOWLEDGE 是自动确认模式,不需客户端进行确认 * 2、CLIENT_ACKNOWLEDGE 客户端进行确认 3、DUPS_OK_ACKNOWLEDGE 允许重复消息, */ Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for(int i=0; i<3; i++) { MapMessage message = session.createMapMessage(); message.setLong("count", new Date().getTime()); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } session.commit(); session.close(); connection.close(); } }
下面是 消费者类
package activeMq; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消费者 * @author Zxg * @version $Id: Consumer.java, v 0.1 2015年6月11日 下午5:03:44 zxg Exp $ */ public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i = 0 ; while(i<3){ i++; MapMessage message = (MapMessage)consumer.receive(); session.commit(); System.out.println("收到消息" + new Date(message.getLong("count"))); } session.close(); connection.close(); } }
3 首先执行生产者的main函数,结果可以这么看到
17:11:19.743 [main] DEBUG o.a.a.t.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport. 17:11:19.871 [main] DEBUG o.a.a.t.failover.FailoverTransport - Started unconnected 17:11:19.872 [main] DEBUG o.a.a.t.failover.FailoverTransport - Waking up reconnect task 17:11:19.875 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - urlList connectionList:[tcp://localhost:61616], from: [tcp://localhost:61616] 17:11:19.907 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Attempting 0th connect to: tcp://localhost:61616 17:11:19.910 [ActiveMQ Task-1] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 17:11:19.913 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Connection established 17:11:19.913 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 17:11:19.913 [ActiveMQ Task-1] INFO o.a.a.t.failover.FailoverTransport - Successfully connected to tcp://localhost:61616 17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=104857600} 17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=9, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 17:11:20.951 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-60887-1434013879767-1:1:1 17:11:22.952 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-60887-1434013879767-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-60887-1434013879767-1:1:1 17:11:22.952 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-60887-1434013879767-1:1:1 syncCount: 0 17:11:22.960 [main] DEBUG o.a.a.t.failover.FailoverTransport - Stopped tcp://localhost:61616 17:11:22.961 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616
在看下消息服务器的结果
意思就是有3个入列,因为生产者就生产了三个消息。
然后我们执行下消费者的main函数,结果如下
18:12:47.827 [main] DEBUG o.a.a.t.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport. 18:12:47.955 [main] DEBUG o.a.a.t.failover.FailoverTransport - Started unconnected 18:12:47.955 [main] DEBUG o.a.a.t.failover.FailoverTransport - Waking up reconnect task 18:12:47.956 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - urlList connectionList:[tcp://localhost:61616], from: [tcp://localhost:61616] 18:12:47.992 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Attempting 0th connect to: tcp://localhost:61616 18:12:47.996 [ActiveMQ Task-1] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 18:12:47.997 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Connection established 18:12:47.997 [ActiveMQ Task-1] INFO o.a.a.t.failover.FailoverTransport - Successfully connected to tcp://localhost:61616 18:12:47.997 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=104857600} 18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=9, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 18:12:48.037 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:1 18:12:48.038 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:1 18:12:48.038 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:1 syncCount: 1 收到消息Thu Jun 11 17:11:19 CST 2015 18:12:48.052 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:2 18:12:48.052 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:2 18:12:48.052 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:2 syncCount: 1 收到消息Thu Jun 11 17:11:20 CST 2015 18:12:48.055 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:3 18:12:48.055 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:3 18:12:48.055 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:3 syncCount: 1 收到消息Thu Jun 11 17:11:21 CST 2015 18:12:48.059 [main] DEBUG o.a.a.t.failover.FailoverTransport - Stopped tcp://localhost:61616 18:12:48.059 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616
再看下消息服务器,刷新下队列
好了,大概知道意思了,以后细说细节
相关推荐
在这个"apache activeMQ之初体验(helloworld)"中,我们将探索如何使用ActiveMQ进行基本的消息发布与订阅。 在消息中间件中,"Hello World"程序通常用于演示最基本的消息传递概念。ActiveMQ的"Hello World"程序主要...
ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...
**ActiveMQ的activemq.xml配置详解** ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循Java消息服务(JMS)规范,提供可靠的消息传递功能。`activemq.xml`是ActiveMQ的核心配置文件,它定义了服务器的...
**ActiveMQ配置文件详解** Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的...
3. **异步处理**:对于耗时的操作,可以通过 ActiveMQ 将任务放入消息队列,后台线程异步处理,提升用户体验。 4. **分布式系统通信**:在微服务架构中,ActiveMQ 可作为服务间的通信桥梁,简化服务间的交互。 5. ...
- **性能与兼容性**:使用早期版本的JDK可能会遇到性能瓶颈或安全问题,因此建议使用更高版本的JDK以获得更好的体验。 #### 2. JDK 1.6.x - **ActiveMQ 5.5.0 至 5.9.0**:从5.5.0版本开始,ActiveMQ开始支持JDK ...
### ActiveMQ-CPP 开发手册知识点详述 #### 一、引言 - **编写目的**:本手册旨在帮助开发者快速掌握 CMS (C++ Messaging Service) 的使用方法,提高 C++ 开发者在消息传递系统方面的开发效率,并作为 CMS 开发的...
ActiveMQ路由配置方式 ActiveMQ路由配置是Apache ActiveMQ项目中的一种重要配置方式,它依赖另一个Apache项目Camel。ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 ...
jmx监控ActiveMQ监控 jmx(Java Management Extensions)是一种Java技术,为Java应用程序提供了管理和监控的功能。ActiveMQ是Apache软件基金会下的一个开源消息队列系统,提供了高效、可靠的消息传递服务。在生产...
Apache ActiveMQ是开源的、基于Java消息服务(JMS)的Message Broker,它允许应用程序通过消息传递进行异步通信。ActiveMQ客户端库是用于与ActiveMQ服务器交互的接口,允许开发者在他们的应用中发送和接收消息。这个...
本报告详细阐述了使用JMeter对ActiveMQ进行性能测试的过程和结果,旨在评估ActiveMQ在JMS(Java消息服务)环境下的性能表现。JMeter作为一个强大的负载和性能测试工具,被广泛用于测试各种应用程序,包括消息中间件...
Apache ActiveMQ是业界广泛使用的开源消息中间件,它基于Java消息服务(JMS)标准,提供了高度可扩展、可靠的异步通信能力。标题"apache-activemq-5.16.5"指的是该软件的一个特定版本,即5.16.5版本,通常每个新版本...
Apache ActiveMQ是开源的、基于Java消息服务(JMS)的应用服务器,它是Apache软件基金会的一部分。这个名为"apache-activemq-5.17.3"的压缩包包含了ActiveMQ的5.17.3版本,这是一个稳定且功能丰富的发布版本。在深入...
Apache ActiveMQ是业界广泛使用的开源消息中间件,尤其在Linux环境下表现出色。它基于Java语言开发,遵循Apache软件基金会的许可证,并且实现了多种消息传递协议,包括OpenWire、STOMP、AMQP和XMPP等。在Linux系统上...
Apache ActiveMQ是开源社区中最流行的Java消息代理,也是企业级消息中间件(Message Broker)的首选之一。在最新的稳定版5.15.0中,它提供了可靠的消息传递功能,适用于分布式应用程序之间的通信,实现了异步处理、...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递标准(JMS,Java Message Service),用于在分布式系统中实现可靠的消息传递。在本文中,我们将深入探讨ActiveMQ v6.0.1的核心特性、应用...
在Linux环境下,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它是Java Message Service (JMS) 的实现,能够处理大量的并发消息传递。ActiveMQ提供了高可用性、可扩展性和稳定性,使得它成为分布式...
### ActiveMQ高并发处理方案详解 #### 一、引言 在现代分布式系统中,消息队列作为异步通信的核心组件之一,对于提高系统的吞吐量、降低响应时间和实现服务解耦等方面起着至关重要的作用。Apache ActiveMQ作为一款...
ActiveMQ开发规范及方案 ActiveMQ是一种流行的开源消息队列 middleware,广泛应用于分布式系统中。作为一种消息队列 middleware,ActiveMQ提供了许多功能,例如支持多种消息协议、事务支持、持久化机制等。为了确保...