1.准备工作
1) 下载安装,启动activemq
2) 下载activemq jar包导入项目
2.消息生产者
package com.activemq.demo1; import javax.jms.*; import org.apache.activemq.*; /** * 消息生产者,用于生成并发送消息 */ public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null; /** * 初始化 * @throws Exception */ private void initialize() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); /* 创建Session,参数解释: 第一个参数 是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认, 没有回应则抛出异常,消息发送程序负责处理这个错误。 第二个参数 消息的确认模式: AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次, 但传输过程中可能因为错误而丢失消息。 CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法 (会通知消息提供者收到了消息) DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息 (这种确认模式不在乎接收者收到重复的消息)。*/ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); //设置是否持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } /** * 发送消息 * @param message * @throws Exception */ public void produceMessage(String message) throws Exception { initialize(); //发送TextMessage,还可发送MapMessage,ObjectMessage,StreamMessage TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:-> send start."); producer.send(msg); System.out.println("Producer:-> send complete."); close(); } /** * 关闭连接 * @throws JMSException */ public void close() throws JMSException { System.out.println("Producer:->Closing Connection."); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
3.消息消费者
package com.activemq.demo1; import javax.jms.*; import javax.jms.Message; import org.apache.activemq.*; /** * 消息消费者,用于接收消息 */ public class ConsumerTool implements MessageListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; /** * 初始化 * @throws JMSException * @throws Exception */ private void initialize() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } /** * 消费消息 * @throws Exception */ public void consumeMessage() throws Exception { initialize(); connection.start(); System.out.println("Consumer:->Begin listening..."); // 开始监听 consumer.setMessageListener(this); } /** * 关闭连接 * @throws JMSException */ public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } /** * 消息处理函数 */ public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer:->Received textMessage: " + msg); } else { System.out.println("Consumer:->Received: " + message); } close(); } catch (JMSException e) { e.printStackTrace(); } } }
4.测试类
package com.activemq.demo1; import javax.jms.*; public class Test { /** * @param args */ public static void main(String[] args) throws JMSException, Exception { ConsumerTool consumer = new ConsumerTool(); ProducerTool producer = new ProducerTool(); // 开始监听 consumer.consumeMessage(); // 延时500毫秒之后发送消息 Thread.sleep(500); producer.produceMessage("Hello, world!"); } }
相关推荐
### ActiveMQ 实战 #### JMS 基本构件概览 **ActiveMQ** 是一个高性能、功能丰富的开源消息中间件,它实现了 **Java Message Service (JMS)** 规范。JMS 规范定义了一组接口,这些接口提供了一个标准的方式来进行...
**ActiveMQ实战(英文版)** ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循Java消息服务(JMS)规范,支持多种协议,并且跨平台运行。这本书《ActiveMQ实战(英文版)》旨在深入探讨如何在实际环境中...
#### 三、ActiveMQ实战案例介绍 - **案例概述**:本书通过一系列实例来展示如何使用ActiveMQ构建各种应用场景下的消息传递系统。 - **目的**:通过实际案例帮助读者深入了解ActiveMQ的核心概念和技术细节,并学会...
**ActiveMQ实战资料详解** Apache ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它是Java消息服务(Java Message Service,简称JMS)的一个实现。在分布式系统中,ActiveMQ扮演着至关重要的角色,它允许...
ActiveMQ实战手册以介绍JMS和ActiveMQ的操作及配置为主,JMS(Java Message Service)是Java平台中对于面向消息中间件(MOM)的一种标准的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。...
在这个“ActiveMQ实战demo”中,我们将深入探讨如何使用ActiveMQ进行消息发送和接收,并了解其工作原理。 首先,让我们了解一下JMS。JMS是Java平台上的一个标准接口,定义了生产、消费、管理和消息队列的标准API。...
ActiveMQ是一种广泛使用的企业级消息中间件,它遵循JMS(Java Message Service)规范,为分布式系统中的不同应用提供可靠的消息传递服务。在本篇内容中,我们会深入探讨ActiveMQ的特性、安装与运行,以及它如何与...
我下载的时候是 ActiveMQ 5.14.4 Release版 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是...
在本篇《ActiveMQ实战——实现一个简易版的聊天室》中,我们将深入探讨如何利用Apache ActiveMQ构建一个简单的在线聊天应用。ActiveMQ是Apache软件基金会的一个开源项目,它是一款功能强大的消息中间件,用于在...
1. **Apache ActiveMQ 简介**: Apache ActiveMQ 是一个功能强大的、高性能的、多语言支持的消息代理。它遵循 JMS 规范,提供多种协议支持,如 OpenWire、STOMP、AMQP 和 MQTT,可以用于分布式系统中的异步通信。...
《Spring与ActiveMQ整合实战详解》 在Java开发领域,消息队列(Message Queue)作为解耦、异步处理和提高系统吞吐量的重要工具,被广泛应用。Spring框架以其强大的集成能力,使得与各种消息中间件如ActiveMQ的整合...
通过这个实战项目,开发者可以深入理解如何在Java Web环境中使用ActiveMQ,以及如何解决在实际应用中可能出现的问题,如消息的可靠传输和幂等问题。同时,它也提供了一个良好的学习平台,帮助初学者掌握SSH与...
- **ActiveMQ**:Apache ActiveMQ 是一个开源的消息中间件,它实现了 Java Message Service (JMS) 规范,提供可靠的消息传递和队列管理。 - **JMS**:Java Message Service 是一个标准接口,用于在分布式环境中交换...
本视频教程通过实战的方式介绍了 ActiveMQ 的集群搭建与应用,涵盖了从基础概念到实际部署的全过程。通过学习这些知识点,不仅可以帮助开发者深入了解 ActiveMQ 的工作原理,还能够掌握如何在实际项目中有效地利用 ...
Spring Boot ActiveMQ学习练习demo项目源码是一个针对Java开发者的学习资源,它涵盖了使用Spring Boot集成ActiveMQ进行消息队列操作的基本实践。ActiveMQ是Apache软件基金会的一个开源项目,它是Java消息服务(JMS)...
ActiveMQ 是 Apache 开源项目提供的一款高效、可靠的开源消息中间件,它实现了 Java 消息服务(Java Message Service,简称 JMS),为分布式系统中的应用提供了异步通信的能力。JMS 是一个工业标准,它定义了一组...
在ActiveMQ中,生产者和消费者都是通过JMS API(Java Message Service Application Programming Interface)来与消息代理进行交互的。 生产者核心代码逻辑部分,主要介绍了如何使用JMS API来构建消息生产者,并通过...
标题中的"ActiveMQ 使用Ajax 收发消息实战"指出我们将探讨如何使用ActiveMQ消息中间件与Ajax技术结合,实现Web应用程序中的异步消息传递。ActiveMQ是Apache软件基金会的一个项目,提供了一个开源的消息代理,支持...
四、ActiveMQ实战应用 在实际应用中,ActiveMQ常用于以下场景: - 微服务间的异步通信:微服务架构中,通过消息队列可避免直接调用导致的阻塞,提高系统响应速度。 - 数据同步:在分布式数据库或主从复制环境中,...