`
cloud21
  • 浏览: 398540 次
  • 性别: Icon_minigender_2
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ的一个producer----customer例子

    博客分类:
  • JMS
阅读更多
关键字: producertool/customertool                   ActiveMQ的一个简单示例
关键字: activemq

最近由于公司项目需要,开始学习JMS,用的是ActiveMQ。由于这方面网上的例子不是很多,而且有的也不完整。于是经过几天的摸索学习,写了一个简单的小例子,现在贴出来与大家分享。
ProducerTool.java用于发送消息:

package homework;    
   
import javax.jms.Connection;    
import javax.jms.DeliveryMode;    
import javax.jms.Destination;    
import javax.jms.JMSException;    
import javax.jms.MessageProducer;    
import javax.jms.Session;    
import javax.jms.TextMessage;    
   
import org.apache.activemq.ActiveMQConnection;    
import org.apache.activemq.ActiveMQConnectionFactory;    
   
public class ProducerTool {    
   
    private String user = ActiveMQConnection.DEFAULT_USER;    
   
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;    
   
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;    
   
    private String subject = "TOOL.DEFAULT";    
   
    private Destination destination = null;    
   
    private Connection connection = null;    
   
    private Session session = null;    
   
    private MessageProducer producer = null;    
   
    // 初始化    
    private void initialize() throws JMSException, Exception {    
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(    
                user, password, url);    
        connection = connectionFactory.createConnection();    
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
        destination = session.createQueue(subject);    
        producer = session.createProducer(destination);    
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);    
    }    
   
    // 发送消息    
    public void produceMessage(String message) throws JMSException, Exception {    
        initialize();    
        TextMessage msg = session.createTextMessage(message);    
        connection.start();    
        System.out.println("Producer:->Sending message: " + message);    
        producer.send(msg);    
        System.out.println("Producer:->Message sent complete!");    
    }    
   
    // 关闭连接    
    public void close() throws JMSException {    
        System.out.println("Producer:->Closing connection");    
        if (producer != null)    
            producer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
}   
ConsumerTool.java用于接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。

package homework;    
   
import javax.jms.Connection;    
import javax.jms.Destination;    
import javax.jms.JMSException;    
import javax.jms.MessageConsumer;    
import javax.jms.Session;    
import javax.jms.MessageListener;    
import javax.jms.Message;    
import javax.jms.TextMessage;    
   
import org.apache.activemq.ActiveMQConnection;    
import org.apache.activemq.ActiveMQConnectionFactory;    
   
public class ConsumerTool implements MessageListener {    
   
    private String user = ActiveMQConnection.DEFAULT_USER;    
   
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;    
   
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;    
   
    private String subject = "TOOL.DEFAULT";    
   
    private Destination destination = null;    
   
    private Connection connection = null;    
   
    private Session session = null;    
   
    private MessageConsumer consumer = null;    
   
    // 初始化    
    private void initialize() throws JMSException, Exception {    
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(    
                user, password, url);    
        connection = connectionFactory.createConnection();    
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    
            
    }    
   
    // 消费消息    
    public void consumeMessage() throws JMSException, Exception {    
        initialize();    
        connection.start();    
            
        System.out.println("Consumer:->Begin listening...");    
        // 开始监听    
        consumer.setMessageListener(this);    
        // Message message = consumer.receive();    
    }    
   
    // 关闭连接    
    public void close() throws JMSException {    
        System.out.println("Consumer:->Closing connection");    
        if (consumer != null)    
            consumer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
   
    // 消息处理函数    
    public void onMessage(Message message) {    
        try {    
            if (message instanceof TextMessage) {    
                TextMessage txtMsg = (TextMessage) message;    
                String msg = txtMsg.getText();    
                System.out.println("Consumer:->Received: " + msg);    
            } else {    
                System.out.println("Consumer:->Received: " + message);    
            }    
        } catch (JMSException e) {    
            // TODO Auto-generated catch block    
            e.printStackTrace();    
        }    
    }    
}    
如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为Message message = consumer.receive(),手动去调用MessageConsumer的receive方法即可。

下面是测试类Test.java:

package homework;    
   
import javax.jms.JMSException;    
   
public class Test {    
   
    /**   
     * @param args   
     */   
    public static void main(String[] args) throws JMSException, Exception {    
        // TODO Auto-generated method stub    
        ConsumerTool consumer = new ConsumerTool();    
        ProducerTool producer = new ProducerTool();    
        // 开始监听    
        consumer.consumeMessage();    
            
        // 延时500毫秒之后发送消息    
        Thread.sleep(500);    
        producer.produceMessage("Hello, world!");    
        producer.close();    
            
        // 延时500毫秒之后停止接受消息    
        Thread.sleep(500);    
        consumer.close();    
    }    
}    
以上就是我学习ActiveMQ之后写的一个简单的例子,希望对你有帮助,如果有什么错误还请指正。

分享到:
评论
1 楼 wjm901215 2011-05-13  
导入的那些jar包能具体说下吗

相关推荐

    activemq-cpp-library-3.9.5-src.zip

    创建一个连接: ```cpp std::auto_ptr<ActiveMQ::CMSConnectionFactory> factory( new ActiveMQ::CMSConnectionFactory("tcp://localhost:61616")); ``` 创建会话和消费者: ```cpp std::auto_ptr...

    activemq-web-console-5.11.2

    activemq-web-console的默认使用方式是通过在activemq.xml中导入jetty.xml配置一个jetty server来实现的。其实activemq-web-console完全可以和activemq-broker分开来部署。 activemq-web-console包含3个apps, 1.一...

    activemqBroker-2.14-SNAPSHOT.war

    activemqBroker插件:activemqBroker-2.14-SNAPSHOT.war

    activemq-cpp-library-3.9.5 编译的windows库文件,支持vs2015、vs2017

    在提供的压缩包中,除了核心的`activemq-cpp-library-3.9.5`之外,还包含了`apr_lib`,这是一个Apache Portable Runtime (APR) 库的子集,它为跨平台的系统编程提供了一套底层的服务,如文件I/O、内存管理、线程管理...

    activemq-core-5.7.0-API文档-中文版.zip

    赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...

    activemq-protobuf-1.1-API文档-中文版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...

    activemq-core-5.7.0-API文档-中英对照版.zip

    赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...

    apache-activemq-5.10到apache-activemq6.1大版本合集

    activemq-parent-5.10.0-source-release.zip activemq-parent-5.10.2-source-release.zip activemq-parent-5.12.3-source-release.zip apache-activemq-5.10.2-bin.tar.gz apache-activemq-5.11.4-bin.zip apache-...

    apache-activemq-5.9.0-bin

    Apache ActiveMQ是世界上最流行的开源消息代理和队列...总之,Apache ActiveMQ是一个强大的消息中间件,适用于构建分布式系统和微服务架构,提供可靠的异步通信机制,确保数据在复杂的网络环境中安全、高效地传递。

    apache-activemq-5.8.0-bin.zip

    Apache ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它基于Java Message Service(JMS)规范,用于在分布式系统中高效地传输数据。这个压缩包"apache-activemq-5.8.0-bin.zip"包含了ActiveMQ 5.8.0版本的...

    apache-activemq-5.15.3-bin.tar.gz

    这个压缩包“apache-activemq-5.15.3-bin.tar.gz”包含了Apache ActiveMQ 5.15.3版本的源代码和可执行文件,适合在Linux环境下部署和使用。 **1. Apache ActiveMQ简介** Apache ActiveMQ是Apache软件基金会的一个...

    activemq-protobuf-1.1-API文档-中英对照版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...

    apache-activemq-5.15.8-bin.zip

    在安装过程中,解压"apache-activemq-5.15.8-bin.zip"后,你会得到一个包含bin目录的结构,其中包含了启动和停止ActiveMQ服务所需的脚本。在Windows上,你可以使用"bin\win32\activemq.bat",而在Linux或Mac OS上,...

    activemq-web-4.0-M3.jar.zip

    在提供的压缩包"activemq-web-4.0-M3.jar.zip"中,有两个主要文件:"activemq-web-4.0-M3.jar"和"license.txt"。"activemq-web-4.0-M3.jar"是核心的Java档案文件,包含了运行ActiveMQ Web UI所需的所有类和资源。这...

    activemq-all-5.12.0-sources.jar

    activemq-all-5.12.0-sources.jar

    activemq-all-5.2.0-sources.jar

    activemq-all-5.2.0-sources.jar

    apache-activemq-5.16.5-bin.tar.gz 下载(5积分)

    Apache ActiveMQ是Apache软件基金会的一个开源项目,是一个基于消息的通信中间件。ActiveMQ是JMS的一个具体实现,支持JMS的两种消息模型。ActiveMQ使用AMQP协议集成多平台应用,使用STOMP协议通过websockets在Web...

    activemq-jms-pool-5.14.4.jar

    activemq-jms-pool-5.14.4.jar

    activemq-kahadb-store-5.9.1.jar

    标签:activemq-kahadb-store-5.9.1.jar,activemq,kahadb,store,5.9.1,jar包下载,依赖包

    apache-activemq-5.15.12-bin.tar.gz

    tar -zxvf apache-activemq-5.15.12-bin.tar.gz 2.进入bin目录 cd /apache-activemq-5.15.12/bin 3.运行,没有配置环境变量只能在bin目录下使用命令 ./activemq 4.配置环境变量,配置完环境变量之后...

Global site tag (gtag.js) - Google Analytics