网上有关spring和rabbitmq整合的博文比比皆是,但是都没有形成整体解决方案,接下来我会通过对spring-rabbit的简单封装实现消息队列服务的组件化。
0、添加所需依赖jar
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.0.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.9</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.2.6.RELEASE</version> </dependency>
1、创建rabbit生产者工具类
package com.huatech.mq.producer; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; /** * 用于提供Rabbit的操作类 * @since 2017-7-31 * @author lh * */ public class RabbitProducer { /** * 默认交换机名称 */ public static String DEFAULT_EXCHANGE_NAME = "rd-mq-exchange"; private RabbitTemplate rabbitTemplate; /** * * @param rabbitTemplate */ public RabbitProducer(){ } /** * * @param routingKey * @param message */ public <T> void send(String routingKey, T message) { send(DEFAULT_EXCHANGE_NAME, routingKey, message); } /** * * @param exchange * @param routingKey * @param message */ public <T> void send(String exchange,String routingKey, T message) { //实现将message通过json转换&将对象发送 rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() { //实现message操作处理实现 @Override public Message postProcessMessage(Message message) throws AmqpException { //设置信息的属性信息&设置发送模式(PERSISTENT:连续的) MessageProperties mp = message.getMessageProperties(); mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT); mp.setContentType(MessageProperties.CONTENT_TYPE_JSON); return message; } }, new CorrelationData(String.valueOf(message))); } public RabbitTemplate getRabbitTemplate() { return rabbitTemplate; } public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } }
2、在spring中配置生产者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd"> <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- 创建connectionFactory --> <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory --> <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" /> <rabbit:connection-factory id="connectionFactory" addresses="${rabbit.addresses}" username="${rabbit.username}" password="${rabbit.password}" executor="mqExecutor"/> <!-- org.springframework.amqp.rabbit.core.RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory" /> <!-- queue 队列声明--> <!-- org.springframework.amqp.core.Queue --> <rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" /> <!-- exchange queue binging key 绑定 --> <rabbit:direct-exchange id="rd-mq-exchange" name="rd-mq-exchange" durable="true" auto-delete="false" > <rabbit:bindings> <rabbit:binding queue="log_queue" key="log_queue_key" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 生产者配置:spring template声明 --> <!-- org.springframework.amqp.rabbit.core.RabbitTemplate channel-transacted true,随着spring事务进行提交或者回滚 --> <rabbit:template id="rabbitTemplate" exchange="rd-mq-exchange" connection-factory="connectionFactory" message-converter="jsonMessageConverter" retry-template="retryTemplate" /> <!-- 重试机制 --> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="3" /> <property name="maxInterval" value="3000" /> </bean> </property> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="1" /> </bean> </property> </bean> <!-- 生产者工具类 --> <bean id="rabbitProducer" class="com.huatech.mq.producer.RabbitProducer"> <property name="rabbitTemplate" ref="rabbitTemplate"/> </bean> </beans>
3、创建消息基类
package com.huatech.mq.model; /** * MQ基类 * @version 3.1 * @author lh * @date 2017年7月31日 */ public class MqBaseModel { /** * 操作 */ protected String operate; /** * 无参构造方法 */ public MqBaseModel() { super(); } public String getOperate() { return operate; } public void setOperate(String operate) { this.operate = operate; } }
4、添加具体model(MqLogModel)
package com.huatech.mq.model; /** * 日志model * @author lh * */ public class MqLogModel extends MqBaseModel { private String info; private String logTime; public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } public String getLogTime() { return logTime; } public void setLogTime(String logTime) { this.logTime = logTime; } @Override public String toString() { return "MqLogModel [logTime=" + logTime + "]"; } }
5、添加消息队列监听接口
package com.huatech.mq.listener; /** * 消息队列监听接口 * @author lh * @version 3.1 * @since 2017-7-31 * @param <T> */ public interface MqListener<T> { void listen(T t); }
6、日志队列实现该接口
package com.huatech.mq.listener; import org.springframework.stereotype.Component; import com.huatech.mq.model.MqLogModel; /** * 日志队列监听 * @author lh * @version 3.0 * @since 2017-8-2 * */ @Component public class LogQueueListener implements MqListener<MqLogModel> { //private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class); @Override public void listen(MqLogModel t) { System.out.println(String.format("logInfo: %s, logTime:%s", t.getInfo(), t.getLogTime())); } }
7、在spring中配置消费者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd"> <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- 创建connectionFactory --> <!-- org.springframework.amqp.rabbit.connection.CachingConnectionFactory --> <task:executor id="mqExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" /> <rabbit:connection-factory id="connectionFactory" addresses="${rabbit.addresses}" username="${rabbit.username}" password="${rabbit.password}" executor="mqExecutor"/> <!-- org.springframework.amqp.rabbit.core.RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory" /> <!-- queue 队列声明--> <!-- org.springframework.amqp.core.Queue --> <rabbit:queue id="log_queue" name="log_queue" durable="true" auto-delete="false" exclusive="false" /> <!-- 消费者监听容器 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" requeue-rejected="false" concurrency="10" message-converter="jsonMessageConverter" > <rabbit:listener ref="logQueueListener" method="listen" queues="log_queue" /> </rabbit:listener-container> </beans>
8、spring扫包
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <description>Spring Configuration</description> <!-- 加载配置属性文件 --> <context:property-placeholder ignore-unresolvable="true" location="classpath:rabbmitmq.properties" /> <!-- 扫描相关的bean --> <context:component-scan base-package="com.huatech.mq"/> </beans>
9、rabbmitmq.properties
#============== rabbitmq config ==================== rabbit.addresses=127.0.0.1:5672 rabbit.username=guest rabbit.password=guest
10、测试用例
package com.huatech.mq; import java.util.Date; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.huatech.mq.model.MqLogModel; import com.huatech.mq.producer.RabbitProducer; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:spring-context.xml","classpath:spring-rabbit-producer.xml","classpath:spring-rabbit-consumer.xml" }) public class LogQueueTest { @Autowired private RabbitProducer rabbitProducer; @Test public void test(){ for (int i = 0; i < 10; i++) { MqLogModel model = new MqLogModel(); model.setInfo("hello rabbitmq "+i); model.setLogTime(new Date().toString()); rabbitProducer.send("log_queue_key", model); } try { Thread.sleep(1000); } catch (InterruptedException e) { } } }
参考:Spring AMQP
相关推荐
2. 易于使用:Spring框架已经封装了CGLIB的使用,开发者通常不需要直接与CGLIB库交互。 `spring-objenesis-repack-2.5.1.jar`则是Objenesis库的重新打包版本。Objenesis是一个轻量级的Java库,用于在没有执行构造...
Spring AOP模块提供了实现AOP规范的功能,它允许开发者定义“切面”来封装系统中的横切关注点,如日志、事务管理等。该jar文件包含了Spring AOP的核心类和接口,如`org.springframework.aop.*`包下的`...
spring-rabbit-simple-sdk 在spring4.0x的项目背景下,封装的一个好用简便的rabbitmq相关的sdk. 封装内容 消息base: BaseMqMessage消息基类,所有的业务消息都需要继承它,其中的eventType(根据业务需求是否要填)和...
spring-**core**-4.3.6.RELEASE.jar :包含spring框架基本的核心工具类,spring其他组件都要用到这个包里的类,其他组件的基本核心 spring-**beans**-4.3.6.RELEASE.jar:所有应用都要用到的jar包,它包含访问配置...
1. **JdbcTemplate**:这是Spring JDBC的核心类,它通过模板方法模式将常见的JDBC操作进行了封装,如执行SQL查询、更新、调用存储过程等。开发者只需要关注SQL语句和参数,而无需处理连接创建、关闭、异常处理等繁琐...
1. **自动扫描Mapper接口并注入SqlSession**:MyBatis-Spring能自动扫描项目中的Mapper接口,并为每个接口注入一个实现了该接口的代理对象,该代理对象内部封装了SqlSession,从而简化了对数据库操作的代码。...
阿里云_RocketMQ_SDK_封装为spring-boot-starter_spring-boot-starter-rocketmq
spring-jdbc连接jar包,spring对于jdbc技术的封装,spring容器组成部分
Spring框架是Java开发中不可或缺的一部分,它为构建高效、可重用的Web应用程序提供了强大的支持。...通过Spring Boot,这些依赖项可以自动化管理,简化项目设置,让开发者更专注于业务逻辑的实现。
这得益于Spring Boot的模块化设计,我们可以灵活地扩展和定制`spring-boot-starter-hbase`,以满足不同场景下的需求。 总的来说,`spring-boot-starter-hbase`为Java开发者提供了一个高效、易用的桥梁,连接了...
Spring AOP支持使用代理模式实现切面,包括基于类的代理和基于JDK动态代理。 4. **MVC框架**: - Spring MVC是Spring Framework中的Web层组件,用于构建Web应用程序。它提供了一个模型-视图-控制器架构,使得开发...
pay-spring-boot-starter 是一个基于spring-boot实现自动化配置的支付对接, 让你真正做到一行代码实现支付聚合, 让你可以不用理解支付怎么对接,只需要专注你的业务 特性 1. 项目第三方依赖极少,依托于spring ...
其中,Spring Cloud Netflix Hystrix是一款至关重要的组件,它为企业级应用提供了强大的容错保护,实现了服务间的熔断和降级策略,极大地提升了系统的稳定性和可靠性。 Hystrix的核心概念包括: 1. **熔断器模式**...
1. **Repository 组件**:定义了一种契约,允许开发者通过简单的接口定义存储库行为,而无需编写具体的实现代码。例如,`CrudRepository` 接口提供了基本的 CRUD(创建、读取、更新、删除)操作。 2. **Query ...
自定义`spring-boot-starter`是为了满足特定项目需求,将一些通用的功能模块化,方便在多个项目中复用,降低代码耦合度,提升开发效率。本文将深入探讨如何进行自定义`spring-boot-starter`的封装,以及其背后的原理...
面向切面编程在Spring中是通过AOP模块实现的,它允许开发者定义“切面”来封装横切关注点,如日志、事务管理等。在4.2.0.RELEASE中,AOP的使用更加灵活,支持更多类型的切点表达式,使得切面的定义更加精确。 ...
基于spring的子项目spring-data-redis写的一个基于hash类型的用户CRUD,经过简单的封装,可以实现通用CRUD,请事先安装好redis,配置文件redis.properties请做相应修改,希望对你有帮助。
`spring-boot-2.7.0`源码中,`EnvironmentPostProcessor`接口用于在Spring Environment初始化后处理环境变量,从而实现环境感知。 4. **Web应用启动**:Spring Boot的`WebApplicationInitializer`和`SpringBoot...
首先,Spring for Android是Spring框架的一个扩展,专门为Android平台设计,旨在提供一套轻量级的组件和服务,便于开发者构建高效、可维护的Android应用。它的核心目标是将Spring的灵活性和强大功能引入到移动开发...
深入源码分析,我们可以看到Spring-data-redis如何巧妙地封装了Jedis客户端,提供了抽象的RedisOperations接口,使得操作Redis变得简单。例如,RedisTemplate中的opsForValue()方法返回一个ValueOperations实例,...