`

activemq学习(1)

阅读更多

 

一:jms介绍
         jms说白了就是java message service,是J2EE规范的一部分,跟jdbc差不多,sun只提供了接口,由各个厂商(provider)来进行具体的实现,然后使用者使用他们的jar包进行开发使用即可。
         另外在jms的API中,jms传递消息有两种方式,一种是点对点的Queue,还有一个是发布订阅的Topic方式。区别在于:
        对于Queue模式,一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A,B那就是A,B总共会收到10条消息,不重复。
         对于Topic模式,一个发布者发布消息,有两个接收者A,B来订阅,那么发布了10条消息,A,B各收到10条消息。
       关于api的简单基础可以看下:http://www.javaeye.com/topic/64707,简单的参考!
二:ActiveMQ介绍
         activeMQ是apache下的一个开源jms产品,具体参见apache官方网站;
         Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4. Apache ActiveMQ is released under the Apache 2.0 License
 三:开始实现代码
       1: 使用activeMQ来完成jms的发送,必须要下载activeMQ,然后再本机安装,并且启动activeMQ的服务才行。在官网下载完成之后,运行bin目录下面的activemq.bat,将activeMQ成功启动。
        启动成功之后可以运行:http://localhost:8161/admin/index.jsp  查看一下。
       2:发送端,sender
   
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
   
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
   
public class Sender {
    private static final int SEND_NUMBER = 5;
   
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
   
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination = session.createQueue("test-queue");
            // 得到消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化,可以更改
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息
            sendMessage(session, producer);
            session.commit();
   
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
   
    }
    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <=SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + i);
            producer.send(message);
        }
    }
}
   
         3:接收端,receive
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
   
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
   
public class Receiver {
    public static void main(String[] args) {
   
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
   
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            //test-queue跟sender的保持一致,一个创建一个来接收
            destination = session.createQueue("test-queue");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message arg0) {
                    System.out.println("==================");
                    try {
                        System.out.println("RECEIVE1第一个获得者:"
                                + ((TextMessage) arg0).getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
   
                }
            });
   
            MessageConsumer consumer1 = session.createConsumer(destination);
            consumer1.setMessageListener(new MessageListener() {
                public void onMessage(Message arg0) {
                    System.out.println("+++++++++++++++++++");
                    try {
                        System.out.println("RECEIVE1第二个获得者:"
                                + ((TextMessage) arg0).getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
   
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        //在eclipse里运行的时候,这里不要关闭,这样就可以一直等待服务器发送了,不然就直接结束了。
        // } finally {
        // try {
        // if (null != connection)
        // connection.close();
        // } catch (Throwable ignore) {
        // }
        // }
   
    }
}
   
         4:发送端,sender
         上面的是用Queue的方式来创建,下面再用topic的方式实现同样的功能。
   
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
   
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
   
public class TopicTest {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
   
        Connection connection = factory.createConnection();
        connection.start();
   
        // 创建一个Topic
        Topic topic = new ActiveMQTopic("testTopic");
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
   
        // 注册消费者1
        MessageConsumer comsumer1 = session.createConsumer(topic);
        comsumer1.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer1 get "
                            + ((TextMessage) m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
   
        // 注册消费者2
        MessageConsumer comsumer2 = session.createConsumer(topic);
        comsumer2.setMessageListener(new MessageListener() {
            public void onMessage(Message m) {
                try {
                    System.out.println("Consumer2 get "
                            + ((TextMessage) m).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
   
        });
   
        // 创建一个生产者,然后发送多个消息。
        MessageProducer producer = session.createProducer(topic);
        for (int i = 0; i < 10; i++) {
            System.out.println("producer begin produce=======");
            producer.send(session.createTextMessage("Message:" + i));
        }
    }
   
}

以上引自: http://www.dev26.com/bbs/topic/92

 

 

 

消息发布者
package com.googlecode.garbagecan.jmsstudy.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {
	public static void main(String[] args) throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = factory.createConnection();
		connection.start();
		
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic("myTopic.messages");

		MessageProducer producer = session.createProducer(topic);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

		while(true) {
			TextMessage message = session.createTextMessage();
			message.setText("message_" + System.currentTimeMillis());
			producer.send(message);
			System.out.println("Sent message: " + message.getText());

			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

//		session.close();
//		connection.stop();
//		connection.close();
	}
}



消息订阅者(消息消费者)
package com.googlecode.garbagecan.jmsstudy.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {
	public static void main(String[] args) throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = factory.createConnection();
		connection.start();
		
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic("myTopic.messages");

		MessageConsumer consumer = session.createConsumer(topic);
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				TextMessage tm = (TextMessage) message;
				try {
					System.out.println("Received message: " + tm.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
//		session.close();
//		connection.stop();
//		connection.close();
	}
}

 以上引自:http://www.open-open.com/lib/view/open1328079945062.html

 

此步骤主要是利用两个实例对jms及activemq有个大体了解,按照实例写了一个测试DEMO,关于queue和topic的两个。

 

分享到:
评论
1 楼 ssy341 2012-04-20  
下载来试试,学习学习~~~

相关推荐

    ActiveMQ学习 完整例子

    在"ActiveMQ学习"的完整例子中,你可以通过编写Java代码来创建生产者和消费者,实践发送和接收消息,了解消息的生命周期和状态。同时,通过配置不同的参数,体验ActiveMQ的灵活性和强大功能。例如,你可以创建一个...

    activeMQ学习

    1. **多协议支持**:除了支持JMS,ActiveMQ还支持STOMP、XMPP、AMQP和OpenWire等多种消息协议,使得不同平台和语言的应用可以轻松地进行通信。 2. **高性能与可扩展性**:ActiveMQ设计时考虑了性能和扩展性,通过...

    activeMQ学习资料

    这个压缩包文件包含了一系列与ActiveMQ学习相关的资源,包括案例代码、案例说明文档和技术总结文档,非常适合初学者和进阶者深入理解和实践ActiveMQ。 1. **ActiveMQ基础概念** - **消息队列**:ActiveMQ的核心是...

    ActiveMQ的学习

    **ActiveMQ 学习指南** ActiveMQ 是一款开源的消息中间件,它基于 Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。作为一个专业的 IT 从业者,理解并掌握 ActiveMQ ...

    activemq学习心得.docx

    activemq学习心得 activemq是Apache软件基金会所研发的开放源代码消息队列iddleware,主要用来实现异步消息处理、解耦合和扩展系统。以下是activemq学习心得的知识点总结: 一、activemq配置文件 activemq的配置...

    spring boot ActiveMQ学习练习demo项目源码

    Spring Boot ActiveMQ学习练习demo项目源码是一个针对Java开发者的学习资源,它涵盖了使用Spring Boot集成ActiveMQ进行消息队列操作的基本实践。ActiveMQ是Apache软件基金会的一个开源项目,它是Java消息服务(JMS)...

    ActiveMQ学习笔记之九--发送消息到队列中

    这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...

    ActiveMQ学习笔记之四--启动嵌入式Broker(纯代码方式)

    在本篇ActiveMQ学习笔记中,我们将探讨如何通过纯代码方式启动一个嵌入式的Broker,这对于测试、开发或者快速原型构建非常有用。ActiveMQ是一个开源的消息代理,它遵循Java消息服务(JMS)规范,提供了高可靠性的...

    activemq学习资料.docx

    ActiveMQ学习资料 Activemq是Apache软件基金会的一个开源的消息总线系统,使用Java语言编写的,遵循JMS(Java Message Service)规范。下面是Activemq学习资料的知识点总结: JMS基本构件 * 连接工厂...

    ActiveMq.md

    activemq学习笔记 activemq学习笔记 activemq学习笔记 activemq学习笔记 activemq学习笔记 activemq学习笔记

    Java消息中间件ActiveMQ学习资料

    总的来说,这个学习资料包提供了从理论到实践的全方位ActiveMQ学习路径。从基础的ActiveMQ使用到高级的集群配置,再到生产环境中的实际应用,每个环节都覆盖到了。对Java开发者和系统架构师而言,这些都是提升技能和...

    工作学习-消息中间件activeMQ学习总结

    工作学习-消息中间件ActiveMQ学习总结 本文总结了消息中间件ActiveMQ的学习要点,涵盖了为什么使用消息中间件、消息中间件的组成、JMS规范、ActiveMQ简介、消息中间件的应用场景等方面。 一、为什么使用消息中间件...

    activemq学习入门第一步

    本教程将引导你迈入ActiveMQ学习之旅的第一步,帮助你理解其核心概念和基本操作。 1. **ActiveMQ简介** Apache ActiveMQ是一个完全支持JMS 1.1和JMS 2.0规范的消息代理,它允许应用程序通过发送和接收消息来解耦...

    ActiveMQ学习笔记之一--ActiveMQ下载

    **ActiveMQ学习笔记之一——ActiveMQ下载** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它是基于Java消息服务(JMS)规范的,用于在分布式系统中传递消息。作为一个高性能、可伸缩且可靠的解决方案,...

    ActiveMQ安装包及学习资料.相当经典

    在开始安装和学习ActiveMQ之前,了解其基本概念是非常重要的。消息中间件(如ActiveMQ)的主要任务是作为生产者和消费者之间的代理,通过存储和转发消息来解耦应用程序。JMS是Java平台的标准接口,用于在分布式环境...

    JMS_ActiveMQ交流学习

    ### JMS与ActiveMQ知识点详解 #### 一、JMS简介 **Java Message Service (JMS)** 是由Sun Microsystems提出的一项技术规范,旨在为多种消息中间件(Message-Oriented Middleware, MOM)提供统一的接口标准。JMS的...

    activeMQ收发工具.rar

    8. **事务处理**:学习如何在ActiveMQ中使用JMS事务确保消息的一致性和可靠性。 9. **性能监控**:ActiveMQ提供了一套强大的监控工具,包括Web控制台,可以用来查看消息的发送、接收和堆积情况,帮助优化系统性能。...

    activemq学习(2) spring+activemq

    标题 "activemq学习(2) spring+activemq" 暗示了这篇内容将探讨如何在Spring框架中集成ActiveMQ,一个流行的开源消息代理和消息中间件。ActiveMQ允许应用程序之间通过消息传递进行通信,而Spring则是一个广泛使用...

    ActiveMQ的activemq.xml详细配置讲解

    配合提供的文档,如《activeMQ in Action.doc》和《ActiveMQ测试报告.pdf》,可以更深入地学习ActiveMQ的工作原理和最佳实践。对于与数据库的集成,如`activemq数据库,验证持久化标准配置.txt`所示,ActiveMQ支持...

Global site tag (gtag.js) - Google Analytics