`

使用Jmstemplate向队列中发送数据

阅读更多
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cxf="http://cxf.apache.org/core"
	xmlns:p="http://cxf.apache.org/policy" xmlns:ss="http://www.springframework.org/schema/security"
	xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
   	http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
   	http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
   	http://cxf.apache.org/policy http://cxf.apache.org/schemas/policy.xsd
   	http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
   	http://cxf.apache.org/bindings/soap http://cxf.apache.org/schemas/configuration/soap.xsd
   	http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context.xsd
   	http://www.springframework.org/schema/security http://www.springframework.org/schema/security/spring-security.xsd
   	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
   	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"
	default-lazy-init="false">
	<context:annotation-config />

	
	<!-- MQ队列连接工厂配置 	-->
	<bean id="connectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory"> 
		<property name="queueManager" value="TB_QM" />
		<property name="hostName" value="192.168.10.30" /> 
		<property name="port" value="1430" /> 
		<property name="channel" value="CHANNEL_TB" />
		<property name="transportType" value="1" />
	</bean> 
	
	<!-- 队列连接工厂适配器 -->
	<bean id="connectionFactoryAdapter"
		class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
		<property name="targetConnectionFactory" ref="connectionFactory" />
	</bean>
	
	<!-- JmsTemplate模板 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
		<property name="connectionFactory" ref="connectionFactoryAdapter" />
	</bean>
	
	<!-- 请求队列(发送消息的队列) -->
	<bean id="requestQueue" class="com.ibm.mq.jms.MQQueue"> 
		<property name="baseQueueName" value="MBF_INPUTQ_MQINOUT" />
	</bean>
	 
	 <!-- JmsTemplateInvoke -->
	 <bean id="jmsTemplateInvoke" class="com.iteye.jms.send.JmsTemplateInvoke"> 
		<property name="jmsTemplate" ref="jmsTemplate" />
		<property name="requestQueue" ref="requestQueue" />
	</bean>
</beans>


import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

@SuppressWarnings({ "rawtypes", "unchecked" })
public class JmsTemplateInvoke {
    private static final Logger logger = LoggerFactory.getLogger(JmsTemplateInvoke.class);

    /**
     * JmsTemplate模板
     */
    private JmsTemplate jmsTemplate;

    /**
     * 请求队列
     */
    private Queue requestQueue;

    /**
     * 响应队列
     */
    private Queue responseQueue;

    /**
     * 响应消息接收超时时间
     */
    private long receiveTimeout;

    /**
     * 
     * MQ异步接口调用 <br>
     * 〈功能详细描述〉
     * 
     * @param requestMessage
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    public void asynRequestProcessor(String requestMessage) {
        Map attributes = new HashMap();
        attributes.put(MQConstants.JMS_CORRELATION_ID, RequestIDGenerator.generateMessageRequestID());

        sendToProcessQueue(requestMessage, attributes);
    }

    /**
     * 
     * MQ同步接口调用 <br>
     * 〈功能详细描述〉
     * 
     * @param requestMessage
     * @return
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    public String syncRequestProcessor(String requestMessage) {
        Map attributes = new HashMap();
        String jmsCorrelationID = RequestIDGenerator.generateMessageRequestID();
        attributes.put(MQConstants.JMS_CORRELATION_ID, jmsCorrelationID);

        // 向队列中发送消息
        sendToProcessQueue(requestMessage, attributes);

        // 根据jmsCorrelationID来接收同步返回的消息
        String messageText = reciveFromReceiptQueue(jmsCorrelationID);

        return messageText;
    }

    /**
     * 
     * 向队列中发送消息 <br>
     * 〈功能详细描述〉
     * 
     * @param msgText
     * @param attributes
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    public void sendToProcessQueue(final String msgText, Map attributes) {

        final String correlationID = (String) attributes.get(MQConstants.JMS_CORRELATION_ID);
        final Destination replyTo = (Destination) attributes.get(MQConstants.JMS_REPLY_TO);
        final String type = (String) attributes.get(MQConstants.JMS_TYPE);
        jmsTemplate.send(requestQueue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                BytesMessage bm = session.createBytesMessage();

                bm.setIntProperty(MQConstants.JMS_IBM_CHARACTER_SET, 1208);

                try {
                    bm.writeBytes(msgText.getBytes("utf-8"));
                } catch (UnsupportedEncodingException e) {
                    logger.error(e.getMessage(), e);
                    throw new JMSException(e.getMessage());
                }
                bm.setJMSCorrelationID(correlationID);
                bm.setJMSReplyTo(replyTo);
                bm.setJMSType(type);
                return bm;
            }
        });
        logger.debug(
                "[MessageProcessor:sendToProcessQueue()]: [jmsCorrelationID={}]: [message={}] send to queue is successful!",
                correlationID, msgText);

    }

    /**
     * 
     * 根据jmsCorrelationID来接收同步返回的消息 <br>
     * 〈功能详细描述〉
     * 
     * @param jmsCorrelationID
     * @return
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    private String reciveFromReceiptQueue(String jmsCorrelationID) {
        // 设置接收等待的超时时间
        BytesMessage msg = null;
        String msgText = null;
        ByteArrayOutputStream byteStream = null;
        try {
            jmsTemplate.setReceiveTimeout(receiveTimeout);
            // 设置根据JMSCorrelationID过滤需要接收的消息
            String selector = "JMSCorrelationID = 'ID:" + byte2HexStr(jmsCorrelationID.getBytes()) + "'";
            // 接收消息
            msg = (BytesMessage) jmsTemplate.receiveSelected(responseQueue, selector);//
            if (msg == null) {
                throw new RuntimeException("socket timeout!" + "jmsCorrelationID=" + jmsCorrelationID);
            }
            byteStream = new ByteArrayOutputStream();
            byte[] buffer = new byte[(int) msg.getBodyLength()];
            msg.readBytes(buffer);
            byteStream.write(buffer, 0, (int) msg.getBodyLength());
            msgText = new String(byteStream.toByteArray());
            logger.debug(
                    "[MessageProcessor:reciveFromReceiptQueue()]: [jmsCorrelationID={}]: [response={}] receive from queue is successful!",
                    jmsCorrelationID, msgText);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            try {
                if (byteStream != null) {
                    byteStream.close();
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
        return msgText;
    }

    public static String byte2HexStr(byte[] b) {
        String hs = "";
        String stmp = "";
        for (int n = 0; n < b.length; n++) {
            stmp = (Integer.toHexString(b[n] & 0XFF));
            if (stmp.length() == 1)
                hs = hs + "0" + stmp;
            else
                hs = hs + stmp;
        }
        return hs.toUpperCase();
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public Queue getRequestQueue() {
        return requestQueue;
    }

    public void setRequestQueue(Queue requestQueue) {
        this.requestQueue = requestQueue;
    }

    public long getReceiveTimeout() {
        return receiveTimeout;
    }

    public void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

    public Queue getResponseQueue() {
        return responseQueue;
    }

    public void setResponseQueue(Queue responseQueue) {
        this.responseQueue = responseQueue;
    }
}





public class MQConstants {

    public static final String JMS_CORRELATION_ID = "jmsCorrelationID";

    public static final String JMS_REPLY_TO = "jmsReplyTo";

    public static final String JMS_TYPE = "jmsType";

    public static final String JMS_IBM_CHARACTER_SET = "JMS_IBM_Character_Set";
}



import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class RequestIDGenerator {

    public static String generateMessageRequestID() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        String id = simpleDateFormat.format(new Date());
        String numberChar = "0123456789";
        StringBuffer sb = new StringBuffer();
        Random random = new Random();
        for (int i = 0; i < 4; i++) {
            sb.append(numberChar.charAt(random.nextInt(numberChar.length())));
        }
        return id + sb.toString();
    }
}




import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

import com.ibm.mq.jms.MQQueue;

/**
 * 〈一句话功能简述〉<br>
 * 〈功能详细描述〉
 * 
 * @see [相关类/方法](可选)
 * @since [产品/模块版本] (可选)
 */
@ContextConfiguration(locations = { "file:src/main/java/com/iteye/jms/send/bean.xml" })
public class JmsTemplateInvokeTest extends AbstractTestNGSpringContextTests {

    @Autowired
    @Qualifier("jmsTemplate")
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("requestQueue")
    private MQQueue requestQueue;

    @Autowired
    @Qualifier("jmsTemplateInvoke")
    private JmsTemplateInvoke jmsTemplateInvoke;

    /**
     * 
     * 发送MQ消息 <br>
     * 〈功能详细描述〉
     * 
     * @throws Exception
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    // @Test
    public void testSendMqMsg1() throws Exception {
        String requestMessage = getEsbXml();
        JmsTemplateInvoke jmsTemplateInvoke = new JmsTemplateInvoke();
        jmsTemplateInvoke.setJmsTemplate(jmsTemplate);
        jmsTemplateInvoke.setRequestQueue(requestQueue);

        jmsTemplateInvoke.asynRequestProcessor(requestMessage);
    }

    /**
     * 
     * 发送MQ消息 <br>
     * 〈功能详细描述〉
     * 
     * @throws Exception
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    @Test
    public void testSendMqMsg2() throws Exception {
        String requestMessage = getEsbXml();

        jmsTemplateInvoke.asynRequestProcessor(requestMessage);
    }

    /**
     * 
     * 获取请求报文 <br>
     * 〈功能详细描述〉
     * 
     * @return
     * @throws Exception
     * @see [相关类/方法](可选)
     * @since [产品/模块版本](可选)
     */
    private String getEsbXml() throws Exception {
        StringBuilder esbXml = new StringBuilder(10000);

        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(
                "D:/CReviewMgmt-syncOrderCommentInfoNew_in_zr.xml"), "UTF-8"));

        String line = null;
        while ((line = br.readLine()) != null) {
            esbXml.append(line);
        }

        br.close();

        return esbXml.toString();
    }
}
分享到:
评论

相关推荐

    基于spring 消息队列

    在这个基于Spring的消息队列系统中,前端Web应用通过并发发送Ajax请求来获取数据,后端Action将这些数据放入缓冲队列中,然后由单线程消费者来消费队列中的数据并进行持久化存储。下面我们将深入探讨这个系统的关键...

    Springboot整合ActiveMQ,实现消息的发送接收功能源码

    创建一个Spring Boot的Bean,该Bean使用`JmsTemplate`来发送消息到指定的ActiveMQ主题或队列: ```java @Autowired private JmsTemplate jmsTemplate; public void sendMessage(String message, String ...

    AMQ 实现消息队列 基本DEMO

    通过分析这些代码,你可以了解到如何在实际项目中使用AMQ,如何设置消息头,以及如何处理消息确认和异常。此外,你还可以学习到如何测试和调试AMQ应用程序,以确保消息正确地发送和接收。 总结起来,本示例旨在教你...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    三种Java向MQ发送消息的方法 -- java代码

    如果项目中使用了Spring框架,可以通过Spring的JMS支持简化与MQ的交互。Spring提供了一个抽象层,可以简化连接管理、事务处理和异常处理。配置MQ连接工厂和目的地后,只需编写一个监听器或模板方法即可发送消息。...

    ActiveMQ消息队列主题订阅Spring整合

    在IT行业中,消息队列(Message Queue,简称MQ)是一种重要的中间件,它在分布式系统中扮演着数据通信的关键角色。ActiveMQ是Apache软件基金会开发的一款开源MQ产品,支持多种协议,如OpenWire、STOMP、AMQP、MQTT等...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    在本文中,我们将深入探讨如何使用SpringBoot、ActiveMQ和MQTT来实现消息的发送与接收。这是一个典型的分布式系统中的消息通信场景,其中SpringBoot作为应用程序框架,ActiveMQ作为消息中间件,而MQTT(Message ...

    SpringBoot快速玩转ActiveMQ消息队列

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。在本教程中,我们将专注于如何在SpringBoot框架下快速集成并使用ActiveMQ,一个广泛使用的开源消息...

    Spring+ActiveMQ消息队列+前台接收消息

    2. **创建消息生产者**:在Spring中,你可以使用`JmsTemplate`作为消息生产者,发送消息到ActiveMQ的队列或主题。配置`JmsTemplate`并设置ActiveMQ的连接工厂,然后在需要发送消息的地方调用其`convertAndSend`方法...

    Spring In Action 使用Spring发送和接收JMS消息

    在P2P模型中,消息从一个生产者发送到一个队列,然后由一个或多个消费者接收。发布/订阅模型中,消息发布到一个主题,多个订阅者可以同时接收。 二、Spring与JMS集成 2.1 Spring JMS模块 Spring框架的JMS模块提供...

    Spring发送接收JMS消息

    Spring的JMS支持使得在应用中使用消息传递变得简单且灵活。通过配置`ConnectionFactory`、`JmsTemplate`和消息监听器,我们可以方便地实现消息的发送和接收,从而提高系统的可扩展性和解耦性。同时,Spring提供的...

    使用Spring JMS轻松实现异步消息传递.pdf

    2. 在 Spring 配置文件中定义 JmsTemplate 和消息目的地(队列或主题)。 3. 使用 JmsTemplate 的 send 方法发送消息,指定目的地和消息内容。 4. 对于消息接收,可以使用 `MessageListener` 接口实现消息监听器,...

    Spring和ActiveMQ的整合实例源码ActiveMQSpringDemo.zip

    3. **消息生产者**:在源码中,一个Spring bean通常被配置为消息生产者,它使用`JMSTemplate`向ActiveMQ消息队列发布消息。生产者可以是服务或控制器,它们在执行业务逻辑时将消息放入队列。 4. **消息消费者**:另...

    ActiveMQ与Spring整合之异步发送邮件

    3. **创建邮件生产者**:创建一个bean,使用`JmsTemplate`发送包含邮件信息的消息到队列。 4. **定义邮件消费者**:创建一个监听邮件队列的bean,该bean接收到消息后,调用`JavaMailSender`发送邮件。 5. **异常...

    ActiveMQ常见消息类型.docx

    在Spring JMS中,使用`JmsTemplate.convertAndSend()`方法可以发送TextMessage,而`MessageListener`接口的`onMessage(Message msg)`方法则用于接收。 2. **MapMessage**: MapMessage允许你以键值对的形式存储和...

    ActiveMQ,Spring的简单例子

    在"ActiveMQ,Spring的简单例子"中,我们将探讨如何将Spring与ActiveMQ集成,利用`JMSTemplate`来发送和接收消息。 首先,我们需要在项目中引入ActiveMQ的相关依赖。这通常通过Maven或Gradle的配置完成,添加...

    java实现消息队列的两种方式(小结)

    在本文中,我们将探讨两种使用Java实现消息队列的方式,分别是利用Spring消息模板发送消息和使用Apache ActiveMQ官方实例。 1. Spring消息模板发送消息 Spring框架提供了对JMS的强大支持,其中包括一个名为`...

    ActiveMq总结.docx

    广播是一种消息发布机制,其中一个或多个消息生产者向消息队列发送消息,而多个消费者可以订阅这些消息。这种方式非常适合用于实时通知系统,如新闻推送、系统状态更新等场景。 ##### 2.4 错峰与流控 在高峰期,...

    使用Spring JMS轻松实现异步消息传递.docx

    5. **发送和接收消息**:使用创建的发送者对象将消息发布到队列,接收者则从队列中消费消息。消息可以是文本、对象或二进制数据。 6. **处理消息**:接收到消息后,消费者可以对其进行处理,例如处理信用请求或执行...

Global site tag (gtag.js) - Google Analytics