`
kanpiaoxue
  • 浏览: 1789248 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

Spring 集成 HornetQ Topic 应用

 
阅读更多

用HornetQ和Spring3做了一个简单的小例子,client发送指定的json串,经由HornetQ,由Server接收。

 

--------------------- client code -----------------------------------

spring 配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
        xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
        xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

        xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">


        <bean id="messageTopic" class="org.hornetq.api.jms.HornetQJMSClient"
                factory-method="createTopic">
                <constructor-arg value="com.wanmei.bitask.input.initialize.topic" />
        </bean>

        <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
                <constructor-arg
                        value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
                <constructor-arg>
                        <map key-type="java.lang.String" value-type="java.lang.Object">
                                <entry key="host" value="192.168.123.74"></entry>
                                <entry key="port" value="5445"></entry>
                        </map>
                </constructor-arg>
        </bean>

        <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
                factory-method="createConnectionFactoryWithoutHA">
                <constructor-arg type="org.hornetq.api.jms.JMSFactoryType"
                        value="CF" />
                <constructor-arg ref="transportConfiguration" />
        </bean>

        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
                <property name="connectionFactory" ref="connectionFactory" />
                <property name="pubSubDomain" value="true" />      
        </bean>

        <bean id="service" class="com.wanmei.service.impl.ServiceImpl">
                <property name="jmsTemplate" ref="jmsTemplate" />
                <property name="topic" ref="messageTopic" />
        </bean>


        
</beans>

 

java类的实现:

 

public class ServiceImpl implements Service {
	private static final Logger logger = Logger.getLogger(ServiceImpl.class);
	private JmsTemplate jmsTemplate;
	private Topic topic;

	/*
	 * (non-Javadoc)
	 * 
	 * @see com.wanmei.service.Service#sendMessage(java.util.List)
	 */
	@Override
	public boolean sendMessage(List<TaskMessage> messageList) {
		return sendTopic(messageList);
	}

	// ------------------ private method

	private boolean sendTopic(List<TaskMessage> messageList) {
		try {
			for (TaskMessage msg : messageList) {
				Gson gson = new Gson();
				final String msgJson = gson.toJson(msg);
				logger.info("start to send topic to " + topic.getTopicName()
						+ ", message : " + msgJson);
				jmsTemplate.send(topic, new MessageCreator() {

					@Override
					public Message createMessage(Session session)
							throws JMSException {
						TextMessage message = session
								.createTextMessage(msgJson);
						return message;
					}
				});
			}
			return true;
		} catch (Exception e) {
			logger.error("Error: send topic failure:" + e.getMessage(), e);
			return false;
		}
	}

	// ------------------ setter / getter
	/**
	 * @return the jmsTemplate
	 */
	public JmsTemplate getJmsTemplate() {
		return jmsTemplate;
	}

	/**
	 * @param jmsTemplate
	 *            the jmsTemplate to set
	 */
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	/**
	 * @return the logger
	 */
	public static Logger getLogger() {
		return logger;
	}

	/**
	 * @return the topic
	 */
	public Topic getTopic() {
		return topic;
	}

	/**
	 * @param topic
	 *            the topic to set
	 */
	public void setTopic(Topic topic) {
		this.topic = topic;
	}

}

 

 

 

--------------------- server code -----------------------------------

spring 配置文件:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
	xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

	xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">


	<bean id="messageTopic" class="org.hornetq.api.jms.HornetQJMSClient"
		factory-method="createTopic">
		<constructor-arg value="com.wanmei.bitask.input.initialize.topic" />
	</bean>

	<bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
		<constructor-arg
			value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
		<constructor-arg>
			<map key-type="java.lang.String" value-type="java.lang.Object">
				<entry key="host" value="192.168.123.74"></entry>
				<entry key="port" value="5445"></entry>
			</map>
		</constructor-arg>
	</bean>

	<bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
		factory-method="createConnectionFactoryWithoutHA">
		<constructor-arg type="org.hornetq.api.jms.JMSFactoryType"
			value="CF" />
		<constructor-arg ref="transportConfiguration" />
	</bean>

	<!-- this is the Message Driven POJO (MDP) -->
	<bean id="messageListener" class="com.wanmei.service.MessageListenerImpl">
		<property name="jdbcTemplate" ref="bitaskJdbcTemplate" />
	</bean>

	<!-- and this is the message listener container -->
	<bean id="jmsContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="messageTopic" />
		<property name="messageListener" ref="messageListener" />
	</bean>


	<!-- jdbc start -->
	<bean id="bitaskDataSource" class="org.apache.commons.dbcp.BasicDataSource"
		destroy-method="close">
		<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
		<property name="url" value="jdbc:oracle:thin:@192.168.182.129:1521:wmdw" />
		<property name="username" value="bitask" />
		<property name="password" value="bitask" />
		<property name="validationQuery" value="select * from dual" />
	</bean>


	<bean id="bitaskJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
		<property name="dataSource" ref="bitaskDataSource" />
	</bean>

	<bean id="transactionManager_jdbcTemplate"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="bitaskDataSource" />
	</bean>

	<aop:config>
		<aop:advisor pointcut="execution(* com.wanmei.service.*Impl*.*(..))"
			advice-ref="txAdvice_jdbcTemplate" />
	</aop:config>

	<tx:advice id="txAdvice_jdbcTemplate" transaction-manager="transactionManager_jdbcTemplate">
		<tx:attributes>
			<tx:method name="change*" propagation="REQUIRED" />
			<tx:method name="update*" propagation="REQUIRED" />
			<tx:method name="merge*" propagation="REQUIRED" />
			<tx:method name="get*" read-only="true" />
			<tx:method name="save*" propagation="REQUIRED" />
			<tx:method name="add*" propagation="REQUIRED" />
			<tx:method name="delete*" propagation="REQUIRED" />
			<tx:method name="remove*" propagation="REQUIRED" />
			<tx:method name="hidden*" propagation="REQUIRED" />
		</tx:attributes>
	</tx:advice>

	<!-- <bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler" 
		lazy-init="true"/> -->

	<!-- jdbc end -->


</beans>
 

 

java类的实现:

 

public class MessageListenerImpl implements MessageListener {
	private static Logger logger = Logger.getLogger(MessageListenerImpl.class);
	private JdbcTemplate jdbcTemplate;

	/*
	 * (non-Javadoc)
	 * 
	 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
	 */
	@Override
	public void onMessage(Message objMsg) {
		TextMessage msg = (TextMessage) objMsg;
		try {
			String json = msg.getText();
			logger.info("receive message : " + json);
			doJson(json);
		} catch (JMSException e) {
			logger.error(
					"Error: receive message from topic failure: "
							+ e.getMessage(), e);
		}
	}

	// ------------------ private method
	private void doJson(String json) {
		Gson gson = new Gson();
		TaskMessage message = gson.fromJson(json, TaskMessage.class);
		if (checkValid(message)) {
			updateProcRun(message);
		} else {
			logger.error("Error: found null value in message.");
		}

	}

	private void updateProcRun(final TaskMessage msg) {
		StringBuilder builder = new StringBuilder();
		builder.append(" update task_proc_run ")
				.append(" set task_status = ?, ").append(" reload = ?, ")
				.append(" ignore_type = ?, ").append(" initial_type = ?, ")
				.append(" last_modified = sysdate, ")
				.append(" exec_message = ? ")
				.append(" where proc_time = to_date(?,'yyyy-mm-dd') ")
				.append(" and proc_name = ? ")
				.append(" and data_source_name = ? ");
				

		int count = jdbcTemplate.update(builder.toString(),
				new PreparedStatementSetter() {

					@Override
					public void setValues(PreparedStatement ps)
							throws SQLException {
						ps.setInt(1, msg.getTaskStatus());
						ps.setString(2, msg.getReload());
						ps.setInt(3, msg.getIgnoreType());
						ps.setInt(4, msg.getInitialType());
						ps.setString(5, msg.getExecMessage());
						ps.setString(6, msg.getProcTime());
						ps.setString(7, msg.getProcName());
						ps.setString(8, msg.getDataSourceName());

					}
				});
		if (1 == count) {
			logger.info("update task_proc_run successfully. " + msg);
		} else {
			logger.error("update task_proc_run failure: " + msg);
		}

	}

	private static boolean checkValid(TaskMessage msg) {

		if (isNull(msg.getProcName()) || isNull(msg.getProcTime())
				|| isNull(msg.getDataSourceName())
				|| isNull(msg.getIgnoreType()) || isNull(msg.getInitialType())
				|| isNull(msg.getTaskStatus()) || isNull(msg.getExecMessage())
				|| isNull(msg.getReload())) {
			logger.error("Error: some field is null. Please check args.");
			return false;
		}

		return true;
	}

	private static boolean isNull(Object obj) {
		return obj == null ? true : false;
	}

	// ------------------ setter / getter
	/**
	 * @return the logger
	 */
	public static Logger getLogger() {
		return logger;
	}

	/**
	 * @param logger
	 *            the logger to set
	 */
	public static void setLogger(Logger logger) {
		MessageListenerImpl.logger = logger;
	}

	/**
	 * @return the jdbcTemplate
	 */
	public JdbcTemplate getJdbcTemplate() {
		return jdbcTemplate;
	}

	/**
	 * @param jdbcTemplate
	 *            the jdbcTemplate to set
	 */
	public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
		this.jdbcTemplate = jdbcTemplate;
	}

}
 
分享到:
评论

相关推荐

    HornetQ官方学习资料

    1. **企业应用集成**(EAI): 在大型企业环境中,HornetQ可以作为核心消息中间件,实现各个系统之间的消息传递和数据交换。 2. **分布式系统**: 对于分布式系统而言,HornetQ提供了一种可靠的消息传递机制,支持跨...

    HornetQ2.1中文手册

    - **5.3 将HORNETQ与JEE应用服务器集成**:说明了HornetQ与Java企业版(JEE)应用服务器集成的过程。 - **5.4 HORNETQ作为独立的服务(STANDALONE)**:解释了如何部署HornetQ作为一个独立的服务运行。 #### 三、使用...

    hornetq 实例

    HornetQ的设计目标是为各种规模的应用提供灵活、高性能且易于使用的消息传递解决方案。下面将详细探讨HornetQ的核心特性、配置、使用以及在"hornetqTest"实例中的应用。 1. **HornetQ核心特性** - **高性能**:...

    Hornetq2.1中文手册

    前言可能还会包含关于为什么选择 HornetQ 作为消息中间件的讨论,以及它在实际应用中的优势。 3. **项目信息**:这部分内容会提供关于 HornetQ 2.1 版本的具体信息,如软件的下载地址,可能包括不同平台的二进制包...

    hornetq-2.3.0.Final-bin.zip

    HornetQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)以及事务消息。 **6. 集群与高可用性** HornetQ支持集群部署,可以通过配置实现节点间的复制和负载均衡,提高服务的可用性和容错性。 **7. 性能...

    hermes 监听hornetq JMS配置步奏

    在IT行业中,消息传递系统是分布式应用程序之间进行通信的关键组件,而HornetQ和Hermes都是此类系统的重要组成部分。HornetQ是一个高性能、轻量级且完全开源的消息中间件,它提供了JMS(Java消息服务)接口,允许...

    HornetQ2.3 API 文档

    HornetQ 2.3.0 Alpha 发布,这不是一个简单的 Alpha 版本,同时也是一个大的发行版本。该版本对 2.2.0 进行了重构,引入一些原子故障迁移特性和大量企业特性改进。详细的新特性介绍请看发行说明。 HornetQ是一个...

    HornetQ 2.1 中文文档

    HornetQ是JBoss社区所研发的开放源代码消息中间件;HornetQ是以Java 5 编写,因此只需要操作系统支持Java虚拟机,HornetQ便可运行。 支持Java消息服务 (JMS) 1.1 版本 集群 (Clustering) 支持庞大的消息(Message)...

    hornetq 2.4.0免安装

    1. **消息队列原理**:HornetQ基于发布/订阅和点对点模型,提供了一种解耦应用程序的方式,允许发送方和接收方异步通信。在发布/订阅模式下,多个消费者可以订阅同一个主题;而在点对点模式下,消息只被一个消费者...

    .net 连接HornetQ,需要的dll

    .NET 连接HornetQ是一项关键的技术任务,HornetQ是一款开源的消息中间件,它提供了高效、可扩展和高可用性的消息传递服务。...HornetQ的灵活性和强大的功能使得它成为许多.NET应用程序理想的异步通信解决方案。

    ActiveMQ和HornetQ性能对比

    - 解压HornetQ与ActiveMQ并直接运行,未对系统或应用进行任何额外优化配置。 - 测试涵盖了不同的消息大小(1字节、10字节、100字节、1024字节、10240字节)以及消息数量(10条、100条、1000条、10000条),全面评估...

    HornetQ2.1中文手册.7z

    HornetQ是JBoss公司开发的一个开源消息中间件,它在Java消息服务(JMS)规范的基础上提供了高效、可扩展且高度可靠的异步通信功能。这个“HornetQ 2.1中文手册”是一个压缩包文件,包含了对HornetQ 2.1版本的详细...

    HornetQ2.0.0GA

    6. 容易集成:HornetQ 可以轻松集成到基于Java的任何应用程序中,包括但不限于Spring框架,以及JBoss应用服务器,因为HornetQ 原生是JBoss AS的一部分。 7. 集成开发环境:HornetQ 提供了强大的管理控制台和API,...

    hornetq-2.4.0.Final-bin.zip

    HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的...

    HornetQ Messaging Developer's Guide.pdf

    HornetQ是java开源实现的消息系统框架,性能上比ActiveQ要好一些,被集成到JBoss的消息服务中。 Table of Contents Preface 1 Chapter 1: Getting Started with HornetQ 9 Chapter 2: Setting Up HornetQ 31 ...

    hornetq-journal-2.3.19.Final.zip

    HornetQ是JBoss社区开发的一个高性能、可扩展且功能丰富的开源消息传递系统,它被广泛用于企业级应用中的异步通信和事件驱动架构。 【描述】"Sumac.zip,scala中的sumac ext zkargument解析" 提到的Sumac是一个基于...

    hornetq-2.2.5.Final.zip

    hornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.zip

    HornetQ集群配置

    选择哪种策略取决于应用的需求和性能考虑。 ### 5. 集群的稳定性与性能 为了保证集群的稳定性和性能,我们需要注意以下几点: - **网络稳定性**:由于集群依赖于网络通信,网络环境的稳定至关重要。 - **负载均衡*...

    hornetq-2.4.0.Final-bin.tar

    hornetq安装包, hornetq-2.4.0.Final-bin.tar 消息中间件 供项目中数据交互使用

    HornetQ_User_Manual.pdf

    HornetQ可被集成到Java EE应用服务器,也可以独立运行作为核心服务器。该软件在设计时就考虑了高可用性和集群化支持,为消息传递提供了强大灵活的解决方案。 文档中提到的几个关键概念,包括消息队列模式(The ...

Global site tag (gtag.js) - Google Analytics