`
suhuanzheng7784877
  • 浏览: 704647 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
Ff8d036b-05a9-33b5-828a-2633bb68b7e6
读金庸故事,品程序人生
浏览量:47756
社区版块
存档分类
最新评论

将Sun的Open Message Queue与Spring集成

阅读更多

1.       前言

基于JMS标准的消息中间件实现的产品有很多,JBossMQActiveMQOpenMQOpenJMS等等,最常用的还是apacheActiveMQ。有时也使用SunOpenMQ。在官网http://mq.java.net/处可以下载。Open Message QueueSun Java System Message Queue的一个开源版本。Open message queue是一个企业级,可升级,非常成熟的消息服务器。它为面向消息的系统集成提供一套完整的JMSJava Message Service )实现。由于Open MQ源自SunJava Message Queue,所以其具有Java System Message Queue拥有的所有特性,功能和性能。

2.       环境配置

下载后将相关的jar拷贝到项目的classpath下面。笔者在此为了安全起见,引入了很多jar包,将语言包都引入了。各位读者可以因地制宜。

以下是引入jar包的列表

lib/openmqjar/common-message.jar
lib/openmqjar/fscontext.jar
lib/openmqjar/grizzly.jar
lib/openmqjar/imq_de.jar
lib/openmqjar/imq_es.jar
lib/openmqjar/imq_fr.jar
lib/openmqjar/imq_it.jar
lib/openmqjar/imq_ja.jar
lib/openmqjar/imq_ko.jar
lib/openmqjar/imq_pt_BR.jar
lib/openmqjar/imq_zh_CN.jar
lib/openmqjar/imq_zh_TW.jar
lib/openmqjar/imq.jar
lib/openmqjar/imqadmin.jar
lib/openmqjar/imqbridgemgr.jar
lib/openmqjar/imqbroker.jar
lib/openmqjar/imqjmsbridge.jar
lib/openmqjar/imqjmsra.rar
lib/openmqjar/imqjmx_de.jar
lib/openmqjar/imqjmx_es.jar
lib/openmqjar/imqjmx_fr.jar
lib/openmqjar/imqjmx_it.jar
lib/openmqjar/imqjmx_ja.jar
lib/openmqjar/imqjmx_ko.jar
lib/openmqjar/imqjmx_pt_BR.jar
lib/openmqjar/imqjmx_zh_CN.jar
lib/openmqjar/imqjmx_zh_TW.jar
lib/openmqjar/imqjmx.jar
lib/openmqjar/imql10n_server_de.jar
lib/openmqjar/imql10n_server_es.jar
lib/openmqjar/imql10n_server_fr.jar
lib/openmqjar/imql10n_server_it.jar
lib/openmqjar/imql10n_server_ja.jar
lib/openmqjar/imql10n_server_ko.jar
lib/openmqjar/imql10n_server_pt_BR.jar
lib/openmqjar/imql10n_server_zh_CN.jar
lib/openmqjar/imql10n_server_zh_TW.jar
lib/openmqjar/imqservlet.jar
lib/openmqjar/imqstomp.jar
lib/openmqjar/imqutil.jar
lib/openmqjar/imqxm.jar
lib/openmqjar/jaxm-api.jar
lib/openmqjar/jhall.jar
lib/openmqjar/jms.jar
lib/openmqjar/jta.jar
lib/openmqjar/protobuf-2.3.0.jar

3.       之后项目加入Spring的相关jar包。

增加Spring配置文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd 
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-2.5.xsd"
	default-autowire="byName">

	<!--消息连接工厂-->
	<bean id="connectionfactoryfactory"
		class="message.listener.OpenMqConnectionFactory">
		<property name="properties">
			<props>
				<prop key="imqAddressList">127.0.0.1:7676</prop>
				<prop key="imqDefaultUsername">admin</prop>
				<prop key="imqDefaultPassword">admin</prop>
				<prop key="imqReconnectEnabled">true</prop>
				<prop key="imqReconnectAttempts">3</prop>
				<prop key="imqReconnectInterval">5000</prop>
				<prop key="imqAddressListBehavior">RANDOM</prop>
			</props>
		</property>
	</bean>

	<bean id="mqConnectionFactory" factory-bean="connectionfactoryfactory"
		factory-method="createConnectionFactory" />

	<!--设置广发消息目的-->
	<bean id="updateLocalRouteMap" class="com.sun.messaging.Topic">
		<constructor-arg type="java.lang.String" value="mytopic" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="mqConnectionFactory" />
		<property name="defaultDestination" ref="updateLocalRouteMap" />
		<property name="receiveTimeout" value="20000" />
	</bean>

	<!--消息监听器-->
	<bean id="messageListener1"
		class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg>
			<bean
				class="message.listener.JMSMessageListener" />
		</constructor-arg>
		<property name="defaultListenerMethod" value="receive" />
		<property name="messageConverter">
			<null />
		</property>
	</bean>

	<!—实际的消息监消费者配置-->
	<bean id="consumercontainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="mqConnectionFactory" />
		<property name="destination" ref="updateLocalRouteMap" />
		<property name="messageListener" ref="messageListener1" />
		<property name="transactionTimeout" value="180000" />
		<property name="receiveTimeout" value="180000" />
		<property name="sessionTransacted" value="true" />
	</bean>
</beans>

4.       消息监听器

类代码如下

 

/**
 * JMS消息消费者。
 * 接收JMS消息后获得router想要的消息后,调用router接口更新本地缓存
 * @author liuyan
 * 
 */
public class JMSMessageListener implements MessageListener {

	private Logger log = Logger.getLogger(JMSMessageListener.class.getName());

	/**
	 * 接收JMS消息后的业务处理
	 */
	public void onMessage(Message message) {

		log.info("接收消息……");
		byte[] byteMessage = JMSByteConverterUtil
				.ConverterMessageToBttes(message);
		try {

			log.info("将转型成实体对象……");
			//……………………………………………………

			}

		} catch (InvalidProtocolBufferException e) {
			log.error("JMS异常" + e.getMessage());
			e.printStackTrace();
		} catch (Exception e) {
			log.error("其他异常" + e.getMessage());
			e.printStackTrace();
		}
	}
}

 因为一些原因此处就不给出完整代码了~~~反正是获取一个字节流后,转成对象,直接从对象中获取想要的信息。转成对象的辅助类如下

/**
 * 对获得的消息对象进行转型
 * @author liuyan
 */
public class JMSByteConverterUtil {

	private static Logger log = Logger.getLogger(JMSMessageListener.class
			.getName());
	
	/**
	 * 对获得的消息对象进行转型
	 * @param message
	 * @return
	 */
	public static byte[] ConverterMessageToBttes(Message message) {

		if (message == null) {
			log.error("消息对象为空……");
			return null;
		} else if (message instanceof BytesMessage) {

			log.debug("消息强制转型BytesMessage");
			BytesMessage bytesMessage = (BytesMessage) message;

			byte[] messageBytes;
			try {

				log.debug("建立空的消息二进制数组");
				messageBytes = new byte[(int) bytesMessage.getBodyLength()];

				log.debug("往二进制数组中写进二进制信息");
				bytesMessage.readBytes(messageBytes);

				log.debug("messageBytes.length=" + messageBytes.length);
				return messageBytes;

			} catch (JMSException e) {
				log.error("JMS错误:" + e.getMessage());
				e.printStackTrace();
				return null;
			}

		}else{
			
			log.error("消息对象不能正确转型");
			return null;
		}
		
	}
}

 5.       启动消息监听器

开启OpenMQ的服务,启动{OpenMQ_HOME}\mq\bin\下的imqcmd.exe命令

启动消息消费者很简单,代码如下

public class MessageConsumer {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
				new String[] { "classpath:/spring/applicationContext-openmq-jms.xml" });

		System.out.println(applicationContext.getId());

	}
}

 6.       消息发送者

启动消息消费者服务后,写一个测试类测试一下消息的,代码如下

public class MessageSender {

	/**
	 * @param args
	 * @throws JMSException
	 */
	public static void main(String[] args) throws JMSException {
		ConnectionFactory myConnFactory;
		myConnFactory = new com.sun.messaging.ConnectionFactory();
		myConnFactory.setProperty(ConnectionConfiguration.imqAddressList,
				"mq://127.0.0.1:7676");
		myConnFactory.setProperty(ConnectionConfiguration.imqReconnectEnabled,
				"true");
		Connection myConn = myConnFactory.createConnection();
		myConn.start();
		// Step 4:
		// Create a session within the connection.
		Session mySess = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic myTopic = new com.sun.messaging.Topic("testmq");// .Queue("testmq");
		MessageProducer myMsgProducer = mySess.createProducer(myTopic);

		ObjectMessage objectMessage = mySess.createObjectMessage();

		RouterMessageBean routerMessageBean = new RouterMessageBean();
		routerMessageBean.setDbName("mysql-test");
		routerMessageBean.setUserName("liuyan");
		routerMessageBean.setMaster(null);
		routerMessageBean.setSlave(null);
		
		objectMessage.setObject(routerMessageBean);
		
		BytesMessage bytesMessage = mySess.createBytesMessage();
		bytesMessage.writeUTF("the message is 消息内容!");
		

		myMsgProducer.send(bytesMessage);
		System.out.println("测试发送JMS消息");

		mySess.close();
		myConn.close();

	}
}

 

 

1
4
分享到:
评论

相关推荐

    C# MessageQueue示例

    下面将详细介绍C#中的MessageQueue以及如何使用它来发送和接收消息。 1. **MessageQueue基础** - **消息队列的概念**:消息队列是一种存储和转发消息的机制,它允许多个进程或应用程序之间通过消息进行通信,确保...

    活用Android的Message Queue

    - Handler对象可以与Looper配合,将消息放入Message Queue,或从Queue中取出消息进行处理。 - Handler对象可以跨线程传递,允许其他线程向指定线程发送消息。 - 消息队列中的消息只能由所属线程的对象处理,确保...

    Sun Java System Message Queue 安装指南,简单集群

    主要介绍linix下安装Sun Java System Message Queuey的注意事项,安装步骤,验证方法以及配置简单集群。

    Message,MessageQueue,Looper,Handler详解

    当调用`removeMessages()`方法时,将会把Message从MessageQueue中删除,并同时放入到MessagePool中以便后续重复使用。 ##### 3、Looper:消息队列的管理者 Looper是MessageQueue的管理者,每一个MessageQueue都不...

    Handler Looper MessageQueue 源码解析

    在Android系统中,Handler、Looper和MessageQueue是实现线程间通信的核心组件,它们共同构建了一个消息处理机制。本文将深入解析这三者的源码,帮助你理解它们的工作原理,并教你如何手写一套自己的Handler系统。 ...

    MessageQueue实战

    MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战MessageQueue实战

    sun-jms.rar_jms_message queue

    《JMS与Message Queue技术详解》 在Java世界中,JMS(Java Message Service)是一种标准,用于在分布式环境中提供消息传递服务。它允许应用程序创建、发送、接收和读取消息,有效地支持异步通信模式。JMS的核心概念...

    Android的消息处理机制--Message,MessageQueue

    本篇文章将详细探讨Android的消息处理机制,特别是Message和MessageQueue这两个核心概念。 #### 二、Android消息处理机制概述 当Android应用启动后,会创建一个主进程,在这个进程中包含了UI主线程。UI主线程负责...

    管理MessageQueue的Looper

    1. `Looper.prepare()`:这个方法主要用于初始化Looper,它会创建一个MessageQueue并与当前线程关联。在主线程中,通常不需要手动调用此方法,因为系统已经自动执行了。但在自定义的工作线程中,如果需要处理消息,...

    Handler+Looper+MessageQueue

    【Android 线程间通信:Handler、Looper 和 MessageQueue 深度解析】 在 Android 应用开发中,为了保证界面的流畅性,我们通常需要将耗时操作放在非 UI 线程中执行,然后通过某种机制将结果传递回 UI 线程进行界面...

    android MessageQueue

    在Android系统中,MessageQueue是消息机制的核心组件之一,它与Handler、Looper紧密协作,用于实现线程间的通信。理解MessageQueue的工作原理对于优化应用程序性能和处理异步操作至关重要。 MessageQueue是一个内部...

    实例MSMQ(MessageQueue)

    **MSMQ(Message Queue,消息队列)是微软提供的一种可靠的消息传递机制,它允许应用程序在不同的时间点发送和接收消息,即使发送方和接收方不在同一时间在线也能正常工作。在C#中,我们可以利用.NET框架提供的...

    Thread,Looper,Handler,Message,MessageQueue,MessagePool之间的关系

    子线程中的Handler通过Looper.prepare()和Looper.loop()建立消息循环,然后使用Message.obtain()创建Message,设置数据和目标Handler,最后通过Handler.sendMessage()将Message放入MessageQueue。主线程的Looper会...

    进程间通信之消息队列 ( message queue )——完整代码

    进程间通信之消息队列 ( message queue ) 消息队列是消息的链表,具有特定的格式,并由消息队列标识符标识. 七种进程间通信方式: 一.无名管道( pipe ) 二.有名管道( fifo ) 三.共享内存 ( shared memory ) 四....

    Message Queue 3.5.pdf

    #### 四、Message Queue的关键技术与特性 **1. 可靠消息传送** - **确认机制**:确保消息成功处理后才移除。 - **事务支持**:保证消息处理的一致性。 **2. 持久性存储** - **持久化**:将消息保存在磁盘上,防止...

    JMS Message Queue 实例

    是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,...消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。

    Message Queue(消息队列)介绍与应用

    ### Message Queue(消息队列)介绍与应用 #### 消息处理中的主要概念 消息队列作为一种关键的技术组件,在分布式系统中发挥着重要的作用。它主要用于处理和传递数据单元(即消息),这些消息可以在简单的文本字符串...

    基于Spring和Redis的全球化消息队列(MessageQueue)实现.zip

    基于Spring和Redis的全球化消息队列(MessageQueue)实现spring-redis-mq基于Spring和Redis的全球化消息队列(MessageQueue)使用方法创建项目由于这个库还没有提交到Maven的中央仓库,所以需要手动将其导入到你的...

Global site tag (gtag.js) - Google Analytics