本文简单记录一下 spring 整合 rabbitmq,此处引入spring boot是为了方便引入和rabbitmq整合相关的jar包,并没有使用spring boot整合 rabbitmq。
实现功能
- 完成 spring 和 rabbitmq 的整合
- 完成使用 rabbitAdmin 创建队列等
- 完成使用 @Bean 注解声明队列等
- 完成使用 RabbitTemplate 进行发送消息
- 使用 SimpleMessageListenerContainer 进行消息的监听,可以对消息进行各种适配等
整合步骤:
1、引入 jar 包。
2、配置 ConnectionFacotry。
3、配置 RabbitAdmin 方便维护队列、交换器、绑定等。
4、配置 RabbitTemplate 方便程序中的消息的发送和接收等。
5、配置 SimpleMessageListenerContainer 方便程序中消息的监听。
小知识点:
1、在程序中进行队列、交换器、绑定的声明和使用 @Bean 注解来进行声明,如果要想在程序启动的时候自动创建这些队列等,那么在配置 RabbitAdmin 的时候需要将 autoStartup 属性设置成 true。
2、如果我们在程序运行的过程中需要动态修改 监听的队列或移除队列等,那么可以使用 SimpleMessageListenerContainer 来动态修改这些参数。
3、在如果我们想对接收到RabbitMQ发送的消息进行适配和消息转换等,那么使用SimpleMessageListenerContainer这个可以使用。
实现步骤:
1、引入 spring 整合 rabbitmq 整合的 maven 依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2、spring 整合 rabbitmq 的核心配置类
/** * rabbitmq 配置 * * @author huan.fu * @date 2018/10/17 - 10:57 */ @Configuration @Slf4j public class RabbitmqConfiguration { /** * 创建 rabbitmq 连接工厂 * * @return */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); connectionFactory.setHost("140.143.237.224"); connectionFactory.setPort(5672); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("/"); return connectionFactory; } /** * rabbitmq 实现 AMQP 便携式的管理操作,比如创建队列、绑定、交换器等 * * @param connectionFactory * @return */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * rabbit mq 模板 * * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } /** * 消息监听容器 * * @param connectionFactory * @return */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); // 设置监听的队列 simpleMessageListenerContainer.setQueueNames("queue001", "queue002"); // 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加 simpleMessageListenerContainer.setConcurrentConsumers(3); // 最大的并发消费者 simpleMessageListenerContainer.setMaxConcurrentConsumers(10); // 设置是否重回队列 simpleMessageListenerContainer.setDefaultRequeueRejected(false); // 设置签收模式 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置非独占模式 simpleMessageListenerContainer.setExclusive(false); // 设置consumer未被 ack 的消息个数 simpleMessageListenerContainer.setPrefetchCount(1); // 接收到消息的后置处理 simpleMessageListenerContainer.setAfterReceivePostProcessors((MessagePostProcessor) message -> { message.getMessageProperties().getHeaders().put("接收到消息后", "在消息消费之前的一个后置处理"); return message; }); // 设置 consumer 的 tag simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() { private AtomicInteger consumer = new AtomicInteger(1); @Override public String createConsumerTag(String queue) { return String.format("consumer:%s:%d", queue, consumer.getAndIncrement()); } }); // 设置消息监听器 simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { try { log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error(e.getMessage(), e); // 发生异常此处需要捕获到 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }); /** ================ 消息转换器的用法 ================ simpleMessageListenerContainer.setMessageConverter(new MessageConverter() { // 将 java 对象转换成 Message 对象 @Override public Message toMessage(Object object, MessageProperties messageProperties) { return null; } // 将 message 对象转换成 java 对象 @Override public Object fromMessage(Message message) { return null; } }); */ /** ================ 消息适配器的用法,用于处理各种不同的消息 ================ MessageListenerAdapter adapter = new MessageListenerAdapter(); // 设置真正处理消息的对象,可以是一个普通的java对象,也可以是 ChannelAwareMessageListener 等 adapter.setDelegate(null); adapter.setDefaultListenerMethod("设置上一步中delegate对象中处理的方法名"); ContentTypeDelegatingMessageConverter converters = new ContentTypeDelegatingMessageConverter(); // 文本装换器 MessageConverter txtMessageConvert = null; // json 转换器 MessageConverter jsonMessageConvert = null; converters.addDelegate("text", txtMessageConvert); converters.addDelegate("html/text", txtMessageConvert); converters.addDelegate("text/plain", txtMessageConvert); converters.addDelegate("json", jsonMessageConvert); converters.addDelegate("json/*", jsonMessageConvert); converters.addDelegate("application/json", jsonMessageConvert); adapter.setMessageConverter(converters); simpleMessageListenerContainer.setMessageListener(adapter); */ return simpleMessageListenerContainer; } @Bean public Queue queue003() { return new Queue("queue003", false, false, false, null); } @Bean public Exchange exchange003() { return new TopicExchange("exchange003", false, false, null); } @Bean public Binding binding003() { return new Binding("queue003", Binding.DestinationType.QUEUE, "exchange003", "save.*", null); } }
3、动态移除和增加对队列的监听
/** * 测试动态改变 SimpleMessageListenerContainer 的属性,比如动态增加需要监听的队列等 * * @author huan.fu * @date 2018/10/17 - 15:12 */ @Component @Slf4j public class DynamicSimpleMessageListenerContainerTest implements InitializingBean { @Autowired private ApplicationContext applicationContext; @Override public void afterPropertiesSet() throws Exception { new Thread(() -> { try { TimeUnit.SECONDS.sleep(10); SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class); log.info("移除对队列:[{}]的监听", "queue001"); simpleMessageListenerContainer.removeQueueNames("queue001"); TimeUnit.SECONDS.sleep(5); log.info("添加对队列:[{}]的监听", "queue001"); String[] queueNames = simpleMessageListenerContainer.getQueueNames(); Arrays.copyOf(queueNames, queueNames.length + 1); queueNames[queueNames.length - 1] = "queue001"; simpleMessageListenerContainer.addQueueNames(queueNames); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
4、使用 RabbitAdmin 创建队列等
/** * 测试 rabbitAdmin * * @author huan.fu * @date 2018/10/17 - 12:54 */ @Component @Slf4j public class RabbitAdminService implements InitializingBean { @Autowired private RabbitAdmin rabbitAdmin; /** * 创建队列 * * @param queueName */ public void createQueue(String queueName) { log.info("创建队列:[{}]", queueName); rabbitAdmin.declareQueue(new Queue(queueName, false, false, false, null)); } /** * 创建direct交换器 * * @param exchangeName */ public void createDirectExchange(String exchangeName) { log.info("创建direct交换器:[{}]", exchangeName); rabbitAdmin.declareExchange(new DirectExchange(exchangeName, false, false, null)); } /** * 创建topic交换器 * * @param exchangeName */ public void createTopicExchange(String exchangeName) { log.info("创建topic交换器:[{}]", exchangeName); rabbitAdmin.declareExchange(new TopicExchange(exchangeName, false, false, null)); } @Override public void afterPropertiesSet() throws Exception { createQueue("queue001"); createQueue("queue002"); createDirectExchange("exchange001"); createTopicExchange("exchange002"); // 创建绑定 rabbitAdmin.declareBinding(new Binding("queue001", Binding.DestinationType.QUEUE, "exchange001", "direct_001", null)); // 创建绑定 rabbitAdmin.declareBinding(new Binding("queue002", Binding.DestinationType.QUEUE, "exchange002", "topic.save.#", null)); } }
5、使用 RabbitTemplate 进行发送消息
/** * RabbitTemplate测试 * * @author huan.fu * @date 2018/10/17 - 14:35 */ @Component @Slf4j public class RabbitTemplateTest implements InitializingBean { @Autowired private RabbitTemplate rabbitTemplate; @Override public void afterPropertiesSet() throws Exception { new Thread(() -> { try { TimeUnit.SECONDS.sleep(5); IntStream.rangeClosed(1, 10).forEach(num -> rabbitTemplate.convertAndSend("exchange001", "direct_001", String.format("这个是第[%d]条消息.", num))); } catch (InterruptedException e) { log.error(e.getMessage(), e); } }).start(); } }
6、启动类
/** * spring 整合 rabbitmq * * @author huan.fu * @date 2018/10/17 - 10:53 */ @SpringBootApplication public class RabbitMqApplication { public static void main(String[] args) { SpringApplication.run(RabbitMqApplication.class, args); } }
7、运行结果
程序代码
代码: https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-spring
相关推荐
本文将详细介绍如何整合Spring与RabbitMQ,以实现高效的消息传递。 首先,我们要理解Spring对RabbitMQ的支持主要体现在Spring AMQP项目中,它为RabbitMQ提供了一套高级抽象层,使得开发者能够更加便捷地使用...
在"spring整合rabbitmq需要的jar包(spring版本4.2.0)"中,提到了几个核心的库文件,它们分别是: 1. **spring-rabbit-1.5.1.RELEASE.jar**:这是Spring对RabbitMQ的官方支持模块,它提供了与RabbitMQ集成的API和...
Spring集成RabbitMQ还支持异步处理,可以通过`SimpleMessageListenerContainer`配置启用。同时,Spring提供了事务支持,确保消息在处理过程中的一致性。 ### 5. 性能优化 为了提高性能,可以使用`...
首先,让我们理解Spring集成RabbitMQ的基本步骤: 1. **添加依赖**:在你的`pom.xml`或`build.gradle`文件中,你需要添加Spring AMQP和RabbitMQ客户端库的依赖。对于Maven用户,可以添加如下依赖: ```xml ...
本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,我们需要在项目中引入Spring AMQP库,这是Spring对RabbitMQ的官方支持。可以通过在pom.xml文件中添加以下Maven依赖...
本实例主要介绍如何在Spring应用中集成RabbitMQ,构建一个完整的消息传递系统。首先,你需要确保已经安装了RabbitMQ服务器,并且能够正常运行。RabbitMQ通常通过AMQP(Advanced Message Queuing Protocol)协议进行...
RabbitMQ入门到进阶(Spring整合RabbitMQ&SpringBoot整合RabbitMQ).doc
rabbitmq spring rabbitmq spring rabbitmq spring rabbitmq spring http://knight-black-bob.iteye.com/blog/2304089
此源码是在前人Demo的基础上自己改造的,主要讲了spring和rabbitMQ的整合与使用。其中包含两个项目,一个Producer,一个Consumer,能够分别运行。Producer生产并发送消息到RabbitMQ,Consumer从RabbitMQ接收并处理...
**SSM集成RabbitMQ最初始的项目信息详解** 在Java Web开发中,Spring、Struts和MyBatis(简称SSM)是一个常见的框架组合,它们分别负责控制层、表现层和数据持久化层。而RabbitMQ则是一个强大的消息中间件,常用于...
介绍Spring Cloud Stream与RabbitMQ集成的代码示例。Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。
通过上述步骤,你可以在Spring应用程序中成功集成RabbitMQ,并利用direct、topic和fanout交换器实现高效的消息路由。在实际项目中,根据业务需求选择合适的交换器模式,可以提高系统的可扩展性和可靠性。
本文将详细探讨如何在Spring项目中集成RabbitMQ,并实现两个系统间的通信。 首先,我们需要在项目中添加RabbitMQ的相关依赖。在Maven的pom.xml文件中,我们需要引入Spring AMQP和RabbitMQ的客户端库: ```xml ...
在这个基于Spring集成RabbitMQ的源码示例中,我们主要会关注三种不同的交换机类型:direct、topic和fanout。 1. **Direct Exchange(直连交换机)**: 直连交换机是最简单的模型,它将消息路由到绑定键与发布时...
1. 引入依赖:在项目中添加RabbitMQ的Spring整合依赖,如`spring-amqp`库。 2. 配置RabbitMQ:在Spring的配置文件中,定义连接工厂、信道配置以及RabbitMQ服务器的相关属性。 3. 创建消息模板:使用`RabbitTemplate`...
如果你使用Spring Boot,集成RabbitMQ变得更简单。只需在`application.properties`或`yaml`文件中配置RabbitMQ的相关属性,Spring Boot会自动配置所需的bean。 总的来说,通过Spring与RabbitMQ的集成,我们可以方便...
将RabbitMQ与Spring整合,可以更好地管理和处理消息传递。 本示例代码基于Java和Maven构建,展示了如何在Spring项目中集成RabbitMQ。以下是集成过程的关键步骤和知识点: 1. **依赖管理**:首先,在Maven的`pom....
在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要知识点是利用Spring Boot框架与RabbitMQ集成,动态配置消费者,并实现实时监听消息。 1. **Spring Boot与RabbitMQ集成**: Spring Boot简化了...