`
hbxflihua
  • 浏览: 691380 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

简单封装spring-rabbit实现mq组件化

阅读更多

网上有关spring和rabbitmq整合的博文比比皆是,但是都没有形成整体解决方案,接下来我会通过对spring-rabbit的简单封装实现消息队列服务的组件化。

 

0、添加所需依赖jar

		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>1.6.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.9</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>4.2.6.RELEASE</version>
		</dependency>

 

1、创建rabbit生产者工具类

package com.huatech.mq.producer;  
  
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;

/** 
 * 用于提供Rabbit的操作类 
 * @since 2017-7-31
 * @author lh 
 * 
 */  
public class RabbitProducer {  
	
	/**
	 * 默认交换机名称
	 */
	public static String DEFAULT_EXCHANGE_NAME = "rd-mq-exchange";

    private RabbitTemplate rabbitTemplate;  
  
    /**
     * 
     * @param rabbitTemplate
     */
    public RabbitProducer(){  
    }  
    
    /**
     * 
     * @param routingKey
     * @param message
     */
    public <T> void send(String routingKey,  T message) {  
    	send(DEFAULT_EXCHANGE_NAME, routingKey, message);
    }
  
    /**
     * 
     * @param exchange
     * @param routingKey
     * @param message
     */
    public <T> void send(String exchange,String routingKey,  T message) {  
        //实现将message通过json转换&将对象发送  
        rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {  
            //实现message操作处理实现  
            @Override  
            public Message postProcessMessage(Message message) throws AmqpException {  
                //设置信息的属性信息&设置发送模式(PERSISTENT:连续的)  
            	MessageProperties mp = message.getMessageProperties();
                mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);  
                mp.setContentType(MessageProperties.CONTENT_TYPE_JSON);
                return message;  
            }  
        }, new CorrelationData(String.valueOf(message)));  
    }

	public RabbitTemplate getRabbitTemplate() {
		return rabbitTemplate;
	}

	public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
		this.rabbitTemplate = rabbitTemplate;
	}
    
	
}  

 

2、在spring中配置生产者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:task="http://www.springframework.org/schema/task"  
	xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">

	<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->  
    <bean id="jsonMessageConverter"  class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>  
    
	<!-- 创建connectionFactory --> 	
    <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory -->
    <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" />
	<rabbit:connection-factory id="connectionFactory"
		addresses="${rabbit.addresses}" 
		username="${rabbit.username}"
		password="${rabbit.password}" 
		executor="mqExecutor"/>
		
	<!-- org.springframework.amqp.rabbit.core.RabbitAdmin -->
	<rabbit:admin connection-factory="connectionFactory" />
	
	<!-- queue 队列声明-->  
	<!-- org.springframework.amqp.core.Queue -->
	<rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" />
	
	<!-- exchange queue binging key 绑定 --> 
	<rabbit:direct-exchange id="rd-mq-exchange" name="rd-mq-exchange" durable="true" auto-delete="false" >
		<rabbit:bindings>
			<rabbit:binding queue="log_queue" key="log_queue_key" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
	
	
	<!-- 生产者配置:spring template声明 -->
	<!-- org.springframework.amqp.rabbit.core.RabbitTemplate 
		channel-transacted	true,随着spring事务进行提交或者回滚
	-->
	<rabbit:template id="rabbitTemplate" exchange="rd-mq-exchange"
		connection-factory="connectionFactory" 
		message-converter="jsonMessageConverter" 
		retry-template="retryTemplate" />
	
	<!-- 重试机制 -->
	<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
	    <property name="backOffPolicy">
	        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
	            <property name="initialInterval" value="1000" />
	            <property name="multiplier" value="3" />
	            <property name="maxInterval" value="3000" />
	        </bean>
	    </property>
	    <property name="retryPolicy">  
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">  
                <property name="maxAttempts" value="1" />  
            </bean>  
        </property> 
	</bean>
	
	<!-- 生产者工具类 -->		
	<bean id="rabbitProducer" class="com.huatech.mq.producer.RabbitProducer">
		<property name="rabbitTemplate" ref="rabbitTemplate"/>
	</bean>
		
</beans>

 

3、创建消息基类

package com.huatech.mq.model;

/**
 * MQ基类
 * @version 3.1
 * @author lh
 * @date 2017年7月31日
 */
public class MqBaseModel {
	
	/**
	 * 操作
	 */
	protected String operate;
	
	/**
	 * 无参构造方法
	 */
	public MqBaseModel() {
		super();
	}

	
	public String getOperate() {
		return operate;
	}

	public void setOperate(String operate) {
		this.operate = operate;
	}

}

 

4、添加具体model(MqLogModel)

package com.huatech.mq.model;
/**
 * 日志model
 * @author lh
 *
 */
public class MqLogModel extends MqBaseModel {
	
	private String info;
	private String logTime;
	
	public String getInfo() {
		return info;
	}

	public void setInfo(String info) {
		this.info = info;
	}

	public String getLogTime() {
		return logTime;
	}

	public void setLogTime(String logTime) {
		this.logTime = logTime;
	}



	@Override
	public String toString() {
		return "MqLogModel [logTime=" + logTime + "]";
	}
	

}

 

5、添加消息队列监听接口

package com.huatech.mq.listener;

/**
 * 消息队列监听接口
 * @author lh
 * @version 3.1
 * @since 2017-7-31
 * @param <T>
 */
public interface MqListener<T> {
	
	void listen(T t);

}

 

6、日志队列实现该接口

package com.huatech.mq.listener;

import org.springframework.stereotype.Component;

import com.huatech.mq.model.MqLogModel;

/**
 * 日志队列监听
 * @author lh
 * @version 3.0
 * @since 2017-8-2
 *
 */
@Component
public class LogQueueListener implements MqListener<MqLogModel> {
	
	//private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class);

	@Override
	public void listen(MqLogModel t) {
		System.out.println(String.format("logInfo: %s, logTime:%s", t.getInfo(), t.getLogTime()));
	}

}

 

7、在spring中配置消费者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:task="http://www.springframework.org/schema/task"  
	xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">


	<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->  
    <bean id="jsonMessageConverter"  class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>  
    
	<!-- 创建connectionFactory --> 	
    <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory -->
    <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" />
	<rabbit:connection-factory id="connectionFactory"
		addresses="${rabbit.addresses}" 
		username="${rabbit.username}"
		password="${rabbit.password}" 
		executor="mqExecutor"/>
		
	<!-- org.springframework.amqp.rabbit.core.RabbitAdmin -->
	<rabbit:admin connection-factory="connectionFactory" />
	
	<!-- queue 队列声明-->  
	<!-- org.springframework.amqp.core.Queue -->
	<rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" />
	
	<!-- 消费者监听容器 -->
	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" requeue-rejected="false"   
	      concurrency="10" message-converter="jsonMessageConverter" > 
    	    <rabbit:listener ref="logQueueListener" method="listen"  queues="log_queue" />
	</rabbit:listener-container>
		
</beans>

 

8、spring扫包

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation=" 
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context.xsd">
        
 	<description>Spring Configuration</description>
	
   	<!-- 加载配置属性文件 -->
	<context:property-placeholder ignore-unresolvable="true" location="classpath:rabbmitmq.properties" />
    
    <!-- 扫描相关的bean -->
    <context:component-scan base-package="com.huatech.mq"/>
    
</beans>

 

9、rabbmitmq.properties

#============== rabbitmq config ====================
rabbit.addresses=127.0.0.1:5672
rabbit.username=guest
rabbit.password=guest

 

 

10、测试用例

package com.huatech.mq;


import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.huatech.mq.model.MqLogModel;
import com.huatech.mq.producer.RabbitProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-context.xml","classpath:spring-rabbit-producer.xml","classpath:spring-rabbit-consumer.xml" })
public class LogQueueTest {

	@Autowired
	private RabbitProducer rabbitProducer;
	@Test
	public void test(){
		for (int i = 0; i < 10; i++) {
			MqLogModel model = new MqLogModel();
			model.setInfo("hello rabbitmq "+i);
			model.setLogTime(new Date().toString());		
			rabbitProducer.send("log_queue_key", model);
		}	
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
		}
	}
	
	
}

 

 

参考:Spring AMQP

 

分享到:
评论

相关推荐

    zxl:本人的开源框架,其中包含了一系列的开源组件,xxoo已经正式更名为xmlbean-converter成为zxl框架的一部分

    mq-rabbit 对于rabbitmq客户端的简单封装 mvc-common mvc框架的基础包 mvc-spring springmvc的简单封装 mvc-struts struts的简单封装 mvc-hbase 适于hbase的springmvc封装 orm-common orm框架的基础包 orm-hbase ...

    基于MATLAB GUI的学生成绩管理系统:功能实现与应用

    内容概要:本文介绍了一款基于MATLAB GUI的学生成绩管理系统,旨在提升学校教学管理的效率和准确性。系统主要由三个模块组成:考试收录数据模块、考试数据分析模块和统计分析数据模块。它不仅支持成绩的录入、显示、排序、查找,还包括特征值分析、直方图绘制和教师评语录入等功能。通过对成绩数据的综合分析,系统能为学校教学管理提供客观科学的数据支持。 适合人群:教育工作者(如教师、管理人员)和技术爱好者(特别是对MATLAB GUI感兴趣的开发者)。 使用场景及目标:适用于各类学校和教育机构,用于管理和分析学生成绩,帮助教师和管理者更好地了解学生的学习状况,改进教学质量。 阅读建议:对于希望深入了解如何利用MATLAB GUI进行学生成绩管理的读者来说,本文提供了详细的系统设计思路和功能实现方法,值得仔细研读并尝试实践。

    基于T-Mats库的涡扇发动机气路故障仿真模型:自定义故障植入与真实运行扰动分析

    内容概要:本文介绍了基于T-Mats库的涡扇发动机气路故障仿真模型,涵盖了多种类型的故障植入(如部件流量、效率及压比故障),并允许自定义故障程序和组合。该模型通过对软阈值去噪处理后的信号序列进行分析,提取真实的运行扰动信息,确保输出数据符合CMAPASS的排列要求。此外,该模型能够模拟航空发动机的典型气路故障,帮助研发和技术人员更准确地预测和评估发动机性能,从而提前采取预防措施。 适合人群:航空航天领域的研发人员、技术人员以及对航空发动机故障仿真感兴趣的学者。 使用场景及目标:①用于研究和开发涡扇发动机的气路故障诊断系统;②辅助工程师进行故障预测和性能评估;③为后续数据分析和实验验证提供可靠的数据基础。 其他说明:该模型不仅提高了仿真的准确性,还增强了对发动机运行状态的理解,为提升发动机性能和可靠性提供了强有力的技术支持。

    scratch少儿编程逻辑思维游戏源码-scratch冒险.zip

    scratch少儿编程逻辑思维游戏源码-scratch冒险.zip

    少儿编程scratch项目源代码文件案例素材-爬塔.zip

    少儿编程scratch项目源代码文件案例素材-爬塔.zip

    合金凝固模型中的相场模拟与各向异性枝晶生长研究及其在激光增材制造中的应用

    内容概要:本文详细探讨了合金凝固模型中的相场模拟方法及其在各向异性枝晶生长研究中的应用。首先介绍了合金凝固模型的基本概念及其在现代制造业中的重要性,特别是在激光增材制造、选择性激光熔融和定向凝固技术中的应用。接着,重点讨论了相场模拟作为一种数值模拟方法,在预测合金凝固过程中组织结构演变方面的关键作用。文中还提供了MATLAB实现合金各向异性枝晶生长的具体代码及详细注释,以及Comsol用于偏微分方程求解的雪花生长模型。最后,文章总结了当前的研究进展,并展望了未来的发展趋势。 适合人群:从事材料科学、冶金工程、激光增材制造领域的研究人员和技术人员,尤其是对相场模拟和合金凝固感兴趣的学者。 使用场景及目标:适用于希望深入了解合金凝固过程、相场模拟方法及其在现代制造技术中应用的专业人士。目标是提高对合金凝固机制的理解,优化制造工艺,提升产品质量。 其他说明:文章不仅提供了理论分析,还包括具体的代码实现和详细的文献参考资料,有助于读者全面掌握相关技术和最新研究进展。

    少儿编程scratch项目源代码文件案例素材-史莱姆出击.zip

    少儿编程scratch项目源代码文件案例素材-史莱姆出击.zip

    少儿编程scratch项目源代码文件案例素材-忍者酷跑.zip

    少儿编程scratch项目源代码文件案例素材-忍者酷跑.zip

    scratch少儿编程逻辑思维游戏源码-点击灌篮.zip

    scratch少儿编程逻辑思维游戏源码-点击灌篮.zip

    基于RBF神经网络的PID控制器在PMSM转速环中的Simulink模型设计与性能分析

    内容概要:本文介绍了将基于RBF神经网络的PID控制器应用于永磁同步电机(PMSM)转速环控制的方法及其性能优势。传统的PID控制器在面对非线性和时变系统时存在参数整定困难的问题,而引入RBF神经网络可以实现实时在线调参,提高系统的灵活性和鲁棒性。文中详细描述了Simulink模型的设计,特别是Matlab s-function模块中RBF神经网络的具体实现,包括高斯函数激活和带惯性的权值更新机制。实验结果显示,在转速突变情况下,改进后的控制器能够迅速稳定系统,超调量控制在2%以内,调节时间较传统方法缩短约40%,并且在负载变化时表现出色,无需重新整定参数。 适合人群:从事电机控制系统研究和开发的技术人员,尤其是对PID控制器优化感兴趣的工程师。 使用场景及目标:适用于需要提升PMSM转速环控制精度和响应速度的应用场合,如工业自动化设备、机器人等领域。目标是通过引入智能算法解决传统PID控制器参数整定难题,提高系统性能。 阅读建议:关注RBF神经网络与PID控制器结合的具体实现细节,特别是在Matlab s-function模块中的编码技巧以及参数调整策略。同时,注意学习率的选择和动量项的作用,这对于实际应用至关重要。

    scratch少儿编程逻辑思维游戏源码-Scratch 奔跑.zip

    scratch少儿编程逻辑思维游戏源码-Scratch 奔跑.zip

    基于COMSOL有限元仿真的变压器辐射传热数值分析:从入门到进阶

    内容概要:本文详细介绍了基于COMSOL有限元软件的变压器辐射传热数值分析方法。首先,解释了变压器内外辐射传热的基本机理,包括热量通过传导、对流和辐射的方式传递,重点在于辐射传热的作用及其数学描述。接着,逐步引导读者从零开始构建有限元仿真模型,涵盖模型参数确定、网格划分、材料属性定义、边界条件设置、传热方程设定、仿真运行及结果分析等多个步骤。最后,探讨了进一步研究的方向,如不同因素(温度、材料属性、几何形状)对辐射传热的影响,以及该模型在电力电子设备和热管理系统的潜在应用。 适合人群:电气工程专业学生、初学者和技术爱好者,尤其是对有限元仿真和变压器辐射传热感兴趣的群体。 使用场景及目标:适用于希望通过实际操作掌握有限元仿真技能的人群,旨在帮助他们理解变压器辐射传热机制并能独立完成相关仿真项目。 其他说明:本文不仅提供了理论知识,还附带了详细的视频教程和仿真模型,使学习过程更加直观易懂。

    交错并联Boost PFC仿真电路模型:基于双闭环控制的BCM模式优化与应用

    内容概要:本文详细介绍了交错并联Boost PFC(功率因数校正)仿真电路模型的设计与实现,特别是在临界BCM模式下的双闭环控制特性。文章首先解释了该电路的经典结构及其优势,即能够有效降低开关损耗和电流纹波。接着,重点讨论了双闭环控制的具体实现方法,包括外环电压控制和内环电流控制的MATLAB/Simulink代码示例。文中还特别强调了电流环中零交叉检测的重要性以及交错并联结构中驱动信号相位差的精确设置。此外,作者分享了将模型从Simulink转换到Plecs和Psim时遇到的问题及解决方案,如更换为带反向恢复特性的二极管模型和重新校准控制环路的采样周期。最后,文章展示了优化后的电流波形图,验证了所提方法的有效性。 适合人群:电力电子工程师、电源设计师、从事电力系统仿真的研究人员和技术爱好者。 使用场景及目标:适用于需要进行高效电源设计的研究和开发项目,旨在提高电源系统的性能,减少谐波失真,提升功率因数校正效果。 其他说明:文中提供的具体代码片段和参数设置有助于读者更好地理解和复现实验结果。同时,对于希望深入理解双闭环控制系统和BCM模式的人来说,本文提供了宝贵的实践经验。

    scratch少儿编程逻辑思维游戏源码-3000 横版闯过.zip

    scratch少儿编程逻辑思维游戏源码-3000 横版闯过.zip

    空气涡轮发动机Matlab/Simulink动态仿真模型:部件级建模与PID控制应用

    内容概要:本文介绍了如何利用Matlab/Simulink构建空气涡轮发动机的动态仿真模型。首先,文章详细阐述了各个部件级模型的设计,包括进气道、涡轮、气室、压气机、尾喷管、转子动力学模块和容积模块。接着,重点讨论了PID控制器在维持发动机转速恒定方面的作用,尤其是在面对输出扭矩阶跃扰动时的表现。最后,提供了简单的Simulink模型代码片段,展示了如何设置和运行仿真模型,以便实时监控和调整发动机性能。 适合人群:航空航天工程领域的研究人员和技术人员,尤其是那些对空气涡轮发动机仿真感兴趣的读者。 使用场景及目标:适用于希望通过Matlab/Simulink进行空气涡轮发动机仿真研究的专业人士。主要目标是掌握空气涡轮发动机各部件的工作原理及其相互关系,同时学会使用PID控制器优化发动机性能。 其他说明:本文不仅提供了理论知识,还附有实际操作步骤和代码示例,帮助读者更好地理解和应用所学内容。

    少儿编程scratch项目源代码文件案例素材-收集能量.zip

    少儿编程scratch项目源代码文件案例素材-收集能量.zip

    scratch少儿编程逻辑思维游戏源码-弹回的球.zip

    scratch少儿编程逻辑思维游戏源码-弹回的球.zip

    少儿编程scratch项目源代码文件案例素材-铅笔画.zip

    少儿编程scratch项目源代码文件案例素材-铅笔画.zip

    Sage Decrypter.zip

    Sage Decrypter.zip

Global site tag (gtag.js) - Google Analytics