`

ActiveMQ入门示例

 
阅读更多
企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。

在 Java 里有 JMS 的多个实现。其中 apache 下的 ActiveMQ 就是不错的选择。ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里示例下使用 ActiveMQ

用 ActiveMQ 最好还是了解下 JMS

JMS 公共  点对点域  发布/订阅域 
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber

JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。

ConnectionFactory 是连接工厂,负责创建Connection。

Connection 负责创建 Session。

Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。

Destination 是消息的目的地。

详细的可以网上找些 JMS 规范(有中文版)。

下载 apache-activemq-5.3.0。http://activemq.apache.org/download.html ,解压,然后双击 bin/activemq.bat。运行后,可以在 http://localhost:8161/admin 观察。也有 demo, http://localhost:8161/demo 。把 activemq-all-5.3.0.jar 加入 classpath。

Jms 发送 代码:

public static void main(String[] args) throws Exception {   
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
  
    Connection connection = connectionFactory.createConnection();   
    connection.start();   
  
    Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
    Destination destination = session.createQueue("my-queue");   
  
    MessageProducer producer = session.createProducer(destination);   
    for(int i=0; i<3; i++) {   
        MapMessage message = session.createMapMessage();   
        message.setLong("count", new Date().getTime());   
        Thread.sleep(1000);   
        //通过消息生产者发出消息   
        producer.send(message);   
    }   
    session.commit();   
    session.close();   
    connection.close();   
}




Jms 接收代码:


public static void main(String[] args) throws Exception {   
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
  
    Connection connection = connectionFactory.createConnection();   
    connection.start();   
  
    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
    Destination destination = session.createQueue("my-queue");   
  
    MessageConsumer consumer = session.createConsumer(destination);   
    int i=0;   
    while(i<3) {   
        i++;   
        MapMessage message = (MapMessage) consumer.receive();   
        session.commit();   
  
        //TODO something....   
        System.out.println("收到消息:" + new Date(message.getLong("count")));   
    }   
  
    session.close();   
    connection.close();   
}




JMS五种消息的发送/接收的例子

转自:http://chenjumin.javaeye.com/blog/687124  

1、消息发送

//连接工厂  
ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
        ActiveMQConnection.DEFAULT_USER,  
        ActiveMQConnection.DEFAULT_PASSWORD,  
        "tcp://localhost:61616");  
 
//连接到JMS提供者  
Connection conn = connFactory.createConnection();  
conn.start();  
 
//事务性会话,自动确认消息  
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
 
//消息的目的地  
Destination destination = session.createQueue("queue.hello");  
 
//消息生产者  
MessageProducer producer = session.createProducer(destination);  
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化  
 
 
//文本消息  
TextMessage textMessage = session.createTextMessage("文本消息");  
producer.send(textMessage);  
 
//键值对消息  
MapMessage mapMessage = session.createMapMessage();  
mapMessage.setLong("age", new Long(32));  
mapMessage.setDouble("sarray", new Double(5867.15));  
mapMessage.setString("username", "键值对消息");  
producer.send(mapMessage);  
 
//流消息  
StreamMessage streamMessage = session.createStreamMessage();  
streamMessage.writeString("streamMessage流消息");  
streamMessage.writeLong(55);  
producer.send(streamMessage);  
 
//字节消息  
String s = "BytesMessage字节消息";  
BytesMessage bytesMessage = session.createBytesMessage();  
bytesMessage.writeBytes(s.getBytes());  
producer.send(bytesMessage);  
 
//对象消息  
User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口  
ObjectMessage objectMessage = session.createObjectMessage();  
objectMessage.setObject(user);  
producer.send(objectMessage);  
 
 
session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地  
producer.close();  
session.close();  
conn.close(); 




2、消息接收:通过消息监听器的方式接收消息


public class Receiver implements MessageListener{  
    private boolean stop = false;  
      
    public void execute() throws Exception {  
        //连接工厂  
        ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD,  
                "tcp://localhost:61616");  
          
        //连接到JMS提供者  
        Connection conn = connFactory.createConnection();  
        conn.start();  
          
        //事务性会话,自动确认消息  
        Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
          
        //消息的来源地  
        Destination destination = session.createQueue("queue.hello");  
          
        //消息消费者  
        MessageConsumer consumer = session.createConsumer(destination);  
        consumer.setMessageListener(this);  
          
        //等待接收消息  
        while(!stop){  
            Thread.sleep(5000);  
        }  
          
        session.commit();  
          
        consumer.close();  
        session.close();  
        conn.close();  
    }  
 
    public void onMessage(Message m) {  
        try{  
            if(m instanceof TextMessage){ //接收文本消息  
                TextMessage message = (TextMessage)m;  
                System.out.println(message.getText());  
            }else if(m instanceof MapMessage){ //接收键值对消息  
                MapMessage message = (MapMessage)m;  
                System.out.println(message.getLong("age"));  
                System.out.println(message.getDouble("sarray"));  
                System.out.println(message.getString("username"));  
            }else if(m instanceof StreamMessage){ //接收流消息  
                StreamMessage message = (StreamMessage)m;  
                System.out.println(message.readString());  
                System.out.println(message.readLong());  
            }else if(m instanceof BytesMessage){ //接收字节消息  
                byte[] b = new byte[1024];  
                int len = -1;  
                BytesMessage message = (BytesMessage)m;  
                while((len=message.readBytes(b))!=-1){  
                    System.out.println(new String(b, 0, len));  
                }  
            }else if(m instanceof ObjectMessage){ //接收对象消息  
                ObjectMessage message = (ObjectMessage)m;  
                User user = (User)message.getObject();  
                System.out.println(user.getUsername() + " _ " + user.getPassword());  
            }else{  
                System.out.println(m);  
            }  
              
            stop = true;  
        }catch(JMSException e){  
            stop = true;  
            e.printStackTrace();  
        }  
    }  
分享到:
评论

相关推荐

    SpringActiveMQ入门示例

    SpringActiveMQ入门示例是关于如何在Java环境中利用Spring框架与Apache ActiveMQ集成的一个实践教程。这个示例主要适用于开发者想要了解如何在Spring应用中使用消息队列进行异步通信和解耦。在这个项目中,开发环境...

    activemq 入门示例代码

    **ActiveMQ 入门示例代码详解** ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统...

    activeMQ入门到精通.txt

    根据提供的文件信息:“activeMQ入门到精通”,我们可以深入探讨ActiveMQ的相关知识点,包括其基本概念、安装配置步骤、核心功能特性以及应用场景等。 ### ActiveMQ简介 ActiveMQ是一款开源的消息中间件,它支持...

    activeMQ简单入门案例

    本教程将引导你通过一个简单的入门案例了解如何使用ActiveMQ实现生产者与消费者的模式。 首先,我们需要了解ActiveMQ的基本概念。在消息队列中,生产者是发送消息的实体,而消费者则是接收和处理这些消息的实体。...

    JMS-ActiveMQ入门实例

    **JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和阅读消息的应用程序接口。它为应用程序提供了标准的接口,可以跨越多种消息中间件产品进行通信。...

    ActiveMQ入门及深入使用的例子

    在压缩包"ActiveMQ-5.1"中,可能包含了示例代码和配置文件,你可以根据这些资料动手实践,通过运行例子来加深对ActiveMQ的理解。这些例子涵盖了基本的发送和接收消息,以及一些高级特性,如消息选择器、事务管理等。...

    activeMQ消息中间件入门示例,行行注释

    在这个“activeMQ消息中间件入门示例”中,我们将探讨如何设置基本的生产者和消费者来实现消息传递。 首先,让我们了解什么是消息中间件。消息中间件是一种软件系统,它允许不同的应用之间通过消息进行通信,而不是...

    HETF-ActiveMQ入门手册.zip

    在“HETF-ActiveMQ入门手册.doc”中,读者可能会找到如何安装和配置ActiveMQ,创建和管理队列与主题,设置安全策略,以及如何在实际项目中集成和使用ActiveMQ的详细步骤和示例。此外,文档可能还会涵盖性能调优技巧...

    ActiveMQ入门

    ### ActiveMQ 入门知识点详解 #### 一、ActiveMQ 概述 **ActiveMQ** 是由 Apache 软件基金会开发的一款免费且开源的消息中间件。与重量级且需付费的 IBM MQ 相比,ActiveMQ 更适合初学者及预算有限的项目使用。 *...

    消息队列-activemq入门实例.zip

    《ActiveMQ入门实例详解》 在信息技术领域,消息队列(Message Queue)作为一种重要的中间件技术,被广泛应用于系统解耦、异步处理以及负载均衡等场景。Apache ActiveMQ是Apache软件基金会开发的一款开源消息代理,...

    Apache ActiveMQ 入门最简单例子

    在本文中,我们将深入探讨如何通过Apache ActiveMQ 5.8版本进行入门,以及如何构建一个简单的Master环境。 首先,我们要了解消息队列(Message Queue)的基本概念。消息队列是一种异步通信机制,它允许应用程序之间...

    memcached和activeMQ的JAVA示例代码

    描述中的"memcached 和 activeMQ 的入门级示例代码,JAVA eclipse工程"告诉我们这个项目是为初学者设计的,它包含了在Eclipse开发环境中运行的Java代码。Eclipse是一款广泛使用的Java集成开发环境(IDE),使得...

    ActiveMQ 入门实战(3)--SpringBoot 整合 ActiveMQ(csdn)————程序.pdf

    以下是一个简单的示例: ```java package com.abc.demo.activemq.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org....

    activeMQ示例 activeMQ demo,java分布式技术

    本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...

    消息队列中间件ActiveMQ入门到精通视频教程及资料

    001-ActiveMQ基础;002-安全机制+签收模式+发送模式+MessageProducer;003-顺序消费+消息过滤SELECTOR+MessageConsumer+MySql持久化;004-p2p模式+pulish-subscribe发布订阅模式+与spring集成;...示例;

    ActiveMQ的入门例子

    **ActiveMQ入门详解** ActiveMQ是Apache组织开发的一款开源的消息中间件,它是Java Message Service (JMS) 的实现,主要用于处理应用间的异步通信。在分布式系统中,ActiveMQ作为一个消息代理,允许应用程序通过...

Global site tag (gtag.js) - Google Analytics