转载请出自出处:http://eksliang.iteye.com/blog/2242495
一.使用消息队列模型发送消息至activeMQ(生产者开发)
package com.gosun.activemq; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; /** * 消息队列模型发送消息至activeMQ * @author Ickes */ public class QueueSend { public static void main(String[] args) throws Exception { //第一步:根据url创建一个jms Connection。 ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionfactory.createConnection(); connection.start(); //第二步:根据connection获取session Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //第三步:消息的目的地 Destination destination = new ActiveMQQueue("gosun"); //第四步:创建消息生产者 MessageProducer producer = session.createProducer(destination); //设置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第五步:创建消息 Message msg = session.createTextMessage("JMS 告诉你我是ICKES"); //第六步:生产者向JMS发送消息到队列 producer.send(msg); //第七步:关闭连接 session.close(); connection.close(); } }
第二步:Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);解释如下
在connection的基础上创建一个session,同时设置是否支持事务ACKNOWLEDGE标识。
- AUTO_ACKNOWLEDGE:自动确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收。
- CLIENT_ACKNOWLEDGE:客户端确认模式。会话对象依赖于应用程序对被接收的消息调用一个acknowledge()方法。一旦这个方法被调用,会话会确认最后一次确认之后所有接收到的消息。这种模式允许应用程序以一个调用来接收,处理并确认一批消息。注意:在管理控制台中,如果连接工厂的Acknowledge Policy(确认方针)属性被设置为"Previous"(提前),但是你希望为一个给定的会话确认所有接收到的消息,那么就用最后一条消息来调用acknowledge()方法。
- DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。注意:如果你的应用程序无法处理重复的消息的话,你应该避免使用这种模式。如果发送消息的初始化尝试失败,那么重复的消息可以被重新发送。
- SESSION_TRANSACTED
二、消息消费者,手动接收示例(消费者)
package com.gosun.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者,手动接收示例 * @author Ickes */ public class QueuesAccept { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = connectionFactory.createConnection(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); //消息目的地 Destination dest = session.createQueue("gosun"); //消息消费者 MessageConsumer consumer = session.createConsumer(dest); //接收消息,超时时间为10秒,先手动接受JMS消息,这儿可以用监听 TextMessage textMessage = (TextMessage) consumer.receive(10*1000); String text = textMessage.getText(); System.out.println("接收到的消息为:" + text); //关闭通道 consumer.close(); session.close(); conn.close(); } }
三、消费者监听器模式处理消息
package com.gosun.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 使用监听器,自动接收消息 * @author Lenovo * */ public class QueuesAcceptListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage text = (TextMessage) message; try { System.out.println(text.getText()); } catch (JMSException e) { e.printStackTrace(); } } /** * 测试代码 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = connectionFactory.createConnection(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); //消息目的地 Destination dest = session.createQueue("gosun"); //消息消费者 MessageConsumer consumer = session.createConsumer(dest); consumer.setMessageListener(new QueuesAcceptListener()); //这里不能关闭连接,一旦关闭监听器也就关闭,那就接收不到消息了 } }
四、生产者发送消息到主题中
package com.gosun.activemq.topic; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic; /** * 生产者发送消息到主题中 * @author Ickes * */ public class TopicSend { public static void main(String[] args) throws Exception { // 第一步:根据url创建一个jms Connection。 ActiveMQConnectionFactory connectionfactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionfactory.createConnection(); connection.start(); // 第二步:根据connection获取session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // 第三步:创建一个Topic Topic topic= new ActiveMQTopic("testTopic"); // 第四步:创建生产者用于将消息发送至主题 MessageProducer producer = session.createProducer(topic); // 设置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 第五步:创建消息 Message msg = session.createTextMessage("JMS 告诉你我是ICKES"); producer.send(msg); //第七步:关闭连接 session.close(); connection.close(); } }
五、消费者从主题中订阅消息
package com.gosun.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消费者从主题中订阅消息 * @author Ickes * */ public class TopicAccept { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = connectionFactory.createConnection(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); //主题目的地 Topic topic =session.createTopic("testTopic"); //注册订阅者 MessageConsumer consumer = session.createConsumer(topic); //手动获取消息 TextMessage textMessage = (TextMessage) consumer.receive(10*1000); String text = textMessage.getText(); System.out.println("接收到的消息为:" + text); //关闭通道 consumer.close(); session.close(); conn.close(); } }
相关推荐
1. **点对点模型**:在JMS中,点对点模型(P2P)是一种消息传递模式,其中生产者发送消息到一个队列(Queue),而消费者从该队列接收消息。每个消息只被一个消费者消费,且一旦被消费,就会从队列中移除。这种模型...
**点对点(Point-to-Point,PTP)模型** 是JMS中的两种主要通信模式之一。在PTP模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者接收。每个消息只能被一个消费者消费,确保消息的独占性。ActiveMQ中...
JMS是一种标准接口,用于Java平台上的消息传递,提供了可靠的消息传递机制,确保消息的顺序和持久性,同时也支持点对点和发布/订阅两种消息模型。 在Spring MVC中集成ActiveMQ,我们可以利用Spring的JMS抽象层,它...
本篇文章将深入探讨在ActiveMQ中实现JMS的点对点(Point-to-Point)消息模型,并讨论如何在不同的项目中设置发送者和接收者。 首先,让我们理解点对点消息模型的基本概念。在点对点模型中,消息从一个生产者(发送...
**队列** 是一种点对点的消息传递模式,每个消息只能被一个消费者接收。当消息发送到队列后,一个或多个消费者可以从队列中获取并处理消息,但只有一个消费者能真正接收到消息。队列确保消息的有序性和至少一次交付...
在JMS点对点模式中,每个消息只有一个消费者,消息被消费后自动从队列中删除。发布/订阅模式下,消息被发布到一个主题,多个订阅者可以接收到相同的消息。 在"jms-test"这个源码文件中,可能包含了一些示例代码,...
点对点(point-to-point,简称 PTP) 发布/订阅(publish/subscribe,简称 pub/sub)。 这两种消息传递模型非常相似,但有以下区别: PTP 消息传递模型规定了一条消息只能传递给一个接收方。 采用javax.jms.Queue ...
ActiveMQ提供了灵活的消息传递模型,无论是点对点还是发布/订阅,都能满足不同场景的需求。点对点模式适用于需要确保消息顺序和唯一性的应用,而发布/订阅模式则适合需要广播消息的场景。通过理解这些基本概念和...
- **点对点模型** (Queue) - 在此模型中,消息发送者直接将消息发送到指定的队列,消息接收者从队列中读取消息。 - 每个消息只能被一个消费者消费。 - 生产者和消费者无需在同一时间在线。 - **发布/订阅模型** ...
在"ActiveMQ开发实例-1"中,你将学习如何使用 ActiveMQ-CPP 库进行基本的发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)模式的消息传递。这两种模式是消息中间件中最常见的通信模型。 1. **发布/订阅...
在本文中,我们将深入探讨ActiveMQ在点对点(Point-to-Point)消息传递和发布-订阅(Publish-Subscribe)消息模式中的实际应用。 首先,让我们了解**点对点消息模式**。这种模式是基于队列(Queue)的,其中每个...
本实例将深入探讨两种基本的消息模型:点对点(P2P)和发布/订阅(Pub/Sub)模式。 1. **点对点(P2P)模式** 在点对点模式中,每个消息只被一个消费者接收,且一旦被消费就会从队列中删除。这种模式非常适合实现...
在点对点模型中,每个消息只有一个消费者,即生产者发送的消息会被一个且仅有一个消费者接收和处理。这种模型通常应用于任务队列,其中每个任务由一个工作进程单独处理。ActiveMQ支持这种模型,通过队列(Queue)...
队列遵循“点对点”模型,其中每个消息只会被一个消费者接收。消息发送到队列后,一个或多个消费者可以尝试接收,但只有一个消费者能成功并处理该消息。这种模式适用于需要确保消息被准确无误地处理的情况。 在...
6. **消息传输**:Spring支持点对点(Queue)和发布/订阅(Topic)两种模式。Queue中,消息被一个消费者接收后,其他消费者无法再收到;而Topic中,消息可以被多个订阅者接收。 7. **事务管理**:Spring的JMS支持与...
它支持多种协议,如OpenWire、STOMP、AMQP、MQTT等,能够处理点对点(Queue)和发布/订阅(Topic)两种通信模型。 点对点(Queue)模式下,每个消息只会被一个消费者消费,适合一对一的通信场景。而在发布/订阅...
JMS中有两种消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。在点对点模型中,消息从一个生产者发送到一个队列,然后由一个消费者接收;而在发布/订阅模型中,消息从发布者发送...
本实例将深入探讨如何使用ActiveMQ实现点对点的消息通信以及不同系统之间的互操作性。 点对点(Point-to-Point)消息传递是JMS的核心模式之一,其基本原理是生产者(Producer)发送消息到一个队列(Queue),消费者...
- 主要包含两种消息模型:点对点(Queue)和发布/订阅(Topic),分别对应消息队列和主题。 3. **ActiveMQ安装与配置** - 解压`apache-activemq-5.10.0`压缩包,启动ActiveMQ服务器,通常通过执行`bin\start.bat`...