<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cxf="http://cxf.apache.org/core"
xmlns:p="http://cxf.apache.org/policy" xmlns:ss="http://www.springframework.org/schema/security"
xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://cxf.apache.org/policy http://cxf.apache.org/schemas/policy.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
http://cxf.apache.org/bindings/soap http://cxf.apache.org/schemas/configuration/soap.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/security http://www.springframework.org/schema/security/spring-security.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"
default-lazy-init="false">
<context:annotation-config />
<!-- MQ队列连接工厂配置 -->
<bean id="connectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="queueManager" value="TB_QM" />
<property name="hostName" value="192.168.10.30" />
<property name="port" value="1430" />
<property name="channel" value="CHANNEL_TB" />
<property name="transportType" value="1" />
</bean>
<!-- 队列连接工厂适配器 -->
<bean id="connectionFactoryAdapter"
class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="connectionFactory" />
</bean>
<!-- JmsTemplate模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactoryAdapter" />
</bean>
<!-- 请求队列(发送消息的队列) -->
<bean id="requestQueue" class="com.ibm.mq.jms.MQQueue">
<property name="baseQueueName" value="MBF_INPUTQ_MQINOUT" />
</bean>
<!-- JmsTemplateInvoke -->
<bean id="jmsTemplateInvoke" class="com.iteye.jms.send.JmsTemplateInvoke">
<property name="jmsTemplate" ref="jmsTemplate" />
<property name="requestQueue" ref="requestQueue" />
</bean>
</beans>
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class JmsTemplateInvoke {
private static final Logger logger = LoggerFactory.getLogger(JmsTemplateInvoke.class);
/**
* JmsTemplate模板
*/
private JmsTemplate jmsTemplate;
/**
* 请求队列
*/
private Queue requestQueue;
/**
* 响应队列
*/
private Queue responseQueue;
/**
* 响应消息接收超时时间
*/
private long receiveTimeout;
/**
*
* MQ异步接口调用 <br>
* 〈功能详细描述〉
*
* @param requestMessage
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
public void asynRequestProcessor(String requestMessage) {
Map attributes = new HashMap();
attributes.put(MQConstants.JMS_CORRELATION_ID, RequestIDGenerator.generateMessageRequestID());
sendToProcessQueue(requestMessage, attributes);
}
/**
*
* MQ同步接口调用 <br>
* 〈功能详细描述〉
*
* @param requestMessage
* @return
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
public String syncRequestProcessor(String requestMessage) {
Map attributes = new HashMap();
String jmsCorrelationID = RequestIDGenerator.generateMessageRequestID();
attributes.put(MQConstants.JMS_CORRELATION_ID, jmsCorrelationID);
// 向队列中发送消息
sendToProcessQueue(requestMessage, attributes);
// 根据jmsCorrelationID来接收同步返回的消息
String messageText = reciveFromReceiptQueue(jmsCorrelationID);
return messageText;
}
/**
*
* 向队列中发送消息 <br>
* 〈功能详细描述〉
*
* @param msgText
* @param attributes
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
public void sendToProcessQueue(final String msgText, Map attributes) {
final String correlationID = (String) attributes.get(MQConstants.JMS_CORRELATION_ID);
final Destination replyTo = (Destination) attributes.get(MQConstants.JMS_REPLY_TO);
final String type = (String) attributes.get(MQConstants.JMS_TYPE);
jmsTemplate.send(requestQueue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
BytesMessage bm = session.createBytesMessage();
bm.setIntProperty(MQConstants.JMS_IBM_CHARACTER_SET, 1208);
try {
bm.writeBytes(msgText.getBytes("utf-8"));
} catch (UnsupportedEncodingException e) {
logger.error(e.getMessage(), e);
throw new JMSException(e.getMessage());
}
bm.setJMSCorrelationID(correlationID);
bm.setJMSReplyTo(replyTo);
bm.setJMSType(type);
return bm;
}
});
logger.debug(
"[MessageProcessor:sendToProcessQueue()]: [jmsCorrelationID={}]: [message={}] send to queue is successful!",
correlationID, msgText);
}
/**
*
* 根据jmsCorrelationID来接收同步返回的消息 <br>
* 〈功能详细描述〉
*
* @param jmsCorrelationID
* @return
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
private String reciveFromReceiptQueue(String jmsCorrelationID) {
// 设置接收等待的超时时间
BytesMessage msg = null;
String msgText = null;
ByteArrayOutputStream byteStream = null;
try {
jmsTemplate.setReceiveTimeout(receiveTimeout);
// 设置根据JMSCorrelationID过滤需要接收的消息
String selector = "JMSCorrelationID = 'ID:" + byte2HexStr(jmsCorrelationID.getBytes()) + "'";
// 接收消息
msg = (BytesMessage) jmsTemplate.receiveSelected(responseQueue, selector);//
if (msg == null) {
throw new RuntimeException("socket timeout!" + "jmsCorrelationID=" + jmsCorrelationID);
}
byteStream = new ByteArrayOutputStream();
byte[] buffer = new byte[(int) msg.getBodyLength()];
msg.readBytes(buffer);
byteStream.write(buffer, 0, (int) msg.getBodyLength());
msgText = new String(byteStream.toByteArray());
logger.debug(
"[MessageProcessor:reciveFromReceiptQueue()]: [jmsCorrelationID={}]: [response={}] receive from queue is successful!",
jmsCorrelationID, msgText);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
if (byteStream != null) {
byteStream.close();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return msgText;
}
public static String byte2HexStr(byte[] b) {
String hs = "";
String stmp = "";
for (int n = 0; n < b.length; n++) {
stmp = (Integer.toHexString(b[n] & 0XFF));
if (stmp.length() == 1)
hs = hs + "0" + stmp;
else
hs = hs + stmp;
}
return hs.toUpperCase();
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Queue getRequestQueue() {
return requestQueue;
}
public void setRequestQueue(Queue requestQueue) {
this.requestQueue = requestQueue;
}
public long getReceiveTimeout() {
return receiveTimeout;
}
public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
public Queue getResponseQueue() {
return responseQueue;
}
public void setResponseQueue(Queue responseQueue) {
this.responseQueue = responseQueue;
}
}
public class MQConstants {
public static final String JMS_CORRELATION_ID = "jmsCorrelationID";
public static final String JMS_REPLY_TO = "jmsReplyTo";
public static final String JMS_TYPE = "jmsType";
public static final String JMS_IBM_CHARACTER_SET = "JMS_IBM_Character_Set";
}
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class RequestIDGenerator {
public static String generateMessageRequestID() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
String id = simpleDateFormat.format(new Date());
String numberChar = "0123456789";
StringBuffer sb = new StringBuffer();
Random random = new Random();
for (int i = 0; i < 4; i++) {
sb.append(numberChar.charAt(random.nextInt(numberChar.length())));
}
return id + sb.toString();
}
}
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;
import com.ibm.mq.jms.MQQueue;
/**
* 〈一句话功能简述〉<br>
* 〈功能详细描述〉
*
* @see [相关类/方法](可选)
* @since [产品/模块版本] (可选)
*/
@ContextConfiguration(locations = { "file:src/main/java/com/iteye/jms/send/bean.xml" })
public class JmsTemplateInvokeTest extends AbstractTestNGSpringContextTests {
@Autowired
@Qualifier("jmsTemplate")
private JmsTemplate jmsTemplate;
@Autowired
@Qualifier("requestQueue")
private MQQueue requestQueue;
@Autowired
@Qualifier("jmsTemplateInvoke")
private JmsTemplateInvoke jmsTemplateInvoke;
/**
*
* 发送MQ消息 <br>
* 〈功能详细描述〉
*
* @throws Exception
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
// @Test
public void testSendMqMsg1() throws Exception {
String requestMessage = getEsbXml();
JmsTemplateInvoke jmsTemplateInvoke = new JmsTemplateInvoke();
jmsTemplateInvoke.setJmsTemplate(jmsTemplate);
jmsTemplateInvoke.setRequestQueue(requestQueue);
jmsTemplateInvoke.asynRequestProcessor(requestMessage);
}
/**
*
* 发送MQ消息 <br>
* 〈功能详细描述〉
*
* @throws Exception
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
@Test
public void testSendMqMsg2() throws Exception {
String requestMessage = getEsbXml();
jmsTemplateInvoke.asynRequestProcessor(requestMessage);
}
/**
*
* 获取请求报文 <br>
* 〈功能详细描述〉
*
* @return
* @throws Exception
* @see [相关类/方法](可选)
* @since [产品/模块版本](可选)
*/
private String getEsbXml() throws Exception {
StringBuilder esbXml = new StringBuilder(10000);
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(
"D:/CReviewMgmt-syncOrderCommentInfoNew_in_zr.xml"), "UTF-8"));
String line = null;
while ((line = br.readLine()) != null) {
esbXml.append(line);
}
br.close();
return esbXml.toString();
}
}
分享到:
相关推荐
在这个基于Spring的消息队列系统中,前端Web应用通过并发发送Ajax请求来获取数据,后端Action将这些数据放入缓冲队列中,然后由单线程消费者来消费队列中的数据并进行持久化存储。下面我们将深入探讨这个系统的关键...
创建一个Spring Boot的Bean,该Bean使用`JmsTemplate`来发送消息到指定的ActiveMQ主题或队列: ```java @Autowired private JmsTemplate jmsTemplate; public void sendMessage(String message, String ...
通过分析这些代码,你可以了解到如何在实际项目中使用AMQ,如何设置消息头,以及如何处理消息确认和异常。此外,你还可以学习到如何测试和调试AMQ应用程序,以确保消息正确地发送和接收。 总结起来,本示例旨在教你...
本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...
如果项目中使用了Spring框架,可以通过Spring的JMS支持简化与MQ的交互。Spring提供了一个抽象层,可以简化连接管理、事务处理和异常处理。配置MQ连接工厂和目的地后,只需编写一个监听器或模板方法即可发送消息。...
在IT行业中,消息队列(Message Queue,简称MQ)是一种重要的中间件,它在分布式系统中扮演着数据通信的关键角色。ActiveMQ是Apache软件基金会开发的一款开源MQ产品,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等...
在本文中,我们将深入探讨如何使用SpringBoot、ActiveMQ和MQTT来实现消息的发送与接收。这是一个典型的分布式系统中的消息通信场景,其中SpringBoot作为应用程序框架,ActiveMQ作为消息中间件,而MQTT(Message ...
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。在本教程中,我们将专注于如何在SpringBoot框架下快速集成并使用ActiveMQ,一个广泛使用的开源消息...
2. **创建消息生产者**:在Spring中,你可以使用`JmsTemplate`作为消息生产者,发送消息到ActiveMQ的队列或主题。配置`JmsTemplate`并设置ActiveMQ的连接工厂,然后在需要发送消息的地方调用其`convertAndSend`方法...
在P2P模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者接收。发布/订阅模型中,消息发布到一个主题,多个订阅者可以同时接收。 二、Spring与JMS集成 2.1 Spring JMS模块 Spring框架的JMS模块提供...
Spring的JMS支持使得在应用中使用消息传递变得简单且灵活。通过配置`ConnectionFactory`、`JmsTemplate`和消息监听器,我们可以方便地实现消息的发送和接收,从而提高系统的可扩展性和解耦性。同时,Spring提供的...
2. 在 Spring 配置文件中定义 JmsTemplate 和消息目的地(队列或主题)。 3. 使用 JmsTemplate 的 send 方法发送消息,指定目的地和消息内容。 4. 对于消息接收,可以使用 `MessageListener` 接口实现消息监听器,...
3. **消息生产者**:在源码中,一个Spring bean通常被配置为消息生产者,它使用`JMSTemplate`向ActiveMQ消息队列发布消息。生产者可以是服务或控制器,它们在执行业务逻辑时将消息放入队列。 4. **消息消费者**:另...
3. **创建邮件生产者**:创建一个bean,使用`JmsTemplate`发送包含邮件信息的消息到队列。 4. **定义邮件消费者**:创建一个监听邮件队列的bean,该bean接收到消息后,调用`JavaMailSender`发送邮件。 5. **异常...
在Spring JMS中,使用`JmsTemplate.convertAndSend()`方法可以发送TextMessage,而`MessageListener`接口的`onMessage(Message msg)`方法则用于接收。 2. **MapMessage**: MapMessage允许你以键值对的形式存储和...
在"ActiveMQ,Spring的简单例子"中,我们将探讨如何将Spring与ActiveMQ集成,利用`JMSTemplate`来发送和接收消息。 首先,我们需要在项目中引入ActiveMQ的相关依赖。这通常通过Maven或Gradle的配置完成,添加...
在本文中,我们将探讨两种使用Java实现消息队列的方式,分别是利用Spring消息模板发送消息和使用Apache ActiveMQ官方实例。 1. Spring消息模板发送消息 Spring框架提供了对JMS的强大支持,其中包括一个名为`...
广播是一种消息发布机制,其中一个或多个消息生产者向消息队列发送消息,而多个消费者可以订阅这些消息。这种方式非常适合用于实时通知系统,如新闻推送、系统状态更新等场景。 ##### 2.4 错峰与流控 在高峰期,...
5. **发送和接收消息**:使用创建的发送者对象将消息发布到队列,接收者则从队列中消费消息。消息可以是文本、对象或二进制数据。 6. **处理消息**:接收到消息后,消费者可以对其进行处理,例如处理信用请求或执行...