`

activeMQ初体验

 
阅读更多

1 首先到官网下载activeMq       http://activemq.apache.org/

   下载压缩包,由于是做实验,暂时用简单的windows bin zip包

   解压到根目录,在bin下,执行命令,可以用cmd或者双击点击activemq

   访问 http://localhost:8161/   用户名密码默认是admin  /admin 

   这样代表消息服务器启动好了,当然暂时用的是内置服务器jetty

2 下面做个实验

    利用IDE工具或者其他工具建2个类,一个是生产者类,一个是消费者类,内部有写注释

  

package activeMq;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * jms 发送代码 生产者
 * @author zxg
 * @version $Id: Provider.java, v 0.1 2015年6月11日 下午4:42:33 zxg Exp $
 */
public class Provider {
  public static void main(String[] args) throws Exception{
          ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();  
        
          Connection connection =connectionFactory.createConnection();  
          connection.start();
          /** 1、AUTO_ACKNOWLEDGE 是自动确认模式,不需客户端进行确认
           *  2、CLIENT_ACKNOWLEDGE   客户端进行确认
              3、DUPS_OK_ACKNOWLEDGE  允许重复消息,
          */
          Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
          Destination destination = session.createQueue("my-queue");  
          MessageProducer producer = session.createProducer(destination);  
          
          for(int i=0; i<3; i++) {  
              MapMessage message = session.createMapMessage();  
              message.setLong("count", new Date().getTime());  
              Thread.sleep(1000);  
              //通过消息生产者发出消息  
              producer.send(message);  
          }  
          session.commit();  
          session.close();  
          connection.close();  
}
}

  下面是 消费者类

 

package activeMq;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消费者
 * @author Zxg
 * @version $Id: Consumer.java, v 0.1 2015年6月11日 下午5:03:44 zxg Exp $
 */
public class Consumer {
  public static void main(String[] args) throws Exception{
      ConnectionFactory  connectionFactory = new ActiveMQConnectionFactory();
      Connection connection = connectionFactory.createConnection();
      connection.start();
      final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      Destination  destination = session.createQueue("my-queue");
      MessageConsumer consumer = session.createConsumer(destination);
      int i = 0 ;
      while(i<3){
          i++;
          MapMessage message = (MapMessage)consumer.receive();
          session.commit();
          System.out.println("收到消息" + new Date(message.getLong("count")));
      }
     session.close();
     connection.close();
}
}

   3 首先执行生产者的main函数,结果可以这么看到

 

17:11:19.743 [main] DEBUG o.a.a.t.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
17:11:19.871 [main] DEBUG o.a.a.t.failover.FailoverTransport - Started unconnected
17:11:19.872 [main] DEBUG o.a.a.t.failover.FailoverTransport - Waking up reconnect task
17:11:19.875 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - urlList connectionList:[tcp://localhost:61616], from: [tcp://localhost:61616]
17:11:19.907 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Attempting  0th  connect to: tcp://localhost:61616
17:11:19.910 [ActiveMQ Task-1] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
17:11:19.913 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Connection established
17:11:19.913 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
17:11:19.913 [ActiveMQ Task-1] INFO  o.a.a.t.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=104857600}
17:11:19.914 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=9, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
17:11:20.951 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-60887-1434013879767-1:1:1
17:11:22.952 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-60887-1434013879767-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-60887-1434013879767-1:1:1
17:11:22.952 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-60887-1434013879767-1:1:1 syncCount: 0
17:11:22.960 [main] DEBUG o.a.a.t.failover.FailoverTransport - Stopped tcp://localhost:61616
17:11:22.961 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616

  在看下消息服务器的结果



 

  意思就是有3个入列,因为生产者就生产了三个消息。

然后我们执行下消费者的main函数,结果如下

18:12:47.827 [main] DEBUG o.a.a.t.failover.FailoverTransport - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
18:12:47.955 [main] DEBUG o.a.a.t.failover.FailoverTransport - Started unconnected
18:12:47.955 [main] DEBUG o.a.a.t.failover.FailoverTransport - Waking up reconnect task
18:12:47.956 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - urlList connectionList:[tcp://localhost:61616], from: [tcp://localhost:61616]
18:12:47.992 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Attempting  0th  connect to: tcp://localhost:61616
18:12:47.996 [ActiveMQ Task-1] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
18:12:47.997 [ActiveMQ Task-1] DEBUG o.a.a.t.failover.FailoverTransport - Connection established
18:12:47.997 [ActiveMQ Task-1] INFO  o.a.a.t.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
18:12:47.997 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=9, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={MaxFrameSize=104857600, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 before negotiation: OpenWireFormat{version=9, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=104857600}
18:12:47.998 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG o.a.a.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616 after negotiation: OpenWireFormat{version=9, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
18:12:48.037 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:1
18:12:48.038 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:1
18:12:48.038 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:1 syncCount: 1
收到消息Thu Jun 11 17:11:19 CST 2015
18:12:48.052 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:2
18:12:48.052 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:2
18:12:48.052 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:2 syncCount: 1
收到消息Thu Jun 11 17:11:20 CST 2015
18:12:48.055 [main] DEBUG o.apache.activemq.TransactionContext - Begin:TX:ID:ali-wb077950p-62342-1434017567852-1:1:3
18:12:48.055 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:ali-wb077950p-62342-1434017567852-1:1:1 Transaction Commit :TX:ID:ali-wb077950p-62342-1434017567852-1:1:3
18:12:48.055 [main] DEBUG o.apache.activemq.TransactionContext - Commit: TX:ID:ali-wb077950p-62342-1434017567852-1:1:3 syncCount: 1
收到消息Thu Jun 11 17:11:21 CST 2015
18:12:48.059 [main] DEBUG o.a.a.t.failover.FailoverTransport - Stopped tcp://localhost:61616
18:12:48.059 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616

 再看下消息服务器,刷新下队列




 好了,大概知道意思了,以后细说细节

 

分享到:
评论

相关推荐

    apache activeMQ之初体验(helloworld)

    在这个"apache activeMQ之初体验(helloworld)"中,我们将探索如何使用ActiveMQ进行基本的消息发布与订阅。 在消息中间件中,"Hello World"程序通常用于演示最基本的消息传递概念。ActiveMQ的"Hello World"程序主要...

    activeMQ收发工具.rar

    ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...

    ActiveMQ的activemq.xml详细配置讲解

    **ActiveMQ的activemq.xml配置详解** ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循Java消息服务(JMS)规范,提供可靠的消息传递功能。`activemq.xml`是ActiveMQ的核心配置文件,它定义了服务器的...

    ActiveMQ 配置文件详解

    **ActiveMQ配置文件详解** Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的...

    ActiveMQ

    3. **异步处理**:对于耗时的操作,可以通过 ActiveMQ 将任务放入消息队列,后台线程异步处理,提升用户体验。 4. **分布式系统通信**:在微服务架构中,ActiveMQ 可作为服务间的通信桥梁,简化服务间的交互。 5. ...

    jdk activemq

    - **性能与兼容性**:使用早期版本的JDK可能会遇到性能瓶颈或安全问题,因此建议使用更高版本的JDK以获得更好的体验。 #### 2. JDK 1.6.x - **ActiveMQ 5.5.0 至 5.9.0**:从5.5.0版本开始,ActiveMQ开始支持JDK ...

    activemq

    ### ActiveMQ-CPP 开发手册知识点详述 #### 一、引言 - **编写目的**:本手册旨在帮助开发者快速掌握 CMS (C++ Messaging Service) 的使用方法,提高 C++ 开发者在消息传递系统方面的开发效率,并作为 CMS 开发的...

    ActiveMQ路由配置方式

    ActiveMQ路由配置方式 ActiveMQ路由配置是Apache ActiveMQ项目中的一种重要配置方式,它依赖另一个Apache项目Camel。ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 ...

    jmx监控activeMQ监控

    jmx监控ActiveMQ监控 jmx(Java Management Extensions)是一种Java技术,为Java应用程序提供了管理和监控的功能。ActiveMQ是Apache软件基金会下的一个开源消息队列系统,提供了高效、可靠的消息传递服务。在生产...

    ActiveMQ客户端

    Apache ActiveMQ是开源的、基于Java消息服务(JMS)的Message Broker,它允许应用程序通过消息传递进行异步通信。ActiveMQ客户端库是用于与ActiveMQ服务器交互的接口,允许开发者在他们的应用中发送和接收消息。这个...

    Jmeter测试ActiveMQ性能报告

    本报告详细阐述了使用JMeter对ActiveMQ进行性能测试的过程和结果,旨在评估ActiveMQ在JMS(Java消息服务)环境下的性能表现。JMeter作为一个强大的负载和性能测试工具,被广泛用于测试各种应用程序,包括消息中间件...

    apache-activemq-5.16.5

    Apache ActiveMQ是业界广泛使用的开源消息中间件,它基于Java消息服务(JMS)标准,提供了高度可扩展、可靠的异步通信能力。标题"apache-activemq-5.16.5"指的是该软件的一个特定版本,即5.16.5版本,通常每个新版本...

    apache-activemq-5.17.3

    Apache ActiveMQ是开源的、基于Java消息服务(JMS)的应用服务器,它是Apache软件基金会的一部分。这个名为"apache-activemq-5.17.3"的压缩包包含了ActiveMQ的5.17.3版本,这是一个稳定且功能丰富的发布版本。在深入...

    apache-activemq Linux版本

    Apache ActiveMQ是业界广泛使用的开源消息中间件,尤其在Linux环境下表现出色。它基于Java语言开发,遵循Apache软件基金会的许可证,并且实现了多种消息传递协议,包括OpenWire、STOMP、AMQP和XMPP等。在Linux系统上...

    最新稳定版ActiveMQ5.15.0

    Apache ActiveMQ是开源社区中最流行的Java消息代理,也是企业级消息中间件(Message Broker)的首选之一。在最新的稳定版5.15.0中,它提供了可靠的消息传递功能,适用于分布式应用程序之间的通信,实现了异步处理、...

    ActiveMQ消息服务器 v6.0.1.zip

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递标准(JMS,Java Message Service),用于在分布式系统中实现可靠的消息传递。在本文中,我们将深入探讨ActiveMQ v6.0.1的核心特性、应用...

    Linux下activeMQ的启动和停止.docx

    在Linux环境下,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它是Java Message Service (JMS) 的实现,能够处理大量的并发消息传递。ActiveMQ提供了高可用性、可扩展性和稳定性,使得它成为分布式...

    ActiveMQ高并发处理方案

    ### ActiveMQ高并发处理方案详解 #### 一、引言 在现代分布式系统中,消息队列作为异步通信的核心组件之一,对于提高系统的吞吐量、降低响应时间和实现服务解耦等方面起着至关重要的作用。Apache ActiveMQ作为一款...

    ActiveMQ开发规范及方案

    ActiveMQ开发规范及方案 ActiveMQ是一种流行的开源消息队列 middleware,广泛应用于分布式系统中。作为一种消息队列 middleware,ActiveMQ提供了许多功能,例如支持多种消息协议、事务支持、持久化机制等。为了确保...

Global site tag (gtag.js) - Google Analytics