`

JMS学习笔记 (三)ActiveMQ简单例子

    博客分类:
  • JMS
 
阅读更多
我们使用的是ActiveMQ 5.10.1 Release的Windows版,开发时候,要将apache-activemq-5.10.0-bin.zip解压缩后里面的activemq-all-5.10.0.jar包加入到classpath下面,这个包包含了所有jms接口api的实现。

JMS API中约定了Client端可以使用四种ACK_MODE,在javax.jms.Session接口中:

AUTO_ACKNOWLEDGE = 1    自动确认
CLIENT_ACKNOWLEDGE = 2    客户端手动确认  
DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
SESSION_TRANSACTED = 0    事务提交并确认

可以发送的消息类型有ObjectMessage、TextMessage、MapMessage

(一)点对点的消息模型,只需要一个消息生成者和消息消费者,下面是代码。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.ObjectMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息的生产者(发送者) 
 * 
 *
 */
public class JMSProducer {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为HelloWorld的消息队列
            destination = session.createQueue("HelloWorld");
            //创建消息生产者
            messageProducer = session.createProducer(destination);
           
            
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    /**
     * 发送消息
     * @param session
     * @param messageProducer  消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < JMSProducer.SENDNUM; i++) {
            //创建一条文本消息 
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //通过消息生产者发出消息 
            messageProducer.send(message);
        }

    }
}


编写消费者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.ObjectMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息的消费者(接受者)
 * 
 *
 */
public class JMSConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名,默认为空
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码,默认为空
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址,tcp://localhost:61616

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接HelloWorld的消息队列
            destination = session.createQueue("HelloWorld");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            while (true) {
            	
            	//设置接收者接收消息的时间,超过时间不接收将执行下一步,如不设置此参数是长时间处于阻塞状态不执行下一步操作等到有消息才执行下一步,为了便于测试,这里谁定为500s
                TextMessage textMessage = (TextMessage) messageConsumer.receive(50000);
                if(textMessage != null){
                    System.out.println("收到的消息:" + textMessage.getText());
                }else {
                	connection.close();
                    break;
                }
                
                
            	
            }
System.out.println("消息执收完成!");

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}


运行

首先,启动ActiveMQ,后在浏览器中输入:http://localhost:8161/admin/,然后开始执行:
运行发送者,myeclipse控制台输出,如下图:



此时,我们先看一下ActiveMQ服务器,Queues内容如下:




我们可以看到创建了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,我们也可以通过Browse查看是哪些消息,如下图:




(二)消息的发布订阅模式

编写消息发布对象

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

public class Publisher {

	//默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
    
   //连接工厂
    ConnectionFactory connectionFactory;
    //连接
    Connection connection = null;
    //会话 接受或者发送消息的线程
    Session session;
   //消息生产者
    MessageProducer messageProducer;
    //消息的目的地
    Destination []destinations;
    
    public Publisher() throws JMSException {  
    	//实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(Publisher.USERNAME, Publisher.PASSWORD, Publisher.BROKEURL);
       //通过连接工厂获取连接
        connection = connectionFactory.createConnection();
        try {  
        connection.start();  
        } catch (JMSException jmse) {  
            connection.close();  
            throw jmse;  
        }  
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ;  
        messageProducer = session.createProducer(null);  
    }  
    protected void setTopics(String[] topics) throws JMSException {
    	destinations = new Destination[topics.length];
    	for(int i = 0; i < topics.length; i++) {
    		destinations[i] = session.createTopic("TOPICS." + topics[i]);
    	}
    }
    
    protected Message createMessage(String msg,int age, Session session) throws JMSException {  
        MapMessage message = session.createMapMessage();  
        message.setString("name", msg);  
        message.setInt("age", age);      
        return message;  
    }  
    
    protected void sendMessage(String msg,int age) throws JMSException {  
        for(int i = 0; i < destinations.length; i++) {  
            Message message = createMessage(msg,age, session);  
            System.out.println("发送: " + ((ActiveMQMapMessage)message).getContentMap() + " 到目标: " + destinations[i]);  
            messageProducer.send(destinations[i], message);  
        }  
    }  
    
    public void close() throws JMSException {  
        if (connection != null) {  
            connection.close();  
         }  
        
    }  
    
    public static void main(String[] args) throws JMSException {
    	
            // 创建发布消息对象	
            Publisher publisher = new Publisher();
            
            // 设置订阅者
            String []strs=new String[]{"订阅者01","订阅者02","订阅者03","订阅者04"};
            
    	    publisher.setTopics(strs);
    	    
    	    Random generator = new Random();
            for(int i=0;i<2;i++){
            	//设置要发送的消息
    		publisher.sendMessage("美男"+i,generator.nextInt(90));
    		
    		try {
    			Thread.sleep(1000);//每发一条信息暂停1秒,这个可有可无,是为了显示效果有停顿的感觉
    		} catch(InterruptedException e) {
    			e.printStackTrace();
    		}
            }
            
        // 关闭链接
        publisher.close();
    }
}


编写消息订阅对象(接收者)

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer implements MessageListener{
	//默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
  //连接工厂
    ConnectionFactory connectionFactory;
    //连接
    Connection connection = null;
    //会话 接受或者发送消息的线程
    Session session;
    
    MessageConsumer messageConsumer;
    
    public Consumer(String topic) throws JMSException
    {
    	//实例化连接工厂
		  connectionFactory = new ActiveMQConnectionFactory(Consumer.USERNAME, Consumer.PASSWORD, Consumer.BROKEURL);
       //通过连接工厂获取连接
		 connection = connectionFactory.createConnection();
         connection.start();
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination destination = session.createTopic("TOPICS." + topic);
		 messageConsumer = session.createConsumer(destination);
		 
    }

    
    public MessageConsumer getMessageConsumer()
    {
    	return messageConsumer;
    }
	
	public static void main(String[] args) throws JMSException {
		
	    String []topics=new String[]{"订阅者01","订阅者02","订阅者03","订阅者04"};
	    for (String topic : topics) {
	    	 Consumer consumer = new Consumer(topic);
	    	 MessageConsumer messageConsumer=consumer.getMessageConsumer();
	    	 messageConsumer.setMessageListener(consumer);
	    }
	}
		
	
	@Override
	public void onMessage(Message msg) {
		// TODO Auto-generated method stub
		try {  
            MapMessage map = (MapMessage)msg;  
            String name = map.getString("name");  
            int age = map.getInt("age");    
            System.out.println(name+" 年龄:"+age+"\n");  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
	}
	
	
}



先运行订阅者对象,因为在发布/订阅的消息模型下,只有订阅者在启动或者侦听状态下才能发送成功。myeclispe如下图显示







此时,我们先看一下ActiveMQ服务器,Topic内容如下:




  • 大小: 25.7 KB
  • 大小: 204.6 KB
  • 大小: 53 KB
  • 大小: 14.7 KB
  • 大小: 13.4 KB
  • 大小: 21.5 KB
  • 大小: 37.2 KB
分享到:
评论

相关推荐

    jms+sping+activeMq的例子serd和recevice

    标题"jms+spring+activeMq的例子serd和recevice"提到了三个关键术语:JMS(Java Message Service)、Spring框架和ActiveMQ。这表明我们将探讨一个结合了这三个技术的示例,其中"serd"可能是"server"的误拼,而...

    JMS学习笔记(一)——JMS简介安装ActiveMQ

    **JMS学习笔记(一)——JMS简介与ActiveMQ安装** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的标准接口。它为应用程序提供了一种标准的方式,用来创建、发送、接收和读取...

    一个activeMQ的简单例子

    这个简单的ActiveMQ例子可能是为了演示如何设置和使用基本的生产者和消费者,以及如何通过消息队列实现异步通信。在实际应用中,我们还可以利用ActiveMQ的高级特性,如持久化、优先级、消息筛选等,以满足更复杂的...

    jms+activeMq+spring学习简单例子

    标题“jms+activeMq+spring学习简单例子”表明这个压缩包包含了一些示例代码,用于演示如何在Spring框架中集成JMS和ActiveMQ,以便于理解和学习。通过这个例子,开发者可以了解如何在实际应用中实现基于消息的通信。...

    JMS.rar_activemq_jms_jms activemq

    在这个例子中,我们连接到运行在本地的ActiveMQ服务器(默认端口61616),创建一个非事务性的会话,然后创建一个消息队列`myQueue`作为目的地。接着,我们创建一个`MessageProducer`来发送`TextMessage`,最后关闭...

    test_jms.zip_activemq_activemq案例_jms_jms test

    描述"jms简单的案例,用的activemq,使用jms前请先启动activemq服务器"表明这是一个初级的JMS实践,涉及到使用ActiveMQ作为服务器,而且在运行任何JMS相关的代码之前,需要确保ActiveMQ服务已经启动。 **ActiveMQ与...

    activeMQ JMS WEB 例子

    在这个"ActiveMQ JMS WEB 例子"中,我们将探讨如何在Web环境中使用ActiveMQ进行消息通信。 首先,了解ActiveMQ的基本概念是必要的。ActiveMQ支持多种协议,如OpenWire、AMQP、STOMP、XMPP和MQTT,使其能够广泛应用...

    JMS之Spring +activeMQ实现消息队列

    总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...

    JMS学习笔记精心总结

    **JMS学习笔记精心总结** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准接口。它允许应用程序创建、发送、接收和读取消息,使得应用程序能够在不直接连接的情况下...

    ActiveMQ 简单例子源码 包含操作说明

    通过这个简单的 ActiveMQ 示例,你可以了解如何利用 JMS 接口在 Java 应用程序中使用 ActiveMQ 进行消息通信。这个例子将帮助你快速上手,并理解消息队列在分布式系统中的作用和价值。记住,深入学习 ActiveMQ 的...

    ActiveMQ学习 完整例子

    下面将详细介绍ActiveMQ的学习内容,以及如何通过实际例子来理解和掌握它。 1. **ActiveMQ基本概念** - **消息中间件**:ActiveMQ作为一个消息中间件,负责在分布式系统中传递消息,解耦生产者和消费者。 - **JMS...

    JMS 使用 ActiveMQ 传送文件

    **标题:“JMS 使用 ActiveMQ 传送文件”** 在IT领域,Java消息服务(Java ...通过这些知识点的学习和实践,开发者可以掌握使用JMS和ActiveMQ进行文件传输的核心技能,为构建可靠的、分布式的应用打下坚实的基础。

    Apache ActiveMQ 入门最简单例子

    在本文中,我们将深入探讨如何通过Apache ActiveMQ 5.8版本进行入门,以及如何构建一个简单的Master环境。 首先,我们要了解消息队列(Message Queue)的基本概念。消息队列是一种异步通信机制,它允许应用程序之间...

    activeMq的一个小例子

    activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子...

    springMVC+activeMQ例子

    Spring MVC是Spring框架的一部分,主要用于构建基于Java的Web应用程序,而ActiveMQ则是Apache出品的一个开源消息代理,它实现了多种消息协议,使得分布式系统中的异步通信变得简单高效。下面我们将深入探讨这两个...

    JMS-activemq 实例(分ppt,eclipse工程,说明三部分)

    在这个实例中,`JMS-activemq` 提供了三种不同的组成部分: 1. **PPTX文件(activemq.pptx)** - 这通常是一个演示文稿,详细介绍了JMS和ActiveMQ的基础知识、工作原理以及如何使用它们。它可能包含概念解释、架构...

    JMS_ActiveMQ交流学习

    ### JMS与ActiveMQ知识点详解 #### 一、JMS简介 **Java Message Service (JMS)** 是由Sun Microsystems提出的一项技术规范,旨在为多种消息中间件(Message-Oriented Middleware, MOM)提供统一的接口标准。JMS的...

    JMS之ActiveMQ工具类+使用例子.zip

    **Apache ActiveMQ与JMS简介** Apache ActiveMQ是基于Java消息服务(Java Message Service,简称JMS)的开源消息中间件,由Apache软件基金会开发。它允许应用程序通过发送和接收消息来实现异步通信,从而解耦了系统...

    JMS相关,教程,例子,学习笔记

    通过阅读**jms学习笔记.docx** 和 **JMS例子.docx**,你可以深入理解JMS的使用方式,获取实践经验。而 **JMS教程.pdf** 和 **基于XML和JMS的异构数据交换集成的研究.pdf** 则提供了理论基础和高级应用场景的讲解,有...

    JMS教程+activemq以及activemq和tomcat的整合

    ActiveMQ是Apache软件基金会开发的一款开源、高性能、跨语言的消息中间件,它实现了JMS规范。ActiveMQ支持多种协议,如OpenWire、AMQP、STOMP、XMPP和MQTT,使得不同平台和语言的应用能够轻松地集成。ActiveMQ还提供...

Global site tag (gtag.js) - Google Analytics