`
virusfu
  • 浏览: 182936 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

JMS生产者消费者模式收发通用类

    博客分类:
  • jms
 
阅读更多

jms提供者为ActiveMQ

 

import java.util.Map;
import java.util.UUID;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * mq通用类
 * 
 * @author Fu Wei
 * 
 */
@Component
public class ActiveMQQueueCommon {
	private static final Logger LOG = LoggerFactory.getLogger(ActiveMQQueueCommon.class);

	@Autowired
	private JmsTemplate jmsTemplate;

	/**
	 * 异步发送 不支持特定消息
	 * 
	 * @param reqQueue
	 * @param text
	 */
	public void asyncSend(ActiveMQQueue reqQueue, final String text) {
		LOG.debug("发送的XML文内容:{}", text);
		final String correlationId = UUID.randomUUID().toString();
		jmsTemplate.send(reqQueue, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage msg = session.createTextMessage(text);
				msg.setJMSCorrelationID(correlationId);
				return msg;
			}
		});
	}

	/**
	 * 异步发送,关联消息id
	 * 
	 * @param reqQueue
	 * @param text
	 * @param propertyName
	 * @param propertyValue 支持一个特定消息
	 */
	public void asyncSend(ActiveMQQueue reqQueue, final String text, final String propertyName,
	        final String propertyValue) {
		LOG.debug("发送的XML文内容:{}", text);
		final String correlationId = UUID.randomUUID().toString();
		jmsTemplate.send(reqQueue, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage msg = session.createTextMessage(text);
				msg.setJMSCorrelationID(correlationId);
				msg.setStringProperty(propertyName, propertyValue);
				return msg;
			}
		});
	}

	/**
	 * 异步发送,关联消息id
	 * 
	 * @param reqQueue
	 * @param text
	 * @param propertyName
	 * @param propertyMap 选择器参数
	 */
	public void asyncSend(ActiveMQQueue reqQueue, final String text, final String propertyName,
	        final Map<String, String> propertyMap) {
		LOG.debug("发送的XML文内容:{}", text);
		final String correlationId = UUID.randomUUID().toString();
		jmsTemplate.send(reqQueue, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage msg = session.createTextMessage(text);
				msg.setJMSCorrelationID(correlationId);
				if (propertyMap != null && propertyMap.size() > 0) {
					for (Map.Entry<String, String> map : propertyMap.entrySet()) {
						msg.setStringProperty(map.getKey(), map.getValue());
					}
				}
				return msg;
			}
		});
	}

	/**
	 * 同步发送,不带消息特定属性
	 * 
	 * @param reqQueue
	 * @param resQueue
	 * @param messText
	 * @param timeout
	 * @return
	 * @throws JMSException
	 */
	public String syncSend(ActiveMQQueue reqQueue, final ActiveMQQueue resQueue, final String messText, long timeout) {
		return syncSend(reqQueue, resQueue, messText, timeout, null);
	}

	/**
	 * 同步发送,带消息特定属性
	 * 
	 * @param reqQueue
	 * @param resQueue
	 * @param messText
	 * @param timeout
	 * @param propertyName
	 * @param propertyValue
	 * @return
	 * @throws JMSException
	 */
	public String syncSend(ActiveMQQueue reqQueue, final ActiveMQQueue resQueue, final String messText, long timeout,
	        final Map<String, String> propertyMap) {
		LOG.debug("转发的消息:{}, 超时时间:{}", messText, timeout);
		final String correlationId = UUID.randomUUID().toString();
		jmsTemplate.send(reqQueue, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage msg = session.createTextMessage(messText);
				msg.setJMSReplyTo(resQueue);
				msg.setJMSCorrelationID(correlationId);
				// 添加消息特定属性
				if (propertyMap != null && propertyMap.size() > 0) {
					for (Map.Entry<String, String> map : propertyMap.entrySet()) {
						msg.setStringProperty(map.getKey(), map.getValue());
					}
				}
				return msg;
			}
		});
		jmsTemplate.setReceiveTimeout(timeout * 1000);
		TextMessage recvMsg = (TextMessage) jmsTemplate.receiveSelected(resQueue, "JMSCorrelationID = '"
		        + correlationId + "'");
		String recvMessText = null;
		try {
			recvMessText = recvMsg.getText();
		} catch (JMSException e) {
			LOG.error("jms错误", e);
		}
		LOG.debug("propertyMap: {}, 返回的信息:{}", propertyMap, recvMessText);
		return recvMessText;
	}
}
 

 

分享到:
评论

相关推荐

    java IBM MQ 7.5.0 生产者和消费者实例

    JMS API包含两种主要对象:`QueueSender`(生产者)和`QueueReceiver`(消费者)。 3. **生产者实例**: 生产者应用负责创建并发送消息到MQ队列。在Java中,这通常涉及到以下步骤: - 创建一个`ConnectionFactory...

    jms远程IBM MQ 收发消息

    5. **创建生产者和消费者**:会话可以创建`javax.jms.MessageProducer`和`javax.jms.MessageConsumer`。生产者用于发送消息,消费者用于接收消息。 6. **发送和接收消息**: - **同步发送**:使用`MessageProducer...

    spring 整合 activemq 生产者和消费者 案例源码

    Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...

    JMS调用IBM MQ监听模式

    通过JMS,我们可以创建生产者(发送消息)和消费者(接收消息),并可以选择同步或异步模式。在这个场景下,“监听模式”指的是JMS消费者使用的一种异步接收消息的方式。 在监听模式中,消费者会创建一个JMS消息...

    JMS收发

    主题遵循发布/订阅模式,一个消息可以被多个订阅者消费。 6. **消息代理(Message Broker)或消息中间件**:负责管理消息队列、路由消息以及保证消息的可靠传输。 在实际应用中,JMS通常用于以下场景: - **解耦**...

    JMS IBM MQ 订阅模式

    点对点模式中,消息从一个生产者发送到一个队列,然后由一个消费者接收。发布/订阅模式下,消息由发布者发送到主题,多个订阅者可以同时接收消息。 2. **消息对象**:JMS定义了四种消息对象:Message、TextMessage...

    jms文档资料(三份)

    5. 创建生产者(Producer)和消费者(Consumer):分别用于发送和接收消息。 6. 发送和接收消息:生产者通过会话创建消息并发送到目的地,消费者从目的地接收消息。 四、JMS的实现提供商 JMS是一个标准接口,有...

    JMS 简介以及Weblogic配置JMS图解

    2. **JMS客户**:基于Java的应用程序或对象,它们是消息的生产者或消费者。 3. **JMS生产者**:创建并发送消息的组件。 4. **JMS消费者**:接收和处理消息的组件。 5. **JMS消息**:包含数据的对象,由报头和消息...

    JMS规范教程pdf

    在JMS中,消息是指由一个或多个消息生产者(Message Producer)发送,并由一个或多个消息消费者(Message Consumer)接收的数据单元。消息可以包含各种类型的信息,如文本、对象、映射、流等,是JMS通信的基本单位。...

    JMS消息模型 JMS学习.doc

    4. **JMS Tools**:JMS提供了一系列工具,包括消息生产者、消费者和管理工具,帮助开发者创建、管理和监控消息的生命周期。 5. **JMS Exception Handling**:JMS规范定义了异常处理机制,确保在遇到错误时,消息...

    JMS经典实例 基于weblogic

    JMS允许应用程序创建、发送、接收和读取消息,以此来解耦生产者和消费者,使得两者不必同时在线也能进行通信。在基于WebLogic的环境中,JMS被广泛应用于构建可扩展、高可用性的应用程序。 WebLogic Server是Oracle...

    java.jms.jar JMS需要的JAR包

    1. **消息(Message)**:是数据的载体,用来在JMS producer(生产者)和consumer(消费者)之间传输信息。消息可以是文本、二进制数据或者更复杂的数据结构。 2. **消息队列(Queue)** 和 **主题(Topic)**:是...

    weblogic中使用JMS发送和接受消息

    当发布者向主题发送消息时,所有订阅该主题的消费者都会收到消息。这种模式适用于广播或多播通信。 ### 二、配置JMS资源 在WebLogic中,你需要配置JMS模块、目的地(队列或主题)、以及相关的连接工厂和目的地工厂...

    ActiveMQ集群及生产者和消费者Java代码.zip

    5. **Java编程接口**:在ActiveMQ中,Apache的`org.apache.activemq.ActiveMQConnectionFactory`和`javax.jms`包提供了创建连接、会话、生产者和消费者所需的类和接口。开发者将使用这些API来实现消息的发送和接收。...

    jms sample

    这种模式确保了即使生产者和消费者不在同一时间在线,消息也能被正确处理。 2. **主题(Topic)**:与消息队列不同,主题支持发布/订阅模式。多个消费者可以订阅同一个主题,当有新消息发布时,所有订阅者都会收到...

    JMS demo 及 资料

    在"JMS入门级的蹩脚篇.ppt"这个文件中,可能包含了以下内容:JMS的基本概念解释,如何创建消息,如何设置消息队列和主题,如何编写生产者和消费者代码示例,以及如何配置和运行JMS应用程序。这些内容对于初学者理解...

    jms之activeMQ 队列和广播模式例子(主要给初学者提供入门知识)

    这篇博客"jms之activeMQ 队列和广播模式例子"主要面向初学者,旨在提供ActiveMQ入门级的知识,通过实例解释队列(Queue)和主题(Topic)这两种基本的消息模式。 首先,我们要理解JMS中的队列和主题的区别。队列...

    JMS简单示例1

    在JMS中,消息是数据传输的载体,而消息生产者和消费者通过消息中间件(如消息队列或主题)进行通信。 **JMS核心概念** 1. **消息(Message)**:JMS中的基本数据单元,包含数据和元数据(如目标地址、优先级等)...

    JMS学习资料(word文档)

    - 在这种模式下,生产者是出版者,消费者是订阅者。 - MOM提供的频道是主题(Topic)。出版者将消息发布到主题上,订阅者订阅他们感兴趣的主题。 - 每个主题可以有多个订阅者,每个订阅者都会收到消息的一个副本...

    java-jms小例子

    JMS的核心概念包括消息生产者、消息消费者和消息队列/主题。在这个小例子中,我们将探讨如何创建这两者以及如何利用它们进行通信。 1. **消息生产者**:在JMS中,消息生产者是负责创建和发送消息的实体。生产者可以...

Global site tag (gtag.js) - Google Analytics