`
hbxflihua
  • 浏览: 691358 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

简单封装spring-rabbit实现mq组件化

阅读更多

网上有关spring和rabbitmq整合的博文比比皆是,但是都没有形成整体解决方案,接下来我会通过对spring-rabbit的简单封装实现消息队列服务的组件化。

 

0、添加所需依赖jar

		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>1.6.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.9</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>4.2.6.RELEASE</version>
		</dependency>

 

1、创建rabbit生产者工具类

package com.huatech.mq.producer;  
  
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;

/** 
 * 用于提供Rabbit的操作类 
 * @since 2017-7-31
 * @author lh 
 * 
 */  
public class RabbitProducer {  
	
	/**
	 * 默认交换机名称
	 */
	public static String DEFAULT_EXCHANGE_NAME = "rd-mq-exchange";

    private RabbitTemplate rabbitTemplate;  
  
    /**
     * 
     * @param rabbitTemplate
     */
    public RabbitProducer(){  
    }  
    
    /**
     * 
     * @param routingKey
     * @param message
     */
    public <T> void send(String routingKey,  T message) {  
    	send(DEFAULT_EXCHANGE_NAME, routingKey, message);
    }
  
    /**
     * 
     * @param exchange
     * @param routingKey
     * @param message
     */
    public <T> void send(String exchange,String routingKey,  T message) {  
        //实现将message通过json转换&将对象发送  
        rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {  
            //实现message操作处理实现  
            @Override  
            public Message postProcessMessage(Message message) throws AmqpException {  
                //设置信息的属性信息&设置发送模式(PERSISTENT:连续的)  
            	MessageProperties mp = message.getMessageProperties();
                mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);  
                mp.setContentType(MessageProperties.CONTENT_TYPE_JSON);
                return message;  
            }  
        }, new CorrelationData(String.valueOf(message)));  
    }

	public RabbitTemplate getRabbitTemplate() {
		return rabbitTemplate;
	}

	public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
		this.rabbitTemplate = rabbitTemplate;
	}
    
	
}  

 

2、在spring中配置生产者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:task="http://www.springframework.org/schema/task"  
	xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">

	<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->  
    <bean id="jsonMessageConverter"  class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>  
    
	<!-- 创建connectionFactory --> 	
    <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory -->
    <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" />
	<rabbit:connection-factory id="connectionFactory"
		addresses="${rabbit.addresses}" 
		username="${rabbit.username}"
		password="${rabbit.password}" 
		executor="mqExecutor"/>
		
	<!-- org.springframework.amqp.rabbit.core.RabbitAdmin -->
	<rabbit:admin connection-factory="connectionFactory" />
	
	<!-- queue 队列声明-->  
	<!-- org.springframework.amqp.core.Queue -->
	<rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" />
	
	<!-- exchange queue binging key 绑定 --> 
	<rabbit:direct-exchange id="rd-mq-exchange" name="rd-mq-exchange" durable="true" auto-delete="false" >
		<rabbit:bindings>
			<rabbit:binding queue="log_queue" key="log_queue_key" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
	
	
	<!-- 生产者配置:spring template声明 -->
	<!-- org.springframework.amqp.rabbit.core.RabbitTemplate 
		channel-transacted	true,随着spring事务进行提交或者回滚
	-->
	<rabbit:template id="rabbitTemplate" exchange="rd-mq-exchange"
		connection-factory="connectionFactory" 
		message-converter="jsonMessageConverter" 
		retry-template="retryTemplate" />
	
	<!-- 重试机制 -->
	<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
	    <property name="backOffPolicy">
	        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
	            <property name="initialInterval" value="1000" />
	            <property name="multiplier" value="3" />
	            <property name="maxInterval" value="3000" />
	        </bean>
	    </property>
	    <property name="retryPolicy">  
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">  
                <property name="maxAttempts" value="1" />  
            </bean>  
        </property> 
	</bean>
	
	<!-- 生产者工具类 -->		
	<bean id="rabbitProducer" class="com.huatech.mq.producer.RabbitProducer">
		<property name="rabbitTemplate" ref="rabbitTemplate"/>
	</bean>
		
</beans>

 

3、创建消息基类

package com.huatech.mq.model;

/**
 * MQ基类
 * @version 3.1
 * @author lh
 * @date 2017年7月31日
 */
public class MqBaseModel {
	
	/**
	 * 操作
	 */
	protected String operate;
	
	/**
	 * 无参构造方法
	 */
	public MqBaseModel() {
		super();
	}

	
	public String getOperate() {
		return operate;
	}

	public void setOperate(String operate) {
		this.operate = operate;
	}

}

 

4、添加具体model(MqLogModel)

package com.huatech.mq.model;
/**
 * 日志model
 * @author lh
 *
 */
public class MqLogModel extends MqBaseModel {
	
	private String info;
	private String logTime;
	
	public String getInfo() {
		return info;
	}

	public void setInfo(String info) {
		this.info = info;
	}

	public String getLogTime() {
		return logTime;
	}

	public void setLogTime(String logTime) {
		this.logTime = logTime;
	}



	@Override
	public String toString() {
		return "MqLogModel [logTime=" + logTime + "]";
	}
	

}

 

5、添加消息队列监听接口

package com.huatech.mq.listener;

/**
 * 消息队列监听接口
 * @author lh
 * @version 3.1
 * @since 2017-7-31
 * @param <T>
 */
public interface MqListener<T> {
	
	void listen(T t);

}

 

6、日志队列实现该接口

package com.huatech.mq.listener;

import org.springframework.stereotype.Component;

import com.huatech.mq.model.MqLogModel;

/**
 * 日志队列监听
 * @author lh
 * @version 3.0
 * @since 2017-8-2
 *
 */
@Component
public class LogQueueListener implements MqListener<MqLogModel> {
	
	//private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class);

	@Override
	public void listen(MqLogModel t) {
		System.out.println(String.format("logInfo: %s, logTime:%s", t.getInfo(), t.getLogTime()));
	}

}

 

7、在spring中配置消费者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:task="http://www.springframework.org/schema/task"  
	xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">


	<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->  
    <bean id="jsonMessageConverter"  class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>  
    
	<!-- 创建connectionFactory --> 	
    <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory -->
    <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" />
	<rabbit:connection-factory id="connectionFactory"
		addresses="${rabbit.addresses}" 
		username="${rabbit.username}"
		password="${rabbit.password}" 
		executor="mqExecutor"/>
		
	<!-- org.springframework.amqp.rabbit.core.RabbitAdmin -->
	<rabbit:admin connection-factory="connectionFactory" />
	
	<!-- queue 队列声明-->  
	<!-- org.springframework.amqp.core.Queue -->
	<rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" />
	
	<!-- 消费者监听容器 -->
	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" requeue-rejected="false"   
	      concurrency="10" message-converter="jsonMessageConverter" > 
    	    <rabbit:listener ref="logQueueListener" method="listen"  queues="log_queue" />
	</rabbit:listener-container>
		
</beans>

 

8、spring扫包

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation=" 
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context.xsd">
        
 	<description>Spring Configuration</description>
	
   	<!-- 加载配置属性文件 -->
	<context:property-placeholder ignore-unresolvable="true" location="classpath:rabbmitmq.properties" />
    
    <!-- 扫描相关的bean -->
    <context:component-scan base-package="com.huatech.mq"/>
    
</beans>

 

9、rabbmitmq.properties

#============== rabbitmq config ====================
rabbit.addresses=127.0.0.1:5672
rabbit.username=guest
rabbit.password=guest

 

 

10、测试用例

package com.huatech.mq;


import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.huatech.mq.model.MqLogModel;
import com.huatech.mq.producer.RabbitProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-context.xml","classpath:spring-rabbit-producer.xml","classpath:spring-rabbit-consumer.xml" })
public class LogQueueTest {

	@Autowired
	private RabbitProducer rabbitProducer;
	@Test
	public void test(){
		for (int i = 0; i < 10; i++) {
			MqLogModel model = new MqLogModel();
			model.setInfo("hello rabbitmq "+i);
			model.setLogTime(new Date().toString());		
			rabbitProducer.send("log_queue_key", model);
		}	
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
		}
	}
	
	
}

 

 

参考:Spring AMQP

 

分享到:
评论

相关推荐

    zxl:本人的开源框架,其中包含了一系列的开源组件,xxoo已经正式更名为xmlbean-converter成为zxl框架的一部分

    mq-rabbit 对于rabbitmq客户端的简单封装 mvc-common mvc框架的基础包 mvc-spring springmvc的简单封装 mvc-struts struts的简单封装 mvc-hbase 适于hbase的springmvc封装 orm-common orm框架的基础包 orm-hbase ...

    scratch少儿编程逻辑思维游戏源码-Pyorovania.zip

    scratch少儿编程逻辑思维游戏源码-Pyorovania.zip

    scratch少儿编程逻辑思维游戏源码-弹力猫.zip

    scratch少儿编程逻辑思维游戏源码-弹力猫.zip

    scratch少儿编程逻辑思维游戏源码-地心引力.zip

    scratch少儿编程逻辑思维游戏源码-地心引力.zip

    基于金枪鱼群优化算法的SVM在多变量时间序列预测中的MATLAB实现及优化

    内容概要:本文介绍了一种基于金枪鱼群优化算法(TSO)和支持向量机(SVM)的混合算法模型——TSO-SVM,在多变量时间序列预测中的应用。文中详细解释了TSO-SVM的工作原理,即通过模拟金枪鱼群觅食行为来优化SVM的参数,从而提升预测性能。同时提供了具体的Matlab代码实现步骤,包括参数初始化、模型训练和预测。实验结果显示,TSO-SVM相较于传统SVM方法,显著提升了预测的准确性和稳定性,并展示了良好的泛化能力。 适合人群:对机器学习尤其是时间序列预测感兴趣的科研人员和技术开发者。 使用场景及目标:①需要进行多变量时间序列预测的研究项目;②希望利用自然启发式优化算法改善现有SVM模型效果的技术团队。 其他说明:推荐使用Libsvm工具箱配合Matlab 2018B及以上版本,仅限于Windows 64位操作系统。

    机器视觉技术:OpenCV与Qt在工业相机采集及图像处理中的应用

    内容概要:本文深入探讨了机器视觉技术,重点介绍了OpenCV和Qt在工业相机采集及图像处理中的应用。文中详细讲述了卡尺工具、找线、找圆、颜色检测、模板匹配及形状匹配等关键技术的具体实现方法,并特别强调了海康工业相机采集与基于形状的模板匹配界面的开发。此外,形状匹配算法已被封装成DLL,方便直接调用。通过实际案例和技术解析,帮助读者全面掌握机器视觉系统的构建。 适合人群:对机器视觉感兴趣的初学者、有一定编程基础的研发人员、从事工业自动化领域的工程师。 使用场景及目标:适用于希望深入了解机器视觉技术及其应用场景的专业人士,旨在提升图像处理能力,优化工业自动化流程,提高生产效率。 其他说明:文章不仅提供理论知识,还附有示例代码,便于读者动手实践并加深理解。

    scratch少儿编程逻辑思维游戏源码-Billy奇妙之旅.zip

    scratch少儿编程逻辑思维游戏源码-Billy奇妙之旅.zip

    电动汽车BMS电池管理系统应用层软件模型:MBD方法、通信协议及AUTOSAR构建

    内容概要:本文详细介绍了基于模型开发(MBD)的BMS电池管理系统应用层软件模型。首先概述了BMS的核心任务,即确保电池的安全与高效运行,涉及充电、放电控制、实时监测和电池均衡管理。接着重点讨论了SUMlink电池管理系统策略模型,该模型通过收集和处理电池的数据(如电压、电流、温度),并运用多种算法(如SOC估算、SOH评估)来优化电池性能。文中还阐述了BMC CVS内部通讯协议DBC的作用,确保各模块间数据传输的准确性与效率。最后,介绍了AUTOSAR标准在BMS系统中的应用,特别是针对MPC5644A芯片的底层Build工程,提高了系统的可维护性、可扩展性和可靠性。此外,提到了INCA A2L标定文件的应用,用于配置和调整系统参数,以满足不同需求。 适合人群:从事电动汽车电池管理系统研究与开发的技术人员,尤其是对MBD方法、通信协议和AUTOSAR标准感兴趣的工程师。 使用场景及目标:适用于希望深入了解BMS系统的设计原理和技术细节的专业人士,旨在提高他们对该领域的理解和实际操作能力。 其他说明:通过对代码的具体分析,读者能够更加直观地理解BMS的工作流程及其各个组件间的协作关系。

    少儿编程scratch项目源代码文件案例素材-深海困境.zip

    少儿编程scratch项目源代码文件案例素材-深海困境.zip

    少儿编程scratch项目源代码文件案例素材-去吧泡泡糖.zip

    少儿编程scratch项目源代码文件案例素材-去吧泡泡糖.zip

    KEPServerEX6-6.17.269.0

    KEPServerEX6-6.17.269.0,最新版

    scratch少儿编程逻辑思维游戏源码-第二个循环.zip

    scratch少儿编程逻辑思维游戏源码-第二个循环.zip

    少儿编程scratch项目源代码文件案例素材-手里剑.zip

    少儿编程scratch项目源代码文件案例素材-手里剑.zip

    少儿编程scratch项目源代码文件案例素材-山地跳跃.zip

    少儿编程scratch项目源代码文件案例素材-山地跳跃.zip

    机器人路径规划中Informed RRT*算法详解与代码实现

    内容概要:本文详细介绍了Informed RRT*算法及其在机器人路径规划领域的应用。文章首先解释了该算法相较于传统RRT*算法的优势,即通过将采样范围限制在由起点和终点构成的椭圆区域内来提高搜索效率。接着,文中提供了具体的代码实现,包括椭圆采样的核心公式、路径优化的rewire步骤以及动态调整邻居半径的方法。此外,还讨论了路径队列管理和椭圆区域随路径优化动态更新的重要性。通过这些技术手段,Informed RRT*能够在找到初始路径后显著加快优化速度。 适合人群:对机器人路径规划感兴趣的科研人员、工程师及学生。 使用场景及目标:适用于需要高效路径规划的实际应用场景,如自动驾驶汽车、无人机飞行路径规划等。目标是在复杂环境中快速找到从起点到终点的最佳路径。 其他说明:建议读者在理解理论的基础上,结合提供的代码进行实验,以便更好地掌握算法的工作机制。同时,在不同环境条件下测试算法性能,观察其自适应能力。

    基于COMSOL有限元仿真的变压器辐射传热数值分析:从入门到进阶

    内容概要:本文详细介绍了基于COMSOL有限元软件的变压器辐射传热数值分析方法。首先,解释了变压器内外辐射传热的基本机理,包括热量通过传导、对流和辐射的方式传递,重点在于辐射传热的作用及其数学描述。接着,逐步引导读者从零开始构建有限元仿真模型,涵盖模型参数确定、网格划分、材料属性定义、边界条件设置、传热方程设定、仿真运行及结果分析等多个步骤。最后,探讨了进一步研究的方向,如不同因素(温度、材料属性、几何形状)对辐射传热的影响,以及该模型在电力电子设备和热管理系统的潜在应用。 适合人群:电气工程专业学生、初学者和技术爱好者,尤其是对有限元仿真和变压器辐射传热感兴趣的群体。 使用场景及目标:适用于希望通过实际操作掌握有限元仿真技能的人群,旨在帮助他们理解变压器辐射传热机制并能独立完成相关仿真项目。 其他说明:本文不仅提供了理论知识,还附带了详细的视频教程和仿真模型,使学习过程更加直观易懂。

    scratch少儿编程逻辑思维游戏源码-Scratch 奔跑.zip

    scratch少儿编程逻辑思维游戏源码-Scratch 奔跑.zip

    scratch少儿编程逻辑思维游戏源码-超级马里奥兄 回放引擎.zip

    scratch少儿编程逻辑思维游戏源码-超级马里奥兄 回放引擎.zip

    少儿编程scratch项目源代码文件案例素材-你准备好了吗?.zip

    少儿编程scratch项目源代码文件案例素材-你准备好了吗?.zip

Global site tag (gtag.js) - Google Analytics