`
hbxflihua
  • 浏览: 686862 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

简单封装spring-rabbit实现mq组件化

阅读更多

网上有关spring和rabbitmq整合的博文比比皆是,但是都没有形成整体解决方案,接下来我会通过对spring-rabbit的简单封装实现消息队列服务的组件化。

 

0、添加所需依赖jar

		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>1.6.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.9</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>4.2.6.RELEASE</version>
		</dependency>

 

1、创建rabbit生产者工具类

package com.huatech.mq.producer;  
  
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;

/** 
 * 用于提供Rabbit的操作类 
 * @since 2017-7-31
 * @author lh 
 * 
 */  
public class RabbitProducer {  
	
	/**
	 * 默认交换机名称
	 */
	public static String DEFAULT_EXCHANGE_NAME = "rd-mq-exchange";

    private RabbitTemplate rabbitTemplate;  
  
    /**
     * 
     * @param rabbitTemplate
     */
    public RabbitProducer(){  
    }  
    
    /**
     * 
     * @param routingKey
     * @param message
     */
    public <T> void send(String routingKey,  T message) {  
    	send(DEFAULT_EXCHANGE_NAME, routingKey, message);
    }
  
    /**
     * 
     * @param exchange
     * @param routingKey
     * @param message
     */
    public <T> void send(String exchange,String routingKey,  T message) {  
        //实现将message通过json转换&将对象发送  
        rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {  
            //实现message操作处理实现  
            @Override  
            public Message postProcessMessage(Message message) throws AmqpException {  
                //设置信息的属性信息&设置发送模式(PERSISTENT:连续的)  
            	MessageProperties mp = message.getMessageProperties();
                mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);  
                mp.setContentType(MessageProperties.CONTENT_TYPE_JSON);
                return message;  
            }  
        }, new CorrelationData(String.valueOf(message)));  
    }

	public RabbitTemplate getRabbitTemplate() {
		return rabbitTemplate;
	}

	public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
		this.rabbitTemplate = rabbitTemplate;
	}
    
	
}  

 

2、在spring中配置生产者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:task="http://www.springframework.org/schema/task"  
	xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">

	<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->  
    <bean id="jsonMessageConverter"  class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>  
    
	<!-- 创建connectionFactory --> 	
    <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory -->
    <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" />
	<rabbit:connection-factory id="connectionFactory"
		addresses="${rabbit.addresses}" 
		username="${rabbit.username}"
		password="${rabbit.password}" 
		executor="mqExecutor"/>
		
	<!-- org.springframework.amqp.rabbit.core.RabbitAdmin -->
	<rabbit:admin connection-factory="connectionFactory" />
	
	<!-- queue 队列声明-->  
	<!-- org.springframework.amqp.core.Queue -->
	<rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" />
	
	<!-- exchange queue binging key 绑定 --> 
	<rabbit:direct-exchange id="rd-mq-exchange" name="rd-mq-exchange" durable="true" auto-delete="false" >
		<rabbit:bindings>
			<rabbit:binding queue="log_queue" key="log_queue_key" />
		</rabbit:bindings>
	</rabbit:direct-exchange>
	
	
	<!-- 生产者配置:spring template声明 -->
	<!-- org.springframework.amqp.rabbit.core.RabbitTemplate 
		channel-transacted	true,随着spring事务进行提交或者回滚
	-->
	<rabbit:template id="rabbitTemplate" exchange="rd-mq-exchange"
		connection-factory="connectionFactory" 
		message-converter="jsonMessageConverter" 
		retry-template="retryTemplate" />
	
	<!-- 重试机制 -->
	<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
	    <property name="backOffPolicy">
	        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
	            <property name="initialInterval" value="1000" />
	            <property name="multiplier" value="3" />
	            <property name="maxInterval" value="3000" />
	        </bean>
	    </property>
	    <property name="retryPolicy">  
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">  
                <property name="maxAttempts" value="1" />  
            </bean>  
        </property> 
	</bean>
	
	<!-- 生产者工具类 -->		
	<bean id="rabbitProducer" class="com.huatech.mq.producer.RabbitProducer">
		<property name="rabbitTemplate" ref="rabbitTemplate"/>
	</bean>
		
</beans>

 

3、创建消息基类

package com.huatech.mq.model;

/**
 * MQ基类
 * @version 3.1
 * @author lh
 * @date 2017年7月31日
 */
public class MqBaseModel {
	
	/**
	 * 操作
	 */
	protected String operate;
	
	/**
	 * 无参构造方法
	 */
	public MqBaseModel() {
		super();
	}

	
	public String getOperate() {
		return operate;
	}

	public void setOperate(String operate) {
		this.operate = operate;
	}

}

 

4、添加具体model(MqLogModel)

package com.huatech.mq.model;
/**
 * 日志model
 * @author lh
 *
 */
public class MqLogModel extends MqBaseModel {
	
	private String info;
	private String logTime;
	
	public String getInfo() {
		return info;
	}

	public void setInfo(String info) {
		this.info = info;
	}

	public String getLogTime() {
		return logTime;
	}

	public void setLogTime(String logTime) {
		this.logTime = logTime;
	}



	@Override
	public String toString() {
		return "MqLogModel [logTime=" + logTime + "]";
	}
	

}

 

5、添加消息队列监听接口

package com.huatech.mq.listener;

/**
 * 消息队列监听接口
 * @author lh
 * @version 3.1
 * @since 2017-7-31
 * @param <T>
 */
public interface MqListener<T> {
	
	void listen(T t);

}

 

6、日志队列实现该接口

package com.huatech.mq.listener;

import org.springframework.stereotype.Component;

import com.huatech.mq.model.MqLogModel;

/**
 * 日志队列监听
 * @author lh
 * @version 3.0
 * @since 2017-8-2
 *
 */
@Component
public class LogQueueListener implements MqListener<MqLogModel> {
	
	//private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class);

	@Override
	public void listen(MqLogModel t) {
		System.out.println(String.format("logInfo: %s, logTime:%s", t.getInfo(), t.getLogTime()));
	}

}

 

7、在spring中配置消费者

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:task="http://www.springframework.org/schema/task"  
	xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">


	<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->  
    <bean id="jsonMessageConverter"  class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>  
    
	<!-- 创建connectionFactory --> 	
    <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory -->
    <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" />
	<rabbit:connection-factory id="connectionFactory"
		addresses="${rabbit.addresses}" 
		username="${rabbit.username}"
		password="${rabbit.password}" 
		executor="mqExecutor"/>
		
	<!-- org.springframework.amqp.rabbit.core.RabbitAdmin -->
	<rabbit:admin connection-factory="connectionFactory" />
	
	<!-- queue 队列声明-->  
	<!-- org.springframework.amqp.core.Queue -->
	<rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" />
	
	<!-- 消费者监听容器 -->
	<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" requeue-rejected="false"   
	      concurrency="10" message-converter="jsonMessageConverter" > 
    	    <rabbit:listener ref="logQueueListener" method="listen"  queues="log_queue" />
	</rabbit:listener-container>
		
</beans>

 

8、spring扫包

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation=" 
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context.xsd">
        
 	<description>Spring Configuration</description>
	
   	<!-- 加载配置属性文件 -->
	<context:property-placeholder ignore-unresolvable="true" location="classpath:rabbmitmq.properties" />
    
    <!-- 扫描相关的bean -->
    <context:component-scan base-package="com.huatech.mq"/>
    
</beans>

 

9、rabbmitmq.properties

#============== rabbitmq config ====================
rabbit.addresses=127.0.0.1:5672
rabbit.username=guest
rabbit.password=guest

 

 

10、测试用例

package com.huatech.mq;


import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.huatech.mq.model.MqLogModel;
import com.huatech.mq.producer.RabbitProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-context.xml","classpath:spring-rabbit-producer.xml","classpath:spring-rabbit-consumer.xml" })
public class LogQueueTest {

	@Autowired
	private RabbitProducer rabbitProducer;
	@Test
	public void test(){
		for (int i = 0; i < 10; i++) {
			MqLogModel model = new MqLogModel();
			model.setInfo("hello rabbitmq "+i);
			model.setLogTime(new Date().toString());		
			rabbitProducer.send("log_queue_key", model);
		}	
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
		}
	}
	
	
}

 

 

参考:Spring AMQP

 

分享到:
评论

相关推荐

    spring-cglib-repack-3.2.4.jar和spring-objenesis-repack-2.5.1.jar

    2. 易于使用:Spring框架已经封装了CGLIB的使用,开发者通常不需要直接与CGLIB库交互。 `spring-objenesis-repack-2.5.1.jar`则是Objenesis库的重新打包版本。Objenesis是一个轻量级的Java库,用于在没有执行构造...

    spring-aop-jar

    Spring AOP模块提供了实现AOP规范的功能,它允许开发者定义“切面”来封装系统中的横切关注点,如日志、事务管理等。该jar文件包含了Spring AOP的核心类和接口,如`org.springframework.aop.*`包下的`...

    spring-rabbit-simple-sdk:基于spring4.0x版本的rabbitmq极简封装

    spring-rabbit-simple-sdk 在spring4.0x的项目背景下,封装的一个好用简便的rabbitmq相关的sdk. 封装内容 消息base: BaseMqMessage消息基类,所有的业务消息都需要继承它,其中的eventType(根据业务需求是否要填)和...

    spring-aop-5.1.0.RELEASE.jar

    spring-**core**-4.3.6.RELEASE.jar :包含spring框架基本的核心工具类,spring其他组件都要用到这个包里的类,其他组件的基本核心 spring-**beans**-4.3.6.RELEASE.jar:所有应用都要用到的jar包,它包含访问配置...

    手写一个Spring-Boot-Starter组件.zip

    在Spring Boot生态系统中,`Starter`组件是其核心特性之一,它简化了项目的构建配置。本教程将指导你如何手写一个自定义的`Spring-Boot-Starter`组件,以便于集成特定功能,例如本例中的`redis-spring-boot-starter`...

    spring-mvc开发所有jar包【4.3.4】

    3. **spring-webmvc-4.3.4.RELEASE.jar**:Spring MVC的主要实现模块,负责处理HTTP请求和响应,提供了DispatcherServlet、ModelAndView、HandlerMapping、ViewResolver等关键组件,实现了MVC模式。 4. **spring-...

    spring-jdbc jar包.rar

    1. **JdbcTemplate**:这是Spring JDBC的核心类,它通过模板方法模式将常见的JDBC操作进行了封装,如执行SQL查询、更新、调用存储过程等。开发者只需要关注SQL语句和参数,而无需处理连接创建、关闭、异常处理等繁琐...

    阿里云_RocketMQ_SDK_封装为spring-boot-starter_spring-boot-starter-r

    阿里云_RocketMQ_SDK_封装为spring-boot-starter_spring-boot-starter-rocketmq

    mybatis-spring-1.3.0.jar 下载

    1. **自动扫描Mapper接口并注入SqlSession**:MyBatis-Spring能自动扫描项目中的Mapper接口,并为每个接口注入一个实现了该接口的代理对象,该代理对象内部封装了SqlSession,从而简化了对数据库操作的代码。...

    spring-boot-2.7.0.zip源码

    `spring-boot-2.7.0`源码中,`EnvironmentPostProcessor`接口用于在Spring Environment初始化后处理环境变量,从而实现环境感知。 4. **Web应用启动**:Spring Boot的`WebApplicationInitializer`和`SpringBoot...

    spring-jdbc-4.3.9.RELEASE

    spring-jdbc连接jar包,spring对于jdbc技术的封装,spring容器组成部分

    spring-web.jar spring-webmvc.jar

    Spring框架是Java开发中不可或缺的一部分,它为构建高效、可重用的Web应用程序提供了强大的支持。...通过Spring Boot,这些依赖项可以自动化管理,简化项目设置,让开发者更专注于业务逻辑的实现。

    spring-boot-starter-hbase自定义的spring-boot的hbasestarter

    这得益于Spring Boot的模块化设计,我们可以灵活地扩展和定制`spring-boot-starter-hbase`,以满足不同场景下的需求。 总的来说,`spring-boot-starter-hbase`为Java开发者提供了一个高效、易用的桥梁,连接了...

    spring-framework-4.3.5 所有JAR文件包

    Spring AOP支持使用代理模式实现切面,包括基于类的代理和基于JDK动态代理。 4. **MVC框架**: - Spring MVC是Spring Framework中的Web层组件,用于构建Web应用程序。它提供了一个模型-视图-控制器架构,使得开发...

    pay-spring-boot-starter-parent:基于spring-boot实现自动化配置的支付对接,让你真正做到一行代码实现支付聚合,让你可以不用理解支付怎么对接,只需要专注你的业务 全能第三方支付对接pay-spring-boot-starter开发工具包

    pay-spring-boot-starter 是一个基于spring-boot实现自动化配置的支付对接, 让你真正做到一行代码实现支付聚合, 让你可以不用理解支付怎么对接,只需要专注你的业务 特性 1. 项目第三方依赖极少,依托于spring ...

    spring-data-commons-1.7.2.RELEASEspring-data-jpa-1.5.2.RELEASE-java datajpa

    1. **Repository 组件**:定义了一种契约,允许开发者通过简单的接口定义存储库行为,而无需编写具体的实现代码。例如,`CrudRepository` 接口提供了基本的 CRUD(创建、读取、更新、删除)操作。 2. **Query ...

    spring-framework-4.2.0.RELEASE官方完整包加官方文档

    面向切面编程在Spring中是通过AOP模块实现的,它允许开发者定义“切面”来封装横切关注点,如日志、事务管理等。在4.2.0.RELEASE中,AOP的使用更加灵活,支持更多类型的切点表达式,使得切面的定义更加精确。 ...

    spring-web.jar

    在Java开发领域,Spring框架无疑是最重要的组件之一,而`spring-web.jar`则是Spring框架中的核心模块,它为构建Web应用程序提供了丰富的功能支持。本文将全面解析`spring-web.jar`中包含的关键知识点,帮助开发者更...

    maven版spring-data-redis简单示例

    基于spring的子项目spring-data-redis写的一个基于hash类型的用户CRUD,经过简单的封装,可以实现通用CRUD,请事先安装好redis,配置文件redis.properties请做相应修改,希望对你有帮助。

Global site tag (gtag.js) - Google Analytics