- 浏览: 188330 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (321)
- eclipse (4)
- idea (2)
- Html (8)
- Css (14)
- Javascript (8)
- Jquery (6)
- Ajax Json (4)
- Bootstrap (0)
- EasyUI (0)
- Layui (0)
- 数据结构 (0)
- Java (46)
- DesPattern (24)
- Algorithm (2)
- Jdbc (8)
- Jsp servlet (13)
- Struts2 (17)
- Hibernate (11)
- Spring (5)
- S2SH (1)
- SpringMVC (4)
- SpringBoot (11)
- WebService CXF (4)
- Poi (2)
- JFreeChart (0)
- Shiro (6)
- Lucene (5)
- ElasticSearch (0)
- JMS ActiveMQ (3)
- HttpClient (5)
- Activiti (0)
- SpringCloud (11)
- Dubbo (6)
- Docker (0)
- MySQL (27)
- Oracle (18)
- Redis (5)
- Mybatis (11)
- SSM (1)
- CentOS (10)
- Ant (2)
- Maven (4)
- Log4j (7)
- XML (5)
最新评论
1. 直接Receive方式
2. 使用Listener监听方式
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。 Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。 Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。如果JMSprovider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMSprovider必须把消息头的JMSRedelivered字段设置为true。
新建项目JMS 导入jar包 activemq-all-5.11.1.jar 右键build path JMSProducer.java package com.andrew.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 private static final int SENDNUM = 3; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session destination = session.createQueue("FirstQueue1"); // 创建消息队列 messageProducer = session.createProducer(destination); // 创建消息生产者 sendMessage(session, messageProducer); // 发送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 */ public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception { for (int i = 0; i < JMSProducer.SENDNUM; i++) { TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i); System.out.println("发送消息:" + "ActiveMQ 发送的消息" + i); messageProducer.send(message); } } }
发送消息:ActiveMQ 发送的消息0 发送消息:ActiveMQ 发送的消息1 发送消息:ActiveMQ 发送的消息2
发送消息:ActiveMQ 发送的消息0 发送消息:ActiveMQ 发送的消息1 发送消息:ActiveMQ 发送的消息2
JMSConsumer.java package com.andrew.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session destination = session.createQueue("FirstQueue1"); // 创建连接的消息队列 messageConsumer = session.createConsumer(destination); // 创建消息消费者 while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if (textMessage != null) { System.out.println("收到的消息:" + textMessage.getText()); } else { break; } } } catch (JMSException e) { e.printStackTrace(); } } }
收到的消息:ActiveMQ 发送的消息0 收到的消息:ActiveMQ 发送的消息1 收到的消息:ActiveMQ 发送的消息2 收到的消息:ActiveMQ 发送的消息0 收到的消息:ActiveMQ 发送的消息1 收到的消息:ActiveMQ 发送的消息2
2. 使用Listener监听方式
Listener.java package com.andrew.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听 */ public class Listener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("收到的消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } JMSConsumerListener.java package com.andrew.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 */ public class JMSConsumerListener { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumerListener.USERNAME, JMSConsumerListener.PASSWORD, JMSConsumerListener.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session destination = session.createQueue("FirstQueue1"); // 创建连接的消息队列 messageConsumer = session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener()); // 注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } } 执行JMSProducer.java产生3条消息
执行JMSConsumerListener.java 收到的消息:ActiveMQ 发送的消息0 收到的消息:ActiveMQ 发送的消息1 收到的消息:ActiveMQ 发送的消息2
相关推荐
在“简单的activemq点对点的同步消息模型”中,我们将探讨如何构建一个基本的、基于ActiveMQ的点对点消息传递系统,以及它的工作原理。 1. **点对点模型**:在JMS中,点对点模型(P2P)是一种消息传递模式,其中...
本篇文章将深入探讨在ActiveMQ中实现JMS的点对点(Point-to-Point)消息模型,并讨论如何在不同的项目中设置发送者和接收者。 首先,让我们理解点对点消息模型的基本概念。在点对点模型中,消息从一个生产者(发送...
在`activeMQ_demo`这个压缩包中,可能包含了一些示例代码,用于演示如何使用ActiveMQ实现点对点和发布/订阅模式。这些示例可能包括了以下内容: 1. 生产者(Producer):创建和发送消息到队列或主题的代码,展示了...
在ActiveMQ中,我们可以通过创建一个Queue并发送、接收消息来实现点对点通信。例如,我们可以使用`MessageProducer`发送消息到队列,然后通过`MessageConsumer`从队列中获取消息。 2. **发布/订阅模式(Publish/...
在JMS中,有两种主要的消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。 1. **点对点模型**:在这种模型中,消息由一个生产者发送到一个队列,然后被一个消费者接收。每个消息...
以下是一个简单的ActiveMQ点对点发送消息的Java代码片段: ```java import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Connection; import javax.jms.MessageProducer; import javax....
在本文中,我们将深入探讨如何使用Spring框架与Apache ActiveMQ集成,实现点对点通信(Point-to-Point)和发布/订阅(Publish/Subscribe)模式的通信。ActiveMQ是流行的开源消息中间件,它允许应用程序之间异步传输...
与点对点模型不同,发布/订阅模式中的消费者可以是多个,每个订阅者都能接收到所有发布的消息。 C#中使用ActiveMQ的NMS库来进行消息操作。NMS提供了一组API,使得.NET开发者能够轻松地与ActiveMQ交互。以下是一些...
ActiveMQ提供了多种消息模式,包括点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)模式。点对点模式中,消息生产者发送消息到队列,消费者从队列中拉取消息,消息在队列中仅能被一个消费者消费。在发布/...
首先,我们需要了解ActiveMQ的核心特性,如点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)模型,这两种模式是消息传递的两种基本方式。点对点模式下,每个消息只有一个消费者,而发布/订阅模式下,一个...
总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...
**ActiveMQ**是Apache软件基金会开发的一款开源消息中间件,实现了JMS标准,提供了对点对点和发布/订阅模型的支持。ActiveMQ具备以下特性: 1. **跨语言支持**:除了Java,还支持其他编程语言,如C++、Python等。 2....
9. **反馈机制**:如果发送者希望在发送消息后得到接收者的确认信息,可以通过设置消息属性或者使用特定协议(如AMQP的Acknowledgement模式)来实现。ActiveMQ支持回调机制,允许在消息被成功接收后返回确认信息给...
总结,ActiveMQ的点对点和发布/订阅模型为企业级应用提供了灵活的消息传递方案。点对点模式适用于一对一的消息传递,而发布/订阅模式适用于一对多或广播场景。理解这两种通信方式及其在实际应用中的实现,对于设计...
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386
在本示例中,我们将关注如何使用JMS来模拟ActiveMQ代理服务器并实现消息的发送与接收。 ActiveMQ是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,提供了可靠的消息传递功能。通过模拟ActiveMQ代理...
4. **消息类型**:JMS支持两种消息模型:点对点(Queue)和发布/订阅(Topic)。Queue模式下,消息会被一个消费者接收并删除;Topic模式下,消息可以被多个订阅者接收。 5. **消息的持久化**:ActiveMQ允许配置消息...
在 ActiveMQ 中,点对点(P2P)模式是基于队列(Queue)的一种通信方式。在这种模式下,消息生产者发送消息到队列,而消息消费者从队列中接收消息。队列具有先进先出(FIFO)的特性,即每个消息只能被一个消费者消费...
ActiveMQ还支持多种消息模式,如点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)以及事务处理。 **ActiveMQ内部实现** ActiveMQ的核心组件包括: 1. **Broker**: 它是ActiveMQ的核心,负责接收、存储...
点对点模型确保每个消息仅被一个消费者接收,而发布/订阅模型允许多个订阅者接收相同的消息。 3. **消息类型**:理解JMS提供的不同消息类型,如文本消息、对象消息、流消息和二进制消息,以及如何通过ActiveMQ收发...