`
kanpiaoxue
  • 浏览: 1782062 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

spring hornetq selector 过滤消息

 
阅读更多

在接收 JMS消息的时候,我们经常要在消息队列里面过滤出自己需要的消息,摒弃我们不需要的消息。这个时候就需要用到 JMS的selector功能。这里结合spring3.1,给出一个例子。

 

发送消息的配置:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

	xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">


	<bean id="selectorQueue" class="org.hornetq.api.jms.HornetQJMSClient"
		factory-method="createQueue">
		<constructor-arg value="org.spring.jms.selector.queue" />
	</bean>
	
	
	<bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
		<constructor-arg
			value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
		<constructor-arg>
			<map key-type="java.lang.String" value-type="java.lang.Object">
				<entry key="host" value="localhost"></entry>
				<entry key="port" value="5445"></entry>
			</map>
		</constructor-arg>
	</bean>

	<bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
		factory-method="createConnectionFactoryWithoutHA">
		<constructor-arg type="org.hornetq.api.jms.JMSFactoryType"
			value="CF" />
		<constructor-arg ref="transportConfiguration" />
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	
	<bean id="sender" class="com.wanmei.jms.spring.selector.config.sender.Sender">
		<property name="jmsTemplate" ref="jmsTemplate" />
		<property name="destination" ref="selectorQueue" />
	</bean>
		
</beans>

 

 发送消息的Sender类

 

/**
 * <pre>
 * </pre>
 */
package com.wanmei.jms.spring.selector.config.sender;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import com.wanmei.jms.spring.selector.java.State;

/**
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class Sender implements State {
	private static final Logger LOGGER = Logger.getLogger(Sender.class);
	private JmsTemplate jmsTemplate;
	private Destination destination;
	

	public void send(final String message, final String fromNode,
			final String toNode) {
		try {
			LOGGER.info("start to send message to " + destination
					+ " [message:" + message + ",fromNode:" + fromNode
					+ ",toNode:" + toNode);
			jmsTemplate.send(destination, new MessageCreator() {

				@Override
				public Message createMessage(Session session)
						throws JMSException {
					LOGGER.info("session:"+session + "\nmessage : " + message);
					TextMessage msg = session.createTextMessage(message);
					msg.setStringProperty(FROM_NODE, fromNode);
					msg.setStringProperty(TO_NODE, toNode);
					
					LOGGER.info("-->"+msg);
					LOGGER.info(TO_NODE+"-->"+toNode);
					return msg;
				}
			});
			
			LOGGER.info("send message to " + destination + " successfully!");
		} catch (Throwable t) {
			LOGGER.error("Error:" + t.getMessage(), t);
		}
	}

	// ----------------- setter / getter
	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public Destination getDestination() {
		return destination;
	}

	public void setDestination(Destination destination) {
		this.destination = destination;
	}

	public static Logger getLogger() {
		return LOGGER;
	}
}
 

 

发送消息的main函数类

 

/**
 * <pre>
 * </pre>
 */
package com.wanmei.jms.spring.selector.config.sender;

import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class BootstrapSender {
	private static final Logger LOGGER = Logger
			.getLogger(BootstrapSender.class);

	/**
	 * <pre>
	 * @param args
	 * </pre>
	 */
	public static void main(String[] args) {
		LOGGER.info("start to work and initialize spring frame.");
		String configLocation = "E:/workspace_java/hornetq/src/com/wanmei/jms/spring/selector/config/sender/applicationContext.xml";
		ApplicationContext applicationContext = new FileSystemXmlApplicationContext(
				configLocation);
		LOGGER.info("initialize spring frame successfully!");
		Sender sender = applicationContext.getBean("sender", Sender.class);
		for (int i = 0; i < 10; i++) {
			Msg msg = createMessage(i);
			sender.send(msg.getMessage() + "-" + i, msg.getFromNode(), msg.getToNode());
		}
	}

	private static Msg createMessage(int i) {
		String base1 = "127.0.0.1";
		String base2 = "127.0.0.2";
		String message = message(base1);
		String fromNode = from(base1);
		String toNode = to(base2);
		if (i % 2 == 0) {
			message = message(base2);
			fromNode = from(base2);
			toNode = to(base1);
		}
		return new Msg(message, fromNode, toNode);
	}

	private static String message(String str) {
		return "send " + str;
	}

	private static String from(String str) {
		return "from " + str;
	}

	private static String to(String str) {
		return "to " + str;
	}

}

class Msg {
	private String message;
	private String fromNode;
	private String toNode;

	/**
	 * <pre>
	 * </pre>
	 */
	public Msg() {
		super();
	}

	/**
	 * <pre>
	 * @param message
	 * @param fromNode
	 * @param toNode
	 * </pre>
	 */
	public Msg(String message, String fromNode, String toNode) {
		super();
		this.message = message;
		this.fromNode = fromNode;
		this.toNode = toNode;
	}

	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}

	public String getFromNode() {
		return fromNode;
	}

	public void setFromNode(String fromNode) {
		this.fromNode = fromNode;
	}

	public String getToNode() {
		return toNode;
	}

	public void setToNode(String toNode) {
		this.toNode = toNode;
	}

}

 

 用到的一个接口类:

 

/**
 * <pre>
 * </pre>
 */
package com.wanmei.jms.spring.selector.java;

/**
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public interface State {
	public final static String FROM_NODE = "FROM_NODE";
	public final static String TO_NODE = "TO_NODE";
}

 

 

------------------------------------------------------ 开始接收消息-------------------------------------

接收的spring的配置文件:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

	xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

	<bean id="selectorQueue" class="org.hornetq.api.jms.HornetQJMSClient"
		factory-method="createQueue">
		<constructor-arg value="org.spring.jms.selector.queue" />
	</bean>
	
	<bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
		<constructor-arg
			value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
		<constructor-arg>
			<map key-type="java.lang.String" value-type="java.lang.Object">
				<entry key="host" value="localhost"></entry>
				<entry key="port" value="5445"></entry>
			</map>
		</constructor-arg>
	</bean>

	<bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
		factory-method="createConnectionFactoryWithoutHA">
		<constructor-arg type="org.hornetq.api.jms.JMSFactoryType"
			value="CF" />
		<constructor-arg ref="transportConfiguration" />
	</bean>
	
	<bean id="receiveListener" class="com.wanmei.jms.spring.selector.java.ReceiveListener"/>
	
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	    <property name="connectionFactory" ref="connectionFactory"/>
	    <property name="destination" ref="selectorQueue"/>
	    <property name="messageListener" ref="receiveListener"/>
	    <property name="messageSelector" value="TO_NODE='to 127.0.0.1'"/>
	</bean>
</beans>

 

 实现了MessageListener接口的类:

 

/**
 * <pre>
 * </pre>
 */
package com.wanmei.jms.spring.selector.java;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;

/**
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class ReceiveListener implements MessageListener, State {
	private static final Logger LOGGER = Logger
			.getLogger(ReceiveListener.class);

	/*
	 * (non-Javadoc)
	 * 
	 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
	 */
	@Override
	public void onMessage(Message msg) {
		try {
			// LOGGER.info("start to receive from " + msg.getJMSDestination());
			TextMessage message = (TextMessage) msg;
			String fromNode = message.getStringProperty(FROM_NODE);
			String toNode = message.getStringProperty(TO_NODE);
			LOGGER.info("receive message from " + message.getJMSDestination()
					+ ", msg : " + message.getText() + ", fromNode : " + fromNode
					+ ", toNode : " + toNode);
		} catch (JMSException e) {
			LOGGER.error("Error" + e.getMessage(), e);
		}
	}

}

 

 含有main函数的引导类:

 

/**
 * <pre>
 * </pre>
 */
package com.wanmei.jms.spring.selector;

import org.apache.log4j.Logger;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class Bootstrap {
	private static final Logger LOGGER = Logger
			.getLogger(Bootstrap.class);
	/**
	 *<pre>
	 * @param args
	 *</pre>
	 */
	public static void main(String[] args) {
		LOGGER.info("start to work and initialize spring frame.");
		String configLocation = "E:/workspace_tmp_copy/hornetq/src/com/wanmei/jms/spring/selector/config/receiver/c1/applicationContext.xml";
		new FileSystemXmlApplicationContext(configLocation);
		LOGGER.info("initialize spring frame successfully!");
		LOGGER.info("spring : " + configLocation);
	}

}

 

------------ 备注:

上面接收的spring的配置文件还可以采取另一种 JMS Namespace Support 来进行配置,注意 destination="org.spring.jms.selector.queue" 这里是queue的名称,不是引用

配置文件如下:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

	xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

	
	<bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
		<constructor-arg
			value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
		<constructor-arg>
			<map key-type="java.lang.String" value-type="java.lang.Object">
				<entry key="host" value="localhost"></entry>
				<entry key="port" value="5445"></entry>
			</map>
		</constructor-arg>
	</bean>

	<bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
		factory-method="createConnectionFactoryWithoutHA">
		<constructor-arg type="org.hornetq.api.jms.JMSFactoryType"
			value="CF" />
		<constructor-arg ref="transportConfiguration" />
	</bean>

	
	<bean id="receiveListener" class="com.wanmei.jms.spring.selector.java.ReceiveListener"/>

	<jms:listener-container connection-factory="connectionFactory"
                        concurrency="10">
	    <jms:listener destination="org.spring.jms.selector.queue" ref="receiveListener" selector="TO_NODE='to 127.0.0.1'"/>
	</jms:listener-container>
	
</beans>

 具体内容请参考spring.3.1的reference的文档

分享到:
评论

相关推荐

    HornetQ官方学习资料

    HornetQ是一款由JBoss开发并维护的消息中间件,它具备高度的可扩展性和灵活性,能够支持集群部署以及多种消息传递协议。HornetQ不仅完全支持JMS(Java Message Service)标准,还提供了自身定制的消息API,从而能够...

    Hornetq2.1中文手册

    HornetQ 2.1 用户手册是一份详尽的指南,主要针对那些对消息中间件感兴趣的开发者,尤其是对英文文档阅读有困难的用户。HornetQ 是 JBoss 社区开发的一个开源消息传递系统,它支持 Message Queuing (MQ) 协议,能够...

    hornetq-2.3.0.Final-bin.zip

    HornetQ是JBoss公司开发的一个开源的消息中间件(Message Broker),它提供高效、可扩展、高可用性的消息传递服务。在hornetq-2.3.0.Final-bin.zip这个压缩包中,包含了HornetQ 2.3.0 Final版本的所有组件和必要的...

    HornetQ 2.1 中文文档

    HornetQ是JBoss社区所研发的开放源代码消息中间件;HornetQ是以Java 5 编写,因此只需要操作系统支持Java虚拟机,HornetQ便可运行。 支持Java消息服务 (JMS) 1.1 版本 集群 (Clustering) 支持庞大的消息(Message)...

    hornetq 实例

    HornetQ是一款开源的消息中间件,它在Java消息服务(JMS)规范下提供高效、可伸缩、高可用性的消息传递功能。HornetQ的设计目标是为各种规模的应用提供灵活、高性能且易于使用的消息传递解决方案。下面将详细探讨...

    HornetQ2.1中文手册

    HornetQ2.1中文手册中详细介绍了消息处理系统的基本概念和技术细节,这对于理解和使用HornetQ至关重要。 ##### 4.1 消息相关的概念 - **消息**:在HornetQ中,消息是指在应用程序之间传递的信息单元。 - **地址...

    hermes 监听hornetq JMS配置步奏

    在IT行业中,消息传递系统是分布式应用程序之间进行通信的关键组件,而HornetQ和Hermes都是此类系统的重要组成部分。HornetQ是一个高性能、轻量级且完全开源的消息中间件,它提供了JMS(Java消息服务)接口,允许...

    ActiveMQ和HornetQ性能对比

    本文旨在通过一系列测试数据对比分析ActiveMQ与HornetQ在不同消息大小及数量下的性能表现。测试环境为相同的硬件配置,确保了测试结果的公正性。通过对比两者的发送时间、吞吐量等指标,可以更直观地了解两者之间的...

    HornetQ2.3 API 文档

    HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。 这个是官方的API 里面有个2.1的中文文档

    hornetq 2.4.0免安装

    HornetQ 2.4.0 是一款轻量级且高效的开源消息中间件(Message Queuing,简称MQ),它提供了可靠的消息传递服务,适用于分布式系统中的数据通信。这款MQ解决方案设计目标是高吞吐量、低延迟以及可扩展性,使得在大...

    .net 连接HornetQ,需要的dll

    .NET 连接HornetQ是一项关键的技术任务,HornetQ是一款开源的消息中间件,它提供了高效、可扩展和高可用性的消息传递服务。在.NET环境中与HornetQ进行交互,通常需要借助特定的客户端库,如Apache.NMS.Stomp。下面将...

    HornetQ2.1中文手册.7z

    HornetQ是JBoss公司开发的一个开源消息中间件,它在Java消息服务(JMS)规范的基础上提供了高效、可扩展且高度可靠的异步通信功能。这个“HornetQ 2.1中文手册”是一个压缩包文件,包含了对HornetQ 2.1版本的详细...

    hornetq-journal-2.3.19.Final.zip

    【标题】"hornetq-journal-2.3.19.Final.zip" 提供的是HornetQ消息中间件的一个组件——Journal模块的特定版本。HornetQ是JBoss社区开发的一个高性能、可扩展且功能丰富的开源消息传递系统,它被广泛用于企业级应用...

    hornetq-2.4.0.Final-bin.tar

    hornetq安装包, hornetq-2.4.0.Final-bin.tar 消息中间件 供项目中数据交互使用

    HornetQ2.0.0GA

    6. 容易集成:HornetQ 可以轻松集成到基于Java的任何应用程序中,包括但不限于Spring框架,以及JBoss应用服务器,因为HornetQ 原生是JBoss AS的一部分。 7. 集成开发环境:HornetQ 提供了强大的管理控制台和API,...

    HornetQ集群配置

    HornetQ是一款高性能、可伸缩且开源的消息中间件,它被广泛用于构建分布式系统中的消息传递。在HornetQ中,集群配置是一种重要的特性,它允许多个HornetQ服务器形成一个集群,共享资源,提高可用性和可扩展性。本篇...

    HornetQ Messaging Developer's Guide.pdf

    HornetQ是java开源实现的消息系统框架,性能上比ActiveQ要好一些,被集成到JBoss的消息服务中。 Table of Contents Preface 1 Chapter 1: Getting Started with HornetQ 9 Chapter 2: Setting Up HornetQ 31 ...

    HornetQ 2_1用户手册

    HornetQ提供自动客户端失效备援(automatic client failover)功能,能保证在服务器故障时没有消息丢失或消息重复。 * 超级灵活的集群方案。可以控制集群进行消息负载均衡的方式。分布在不同地理位置的各个集群间...

    hornetq-2.2.5.Final.zip

    hornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.zip

    hornetq-transports-2.0.0.GA.jar

    hornetq-transports-2.0.0.GA.jar

Global site tag (gtag.js) - Google Analytics