`
阅读更多

    本文简单记录一下 spring 整合 rabbitmq,此处引入spring boot是为了方便引入和rabbitmq整合相关的jar包,并没有使用spring boot整合 rabbitmq。

 

实现功能

  1. 完成 spring 和 rabbitmq 的整合
  2. 完成使用 rabbitAdmin 创建队列等
  3. 完成使用 @Bean 注解声明队列等
  4. 完成使用 RabbitTemplate 进行发送消息
  5. 使用 SimpleMessageListenerContainer 进行消息的监听,可以对消息进行各种适配等

整合步骤:

1、引入 jar 包。

2、配置 ConnectionFacotry。

3、配置 RabbitAdmin 方便维护队列、交换器、绑定等。

4、配置 RabbitTemplate 方便程序中的消息的发送和接收等。

5、配置 SimpleMessageListenerContainer 方便程序中消息的监听。

 

小知识点:

1、在程序中进行队列、交换器、绑定的声明和使用 @Bean 注解来进行声明,如果要想在程序启动的时候自动创建这些队列等,那么在配置 RabbitAdmin 的时候需要将  autoStartup 属性设置成 true。

2、如果我们在程序运行的过程中需要动态修改 监听的队列或移除队列等,那么可以使用  SimpleMessageListenerContainer 来动态修改这些参数。

3、在如果我们想对接收到RabbitMQ发送的消息进行适配和消息转换等,那么使用SimpleMessageListenerContainer这个可以使用。

 

实现步骤:

1、引入 spring 整合 rabbitmq 整合的 maven 依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2、spring 整合 rabbitmq 的核心配置类

/**
 * rabbitmq 配置
 *
 * @author huan.fu
 * @date 2018/10/17 - 10:57
 */
@Configuration
@Slf4j
public class RabbitmqConfiguration {

	/**
	 * 创建 rabbitmq 连接工厂
	 *
	 * @return
	 */
	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
		connectionFactory.setHost("140.143.237.224");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("root");
		connectionFactory.setPassword("root");
		connectionFactory.setVirtualHost("/");
		return connectionFactory;
	}

	/**
	 * rabbitmq 实现 AMQP 便携式的管理操作,比如创建队列、绑定、交换器等
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}

	/**
	 * rabbit mq 模板
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		return new RabbitTemplate(connectionFactory);
	}

	/**
	 * 消息监听容器
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
		SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
		// 设置监听的队列
		simpleMessageListenerContainer.setQueueNames("queue001", "queue002");
		// 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加
		simpleMessageListenerContainer.setConcurrentConsumers(3);
		// 最大的并发消费者
		simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
		// 设置是否重回队列
		simpleMessageListenerContainer.setDefaultRequeueRejected(false);
		// 设置签收模式
		simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		// 设置非独占模式
		simpleMessageListenerContainer.setExclusive(false);
		// 设置consumer未被 ack 的消息个数
		simpleMessageListenerContainer.setPrefetchCount(1);
		// 接收到消息的后置处理
		simpleMessageListenerContainer.setAfterReceivePostProcessors((MessagePostProcessor) message -> {
			message.getMessageProperties().getHeaders().put("接收到消息后", "在消息消费之前的一个后置处理");
			return message;
		});
		// 设置 consumer 的 tag
		simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
			private AtomicInteger consumer = new AtomicInteger(1);

			@Override
			public String createConsumerTag(String queue) {
				return String.format("consumer:%s:%d", queue, consumer.getAndIncrement());
			}
		});
		// 设置消息监听器
		simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
			try {
				log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody()));
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			} catch (Exception e) {
				log.error(e.getMessage(), e);
				// 发生异常此处需要捕获到
				channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
			}
		});

		/**  ================ 消息转换器的用法 ================
		 simpleMessageListenerContainer.setMessageConverter(new MessageConverter() {
		 // 将 java 对象转换成 Message 对象
		 @Override public Message toMessage(Object object, MessageProperties messageProperties) {
		 return null;
		 }

		 // 将 message 对象转换成 java 对象
		 @Override public Object fromMessage(Message message) {
		 return null;
		 }
		 });
		 */

		/**  ================ 消息适配器的用法,用于处理各种不同的消息 ================
		 MessageListenerAdapter adapter = new MessageListenerAdapter();
		 // 设置真正处理消息的对象,可以是一个普通的java对象,也可以是 ChannelAwareMessageListener 等
		 adapter.setDelegate(null);
		 adapter.setDefaultListenerMethod("设置上一步中delegate对象中处理的方法名");

		 ContentTypeDelegatingMessageConverter converters = new ContentTypeDelegatingMessageConverter();
		 // 文本装换器
		 MessageConverter txtMessageConvert = null;
		 // json 转换器
		 MessageConverter jsonMessageConvert = null;

		 converters.addDelegate("text", txtMessageConvert);
		 converters.addDelegate("html/text", txtMessageConvert);
		 converters.addDelegate("text/plain", txtMessageConvert);

		 converters.addDelegate("json", jsonMessageConvert);
		 converters.addDelegate("json/*", jsonMessageConvert);
		 converters.addDelegate("application/json", jsonMessageConvert);

		 adapter.setMessageConverter(converters);
		 simpleMessageListenerContainer.setMessageListener(adapter);

		 */
		return simpleMessageListenerContainer;
	}


	@Bean
	public Queue queue003() {
		return new Queue("queue003", false, false, false, null);
	}

	@Bean
	public Exchange exchange003() {
		return new TopicExchange("exchange003", false, false, null);
	}

	@Bean
	public Binding binding003() {
		return new Binding("queue003", Binding.DestinationType.QUEUE, "exchange003", "save.*", null);
	}

}

 3、动态移除和增加对队列的监听

/**
 * 测试动态改变 SimpleMessageListenerContainer 的属性,比如动态增加需要监听的队列等
 *
 * @author huan.fu
 * @date 2018/10/17 - 15:12
 */
@Component
@Slf4j
public class DynamicSimpleMessageListenerContainerTest implements InitializingBean {

	@Autowired
	private ApplicationContext applicationContext;

	@Override
	public void afterPropertiesSet() throws Exception {
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(10);
				SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
				log.info("移除对队列:[{}]的监听", "queue001");
				simpleMessageListenerContainer.removeQueueNames("queue001");
				TimeUnit.SECONDS.sleep(5);
				log.info("添加对队列:[{}]的监听", "queue001");
				String[] queueNames = simpleMessageListenerContainer.getQueueNames();
				Arrays.copyOf(queueNames, queueNames.length + 1);
				queueNames[queueNames.length - 1] = "queue001";
				simpleMessageListenerContainer.addQueueNames(queueNames);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
	}
}

 4、使用 RabbitAdmin 创建队列等

/**
 * 测试 rabbitAdmin
 *
 * @author huan.fu
 * @date 2018/10/17 - 12:54
 */
@Component
@Slf4j
public class RabbitAdminService implements InitializingBean {

	@Autowired
	private RabbitAdmin rabbitAdmin;

	/**
	 * 创建队列
	 *
	 * @param queueName
	 */
	public void createQueue(String queueName) {
		log.info("创建队列:[{}]", queueName);
		rabbitAdmin.declareQueue(new Queue(queueName, false, false, false, null));
	}

	/**
	 * 创建direct交换器
	 *
	 * @param exchangeName
	 */
	public void createDirectExchange(String exchangeName) {
		log.info("创建direct交换器:[{}]", exchangeName);
		rabbitAdmin.declareExchange(new DirectExchange(exchangeName, false, false, null));
	}

	/**
	 * 创建topic交换器
	 *
	 * @param exchangeName
	 */
	public void createTopicExchange(String exchangeName) {
		log.info("创建topic交换器:[{}]", exchangeName);
		rabbitAdmin.declareExchange(new TopicExchange(exchangeName, false, false, null));
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		createQueue("queue001");
		createQueue("queue002");
		createDirectExchange("exchange001");
		createTopicExchange("exchange002");
		// 创建绑定
		rabbitAdmin.declareBinding(new Binding("queue001", Binding.DestinationType.QUEUE, "exchange001", "direct_001", null));
		// 创建绑定
		rabbitAdmin.declareBinding(new Binding("queue002", Binding.DestinationType.QUEUE, "exchange002", "topic.save.#", null));
	}
}

 5、使用 RabbitTemplate 进行发送消息

/**
 * RabbitTemplate测试
 *
 * @author huan.fu
 * @date 2018/10/17 - 14:35
 */
@Component
@Slf4j
public class RabbitTemplateTest implements InitializingBean {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Override
	public void afterPropertiesSet() throws Exception {
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(5);
				IntStream.rangeClosed(1, 10).forEach(num -> rabbitTemplate.convertAndSend("exchange001", "direct_001", String.format("这个是第[%d]条消息.", num)));
			} catch (InterruptedException e) {
				log.error(e.getMessage(), e);
			}
		}).start();
	}
}

 6、启动类

/**
 * spring 整合 rabbitmq
 *
 * @author huan.fu
 * @date 2018/10/17 - 10:53
 */
@SpringBootApplication
public class RabbitMqApplication {
	public static void main(String[] args) {
		SpringApplication.run(RabbitMqApplication.class, args);
	}
}

 7、运行结果
 

程序代码

代码: https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-spring

  • 大小: 172.6 KB
分享到:
评论

相关推荐

    spring整合rabbitmq的实例

    本文将详细介绍如何整合Spring与RabbitMQ,以实现高效的消息传递。 首先,我们要理解Spring对RabbitMQ的支持主要体现在Spring AMQP项目中,它为RabbitMQ提供了一套高级抽象层,使得开发者能够更加便捷地使用...

    spring整合rabbitmq需要的jar包(spring版本4.2.0)

    在"spring整合rabbitmq需要的jar包(spring版本4.2.0)"中,提到了几个核心的库文件,它们分别是: 1. **spring-rabbit-1.5.1.RELEASE.jar**:这是Spring对RabbitMQ的官方支持模块,它提供了与RabbitMQ集成的API和...

    Spring整合RabbitMQ

    Spring集成RabbitMQ还支持异步处理,可以通过`SimpleMessageListenerContainer`配置启用。同时,Spring提供了事务支持,确保消息在处理过程中的一致性。 ### 5. 性能优化 为了提高性能,可以使用`...

    spring集成rabbitmq 通俗易懂的demo

    首先,让我们理解Spring集成RabbitMQ的基本步骤: 1. **添加依赖**:在你的`pom.xml`或`build.gradle`文件中,你需要添加Spring AMQP和RabbitMQ客户端库的依赖。对于Maven用户,可以添加如下依赖: ```xml ...

    spring集成rabbitmq实现rpc

    本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,我们需要在项目中引入Spring AMQP库,这是Spring对RabbitMQ的官方支持。可以通过在pom.xml文件中添加以下Maven依赖...

    RabbitMq与Spring整合实例

    本实例主要介绍如何在Spring应用中集成RabbitMQ,构建一个完整的消息传递系统。首先,你需要确保已经安装了RabbitMQ服务器,并且能够正常运行。RabbitMQ通常通过AMQP(Advanced Message Queuing Protocol)协议进行...

    RabbitMQ入门到进阶(Spring整合RabbitMQ&amp;SpringBoot整合RabbitMQ).doc

    RabbitMQ入门到进阶(Spring整合RabbitMQ&amp;SpringBoot整合RabbitMQ).doc

    rabbitmq spring demo

    rabbitmq spring rabbitmq spring rabbitmq spring rabbitmq spring http://knight-black-bob.iteye.com/blog/2304089

    Spring整合RabbitMQ完整项目源码.rar

    此源码是在前人Demo的基础上自己改造的,主要讲了spring和rabbitMQ的整合与使用。其中包含两个项目,一个Producer,一个Consumer,能够分别运行。Producer生产并发送消息到RabbitMQ,Consumer从RabbitMQ接收并处理...

    spring集成rabbitmq最初始的SSM项目信息

    **SSM集成RabbitMQ最初始的项目信息详解** 在Java Web开发中,Spring、Struts和MyBatis(简称SSM)是一个常见的框架组合,它们分别负责控制层、表现层和数据持久化层。而RabbitMQ则是一个强大的消息中间件,常用于...

    介绍Spring Cloud Stream与RabbitMQ集成

    介绍Spring Cloud Stream与RabbitMQ集成的代码示例。Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。

    spring集成rabbitMq(基于direct、topic和fanout模式)

    通过上述步骤,你可以在Spring应用程序中成功集成RabbitMQ,并利用direct、topic和fanout交换器实现高效的消息路由。在实际项目中,根据业务需求选择合适的交换器模式,可以提高系统的可扩展性和可靠性。

    spring集成rabbitmq并实现两个系统间的通信

    本文将详细探讨如何在Spring项目中集成RabbitMQ,并实现两个系统间的通信。 首先,我们需要在项目中添加RabbitMQ的相关依赖。在Maven的pom.xml文件中,我们需要引入Spring AMQP和RabbitMQ的客户端库: ```xml ...

    基于spring集成rabbitMq(基于direct、topic和fanout模式)源码.rar

    在这个基于Spring集成RabbitMQ的源码示例中,我们主要会关注三种不同的交换机类型:direct、topic和fanout。 1. **Direct Exchange(直连交换机)**: 直连交换机是最简单的模型,它将消息路由到绑定键与发布时...

    RabbitMQ与SpringMVC集成

    1. 引入依赖:在项目中添加RabbitMQ的Spring整合依赖,如`spring-amqp`库。 2. 配置RabbitMQ:在Spring的配置文件中,定义连接工厂、信道配置以及RabbitMQ服务器的相关属性。 3. 创建消息模板:使用`RabbitTemplate`...

    RabbitMq 集成Spring

    如果你使用Spring Boot,集成RabbitMQ变得更简单。只需在`application.properties`或`yaml`文件中配置RabbitMQ的相关属性,Spring Boot会自动配置所需的bean。 总的来说,通过Spring与RabbitMQ的集成,我们可以方便...

    RabbitMQ整合spring示例代码(java maven)

    将RabbitMQ与Spring整合,可以更好地管理和处理消息传递。 本示例代码基于Java和Maven构建,展示了如何在Spring项目中集成RabbitMQ。以下是集成过程的关键步骤和知识点: 1. **依赖管理**:首先,在Maven的`pom....

    java rabbitmq动态注册,监听实现

    在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要知识点是利用Spring Boot框架与RabbitMQ集成,动态配置消费者,并实现实时监听消息。 1. **Spring Boot与RabbitMQ集成**: Spring Boot简化了...

Global site tag (gtag.js) - Google Analytics