`
sbl2255
  • 浏览: 218302 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ 初学

    博客分类:
  • jdk
 
阅读更多

直接上代码:

  发送:

 public static void main(String[] args) {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
        ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS  
        // Provider 的连接  
        Connection connection = null; // Session: 一个发送或接收消息的线程  
        Session session; // Destination :消息的目的地;消息发送给谁.  
        Destination destination; // MessageProducer:消息发送者  
        MessageProducer producer; // TextMessage message;  
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://10.1.1.123:61616)");  
        try { // 构造从工厂得到连接对象  
            connection = connectionFactory.createConnection();  
            // 启动  
            connection.start();  
            // 获取操作连接  
            session = connection.createSession(Boolean.TRUE,  Session.AUTO_ACKNOWLEDGE);  
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            // 得到消息生成者【发送者】  
            producer = session.createProducer(destination);  
            // 设置不持久化,此处学习,实际根据项目决定  
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            // 构造消息,此处写死,项目就是参数,或者方法获取  
            sendMessage(session, producer);  
            session.commit();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
  
    public static void sendMessage(Session session, MessageProducer producer)  
            throws Exception {  
        for (int i = 1; i <= SEND_NUMBER; i++) {  
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息"  
                    + i);  
            // 发送消息到目的地方  
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);  
            producer.send(message);  
        }  
    }  

  接收:

 public static void main(String[] args) {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
        ConnectionFactory connectionFactory;  
        // Connection :JMS 客户端到JMS Provider 的连接  
        Connection connection = null;  
        // Session: 一个发送或接收消息的线程  
        Session session;  
        // Destination :消息的目的地;消息发送给谁.  
        Destination destination;  
        // 消费者,消息接收者  
        MessageConsumer consumer;  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://10.1.1.123:61616)");  
        try {  
            // 构造从工厂得到连接对象  
            connection = connectionFactory.createConnection();  
            // 启动  
            connection.start();  
            // 获取操作连接  
            session = connection.createSession(Boolean.FALSE,  Session.AUTO_ACKNOWLEDGE);  
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            consumer = session.createConsumer(destination);  
            while (true) {  
                // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s  
            	
                TextMessage message = (TextMessage) consumer.receive(100000);  
                if (null != message) {  
                    System.out.println("收到消息" + message.getText());  
                } else {  
                    break;  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  

 

基于Spring的ActiveMQ:

  XML文件:

<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://127.0.0.1:61616" />
		<property name="clientID" value="clientId_007" />
	</bean>


<bean id="deviceListener" class="net.coc.activemq.listener.DeviceMessageListener">
		<property name="deviceAdapter" ref="deviceAdapter"></property>
	</bean>

<bean id="deviceTopic" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="Assets.Device.Topic" />
	</bean>

<bean
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="topicListenConnectionFactory" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="10000" />
		<property name="destination" ref="deviceTopic" />
		<property name="subscriptionDurable" value="true" />
		<property name="clientId" value="deviceListenerclientId_007" />
		<property name="durableSubscriptionName" value="deviceListenerclientId_007" />
		<property name="messageListener" ref="deviceListener" />
	</bean>

 

Java:

public class DeviceMessageListener implements MessageListener {

	private DeviceAdapter deviceAdapter;

	public DeviceAdapter getDeviceAdapter() {
		return deviceAdapter;
	}

	public void setDeviceAdapter(DeviceAdapter deviceAdapter) {
		this.deviceAdapter = deviceAdapter;
	}

	@Override
	public void onMessage(Message message) {
		TextMessage textMsg = (TextMessage) message;
		try {
			JSONObject msg = JSONObject.fromObject(textMsg.getText());
			deviceAdapter.execute(msg);
			deviceAdapter.finish();
		} catch (JMSException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		} catch (Exception e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}
}

 

 

deviceAdapter 是具体解析消息的处理类,这个需要根据各自的需求自己定制;

 

 

 

分享到:
评论

相关推荐

    activeMQ初学使用demo

    通过这个"ActiveMQ初学使用demo",你将能够掌握ActiveMQ的基本操作,理解消息队列和主题的工作原理,以及如何在实际项目中使用它们。这将为你的分布式系统设计打下坚实的基础,帮助你有效地解耦系统组件,提高系统的...

    activeMQ示例 activeMQ demo,java分布式技术

    本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...

    消息队列学习(springboot+kafka+activemq)

    基于springboot构建消息队列通信demo,针对kafka、activemq初学者,安装部署好activemq和kafka后,修改application.yml 。启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer ...

    ActiveMQ基础讲解.ppt

    此文档专门针对ActiveMQ初学者,内容清晰易懂,适合所有开发人员用

    jms之activeMQ 队列和广播模式例子(主要给初学者提供入门知识)

    这篇博客"jms之activeMQ 队列和广播模式例子"主要面向初学者,旨在提供ActiveMQ入门级的知识,通过实例解释队列(Queue)和主题(Topic)这两种基本的消息模式。 首先,我们要理解JMS中的队列和主题的区别。队列...

    activemq

    无论是对于初学者还是有经验的开发者来说,理解这些概念都是非常重要的。掌握了 CMS 的基本原理和 ActiveMQ-CPP 的使用方法后,开发者就能够更高效地构建复杂的应用系统,并利用消息队列的强大功能来优化应用程序的...

    ActiveMQ的activemq.xml详细配置讲解

    对于初学者,理解这些基本元素是掌握ActiveMQ配置的关键。配合提供的文档,如《activeMQ in Action.doc》和《ActiveMQ测试报告.pdf》,可以更深入地学习ActiveMQ的工作原理和最佳实践。对于与数据库的集成,如`...

    activemq_demo,activeMQ的简单demo

    这种简易性使得初学者能够快速上手,深入理解消息中间件的基本概念和功能。 在实际的ActiveMQ demo中,通常会包含以下关键知识点: 1. **安装与配置**:首先,你需要下载并安装ActiveMQ,然后启动其内置的Broker...

    分享一些ActiveMQ的资料

    这是一本实践导向的入门指南,适合初学者,通常会包含实例代码和逐步教程,帮助读者快速掌握ActiveMQ的基本用法。 8. **《Java消息服务(第2版)》.pdf**: 这是一本详细介绍JMS规范的书籍,对于理解ActiveMQ的...

    ActiveMQ In Action精简版

    根据给定的文件信息,以下是从...无论是对于初学者还是有经验的开发者来说,这本书都提供了从理论到实践的全面指导,帮助读者掌握ActiveMQ的核心概念和实际操作技能,从而在分布式系统设计和开发中发挥关键作用。

    ActiveMQ使用SSL加密文件Demo

    本 Demo 将向你展示如何配置 ActiveMQ 以使用 SSL 加密文件,帮助初学者快速理解并实践这一关键安全措施。 首先,了解 SSL/TLS 协议是至关重要的。SSL(Secure Sockets Layer)与 TLS(Transport Layer Security)...

    ActiveMQ实战(英文版)

    **ActiveMQ实战(英文版)** ...总之,《ActiveMQ实战(英文版)》是一本全面介绍ActiveMQ的实用指南,无论你是初学者还是有经验的开发者,都能从中获得宝贵的知识,提升你在分布式系统中的消息处理能力。

    ActiveMQ In Action及其源码

    总的来说,《ActiveMQ In Action》及其源码是学习和掌握ActiveMQ不可或缺的参考资料,无论你是初学者还是经验丰富的开发者,都能从中受益匪浅。通过学习和实践,你可以更好地利用ActiveMQ构建可靠、高效的分布式系统...

    activeMQ视频

    本视频学习资料《一头扎进JMS之ActiveMQ》第一讲,旨在帮助初学者快速理解ActiveMQ的基本概念、功能以及如何在实际项目中应用。 JMS(Java Message Service)是Java平台中用于企业级应用之间进行异步通信的标准接口...

    Apache+ActiveMQ教程

    Apache ActiveMQ 是一款高度可扩展且功能强大的消息中间件,它是Apache软件基金会的一部分,完全开源且免费...无论你是初学者还是经验丰富的开发者,深入理解并熟练掌握ActiveMQ都将极大地提升你的系统设计和实现能力。

    activemq demo

    【描述】:“这个示例是关于ActiveMQ的实践应用,它结合了Spring框架,以单元测试的形式呈现了各种使用场景,对于初学者理解ActiveMQ的功能和用法非常有帮助。” 【标签】:“activemq”表明这是与ActiveMQ相关的...

    ActiveMQ的安装与使用

    ActiveMQ是一种基于JMS的开源消息代理,广泛应用于企业消息传递解决方案中。...对于初学者来说,熟悉这些基本操作是学习ActiveMQ消息传递和消息队列管理的起点,也是将ActiveMQ集成进各种企业应用架构的基础。

    ActiveMQ插件

    最后,对于初学者,可以参考ActiveMQ的官方文档和社区资源,那里有许多示例和教程可以帮助理解和实现插件。同时,理解JMS规范以及消息传递模式(如点对点和发布/订阅)也是使用ActiveMQ插件的基础。 总的来说,...

    ActiveMQ实践入门指南

    此外,官方文档提供了详尽的安装指导,包括但不限于下载、配置、测试等步骤,帮助初学者快速上手。 #### 使用场景 在实际应用中,ActiveMQ广泛应用于以下几种典型场景: 1. **消息路由与分发**:ActiveMQ作为消息...

Global site tag (gtag.js) - Google Analytics