`

ActiveMQ的一个producertool/customertool示例

    博客分类:
  • JMS
阅读更多

ActiveMQ的一个简单示例

关键字: activemq

最近由于公司项目需要,开始学习JMS,用的是ActiveMQ。由于这方面网上的例子不是很多,而且有的也不完整。于是经过几天的摸索学习,写了一个简单的小例子,现在贴出来与大家分享。
ProducerTool.java用于发送消息:

java 代码
  1. package homework;   
  2.   
  3. import javax.jms.Connection;   
  4. import javax.jms.DeliveryMode;   
  5. import javax.jms.Destination;   
  6. import javax.jms.JMSException;   
  7. import javax.jms.MessageProducer;   
  8. import javax.jms.Session;   
  9. import javax.jms.TextMessage;   
  10.   
  11. import org.apache.activemq.ActiveMQConnection;   
  12. import org.apache.activemq.ActiveMQConnectionFactory;   
  13.   
  14. public class ProducerTool {   
  15.   
  16.     private String user = ActiveMQConnection.DEFAULT_USER;   
  17.   
  18.     private String password = ActiveMQConnection.DEFAULT_PASSWORD;   
  19.   
  20.     private String url = ActiveMQConnection.DEFAULT_BROKER_URL;   
  21.   
  22.     private String subject = "TOOL.DEFAULT";   
  23.   
  24.     private Destination destination = null;   
  25.   
  26.     private Connection connection = null;   
  27.   
  28.     private Session session = null;   
  29.   
  30.     private MessageProducer producer = null;   
  31.   
  32.     // 初始化   
  33.     private void initialize() throws JMSException, Exception {   
  34.         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(   
  35.                 user, password, url);   
  36.         connection = connectionFactory.createConnection();   
  37.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);   
  38.         destination = session.createQueue(subject);   
  39.         producer = session.createProducer(destination);   
  40.         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
  41.     }   
  42.   
  43.     // 发送消息   
  44.     public void produceMessage(String message) throws JMSException, Exception {   
  45.         initialize();   
  46.         TextMessage msg = session.createTextMessage(message);   
  47.         connection.start();   
  48.         System.out.println("Producer:->Sending message: " + message);   
  49.         producer.send(msg);   
  50.         System.out.println("Producer:->Message sent complete!");   
  51.     }   
  52.   
  53.     // 关闭连接   
  54.     public void close() throws JMSException {   
  55.         System.out.println("Producer:->Closing connection");   
  56.         if (producer != null)   
  57.             producer.close();   
  58.         if (session != null)   
  59.             session.close();   
  60.         if (connection != null)   
  61.             connection.close();   
  62.     }   
  63. }   

 

ConsumerTool.java用于接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。

java 代码
  1. package homework;   
  2.   
  3. import javax.jms.Connection;   
  4. import javax.jms.Destination;   
  5. import javax.jms.JMSException;   
  6. import javax.jms.MessageConsumer;   
  7. import javax.jms.Session;   
  8. import javax.jms.MessageListener;   
  9. import javax.jms.Message;   
  10. import javax.jms.TextMessage;   
  11.   
  12. import org.apache.activemq.ActiveMQConnection;   
  13. import org.apache.activemq.ActiveMQConnectionFactory;   
  14.   
  15. public class ConsumerTool implements MessageListener {   
  16.   
  17.     private String user = ActiveMQConnection.DEFAULT_USER;   
  18.   
  19.     private String password = ActiveMQConnection.DEFAULT_PASSWORD;   
  20.   
  21.     private String url = ActiveMQConnection.DEFAULT_BROKER_URL;   
  22.   
  23.     private String subject = "TOOL.DEFAULT";   
  24.   
  25.     private Destination destination = null;   
  26.   
  27.     private Connection connection = null;   
  28.   
  29.     private Session session = null;   
  30.   
  31.     private MessageConsumer consumer = null;   
  32.   
  33.     // 初始化   
  34.     private void initialize() throws JMSException, Exception {   
  35.         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(   
  36.                 user, password, url);   
  37.         connection = connectionFactory.createConnection();   
  38.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);   
  39.         destination = session.createQueue(subject);   
  40.         consumer = session.createConsumer(destination);   
  41.            
  42.     }   
  43.   
  44.     // 消费消息   
  45.     public void consumeMessage() throws JMSException, Exception {   
  46.         initialize();   
  47.         connection.start();   
  48.            
  49.         System.out.println("Consumer:->Begin listening...");   
  50.         // 开始监听   
  51.         consumer.setMessageListener(this);   
  52.         // Message message = consumer.receive();   
  53.     }   
  54.   
  55.     // 关闭连接   
  56.     public void close() throws JMSException {   
  57.         System.out.println("Consumer:->Closing connection");   
  58.         if (consumer != null)   
  59.             consumer.close();   
  60.         if (session != null)   
  61.             session.close();   
  62.         if (connection != null)   
  63.             connection.close();   
  64.     }   
  65.   
  66.     // 消息处理函数   
  67.     public void onMessage(Message message) {   
  68.         try {   
  69.             if (message instanceof TextMessage) {   
  70.                 TextMessage txtMsg = (TextMessage) message;   
  71.                 String msg = txtMsg.getText();   
  72.                 System.out.println("Consumer:->Received: " + msg);   
  73.             } else {   
  74.                 System.out.println("Consumer:->Received: " + message);   
  75.             }   
  76.         } catch (JMSException e) {   
  77.             // TODO Auto-generated catch block   
  78.             e.printStackTrace();   
  79.         }   
  80.     }   
  81. }   

 

如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为Message message = consumer.receive(),手动去调用MessageConsumer的receive方法即可。

下面是测试类Test.java:

java 代码
  1. package homework;   
  2.   
  3. import javax.jms.JMSException;   
  4.   
  5. public class Test {   
  6.   
  7.     /**  
  8.      * @param args  
  9.      */  
  10.     public static void main(String[] args) throws JMSException, Exception {   
  11.         // TODO Auto-generated method stub   
  12.         ConsumerTool consumer = new ConsumerTool();   
  13.         ProducerTool producer = new ProducerTool();   
  14.         // 开始监听   
  15.         consumer.consumeMessage();   
  16.            
  17.         // 延时500毫秒之后发送消息   
  18.         Thread.sleep(500);   
  19.         producer.produceMessage("Hello, world!");   
  20.         producer.close();   
  21.            
  22.         // 延时500毫秒之后停止接受消息   
  23.         Thread.sleep(500);   
  24.         consumer.close();   
  25.     }   
  26. }   

 

以上就是我学习ActiveMQ之后写的一个简单的例子,希望对你有帮助,如果有什么错误还请指正。 

分享到:
评论
1 楼 newboy2004 2013-04-02  
直接运行这个测试,没有结果显示

相关推荐

    ActiveMQ / RabbitMQ 示例

    总之,这个示例程序将帮助你理解如何在不同的消息队列系统中使用JMS API进行消息通信,包括设置服务器配置、创建和管理连接、发送和接收消息等核心操作。这对于构建可扩展的、健壮的分布式系统至关重要。

    使用WebSocket协议接收ActiveMQ消息

    因此,一个完整的WebSocket连接URL可能是这样的:"ws://activemq服务器地址:61614/websocket"。 为了通过WebSocket接收ActiveMQ的消息,客户端需要实现WebSocket的API,并配置相应的连接参数。这通常涉及到创建...

    ActiveMQ 教学视频/教程 /附带笔记等资源

    ActiveMQ 是一个开源的消息中间件,它遵循Java消息服务(JMS)标准,为企业级应用程序提供高效率、可扩展和可靠的异步通信解决方案。在这个"ActiveMQ 教学视频/教程 /附带笔记等资源"的压缩包中,你将找到一系列关于...

    用C#实现的ActiveMQ发布/订阅消息传送

    ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为.NET平台设计的ActiveMQ接口。 首先,我们需要了解发布/订阅模式。在这种模式下,生产者...

    activeMQ+spring+springmvc整合示例

    1. **ActiveMQ**: Apache ActiveMQ是Apache软件基金会开发的一个开源消息中间件,遵循JMS(Java Message Service)规范。它允许应用程序通过消息传递进行通信,从而解耦生产者和消费者,提高系统的灵活性和可扩展性...

    apache-activemq-5.15.12-bin.tar.gz

    1.解压  ...ACTIVEMQ_HOME=/opt/apache-activemq-5.15.12 PATH=/opt/apache-activemq-5.15.12/bin:$PATH export ACTIVEMQ_HOME PATH 5.activemq的后台默认端口是61616,前台访问端口是8161

    基于kahadb的activemq高可用集群部署配置示例

    ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能强大的消息代理,支持多种协议,包括AMQP、STOMP、MQTT等。在企业级应用中,为了保证服务的高可用性和数据的一致性,通常会采用集群部署。本示例将详细讲解...

    ActiveMQ结合Spring收发消息的示例代码

    以下是一个简单的配置示例: ```xml <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"...

    CentOS安装Activemq图文教程

    将activemq文件夹剪切到/usr目录下,然后打开终端,输入`gedit /usr/activemq/bin/activemq`命令,以打开activemq文件。在文件中添加以下代码: ``` ### BEGIN INIT INFO # Provides: activemq # Required-Start: $...

    基于SpringBoot的ActiveMQ生产者/消费者

    在本项目中,我们探讨的是如何使用SpringBoot集成Apache ActiveMQ来构建一个生产者和消费者的应用。SpringBoot以其简洁的配置和快速启动特性,成为现代Java应用开发的首选框架之一,而ActiveMQ则是流行的消息中间件...

    ActiveMQ-jms jar包

    1. **Message**: JMS消息是数据的载体,它可以从一个应用程序发送到另一个应用程序。消息可以是文本、二进制数据或自定义对象。 2. **MessageProducer**: 这个接口用于创建和发送消息到目的地。开发者可以通过调用...

    C# ActiveMQ 和Spring.NET框架开发示例

    ActiveMQ是Apache软件基金会的一个项目,它支持Java消息服务(JMS)1.1规范,并且是一个用Java编写的、支持多语言客户端开发的消息中间件产品。ActiveMQ以其支持多种编程语言开发的客户端而著称,使得开发者可以使用...

    linux 下apache-activemq.zip

    在Linux上安装Apache ActiveMQ通常涉及下载最新版本的zip文件,然后将其解压到一个合适的目录。你可以通过访问Apache官方网站获取最新的版本。使用`wget`命令下载,例如: ``` wget ...

    ActiveMQ路由配置方式

    ActiveMQ路由配置是Apache ActiveMQ项目中的一种重要配置方式,它依赖另一个Apache项目Camel。ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 使用Camel Choice进行配置...

    activeMQ简单入门案例

    本教程将引导你通过一个简单的入门案例了解如何使用ActiveMQ实现生产者与消费者的模式。 首先,我们需要了解ActiveMQ的基本概念。在消息队列中,生产者是发送消息的实体,而消费者则是接收和处理这些消息的实体。...

    activemq自启动并设置用户名密码

    创建一个指向ActiveMQ启动脚本的软链接,并将其放置在`/etc/init.d/`目录下: ```bash ln -s /usr/local/apache-activemq/bin/activemq /etc/init.d/ chmod +x /etc/init.d/activemq chkconfig --add activemq ...

    activemq 入门示例代码

    1. 创建一个新的 Maven 项目:在你的 IDE(如 IntelliJ IDEA 或 Eclipse)中,选择创建一个新的 Maven 项目。 2. 配置 POM.xml:在项目的根目录下,找到并打开 `pom.xml` 文件,添加以下依赖,用于引入 ActiveMQ ...

    activemq+spring demo 简单示例222

    本文将以“activemq+spring demo”为例,深入探讨如何将Apache ActiveMQ与Spring框架进行整合,并通过一个简单的示例来展示其实现过程。 首先,我们需要了解ActiveMQ。ActiveMQ是Apache软件基金会开发的一个开放源...

Global site tag (gtag.js) - Google Analytics