https://my.oschina.net/never/blog/140368
spring大家太熟,就不多说了
rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684
本文侧重介绍如何将rabbitmq整合到项目中
ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..
1.首先是生产者配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
password="guest" port="5672" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- queue 队列声明-->
<rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
<!-- exchange queue binging key 绑定 -->
<rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
<rabbit:bindings>
<rabbit:binding queue="queue_one" key="queue_one_key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter" class="mq.convert.FastJsonMessageConverter"></bean>
<-- spring template声明-->
<rabbit:template exchange="my-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>
</beans>
2.fastjson messageconver插件实现
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import fe.json.FastJson;
public class FastJsonMessageConverter extends AbstractMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);
public static final String DEFAULT_CHARSET = "UTF-8";
private volatile String defaultCharset = DEFAULT_CHARSET;
public FastJsonMessageConverter() {
super();
//init();
}
public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = (defaultCharset != null) ? defaultCharset
: DEFAULT_CHARSET;
}
public Object fromMessage(Message message)
throws MessageConversionException {
return null;
}
public <T> T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) FastJson.fromJson(json, t.getClass());
}
protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
try {
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
}
3.生产者端调用
import java.util.List;
import org.springframework.amqp.core.AmqpTemplate;
public class MyMqGatway {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend("queue_one_key", obj);
}
}
4.消费者端配置(与生产者端大同小异)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
password="guest" port="5672" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- queue 队列声明-->
<rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
<!-- exchange queue binging key 绑定 -->
<rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
<rabbit:bindings>
<rabbit:binding queue="queue_one" key="queue_one_key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
<rabbit:listener queues="queue_one" ref="queueOneLitener"/>
</rabbit:listener-container>
</beans>
5.消费者端调用
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class QueueOneLitener implements MessageListener{
@Override
public void onMessage(Message message) {
System.out.println(" data :" + message.getBody());
}
}
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
分享到:
相关推荐
Spring boot整合消息队列RabbitMQ
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
将RabbitMQ与Spring整合,可以方便地在Spring应用中使用消息队列,实现异步通信和任务调度。 本实例主要介绍如何在Spring应用中集成RabbitMQ,构建一个完整的消息传递系统。首先,你需要确保已经安装了RabbitMQ...
接着,71、秒杀系统高并发之RabbitMQ和spring整合部分,讲解了如何将RabbitMQ与Spring框架集成,以实现更高效、更灵活的消息处理。Spring框架提供了对RabbitMQ的全面支持,包括创建连接工厂、定义消息模板、设置...
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
消息队列RabbitMQ是Java开发者在面试中经常会遇到的技术话题,它在分布式系统中扮演着重要的角色。这里,我们将会深入探讨RabbitMQ的基本概念、核心特性以及常见面试问题。RabbitMQ是一个开源的消息代理和队列服务器...
本文将详细介绍如何整合Spring与RabbitMQ,以实现高效的消息传递。 首先,我们要理解Spring对RabbitMQ的支持主要体现在Spring AMQP项目中,它为RabbitMQ提供了一套高级抽象层,使得开发者能够更加便捷地使用...
首先,让我们了解Spring Boot如何集成RabbitMQ。在Spring Boot项目中引入`spring-boot-starter-amqp`依赖,它包含了对RabbitMQ的支持。接着,配置RabbitMQ的相关参数,如主机地址、端口、用户名和密码。在`...
springcloud bus rabbitmq 分布式队列 http://knight-black-bob.iteye.com/blog/2356839
1. 引入依赖:在项目中添加RabbitMQ的Spring整合依赖,如`spring-amqp`库。 2. 配置RabbitMQ:在Spring的配置文件中,定义连接工厂、信道配置以及RabbitMQ服务器的相关属性。 3. 创建消息模板:使用`RabbitTemplate`...
8. **Spring整合**:在Java应用中,Spring框架提供了与RabbitMQ集成的模块,方便开发者在Spring应用中轻松地使用消息队列。 9. **监控与管理**:RabbitMQ提供了Web管理界面,可以查看服务器状态、队列信息、消息...
通过以上步骤,你就成功地在 Spring Boot 项目中集成了 RabbitMQ,并实现了消息的发送和接收。在实际项目中,还可以根据业务需求,使用更多的 RabbitMQ 功能,如工作队列、发布/订阅模式、路由键策略等,以优化系统...
在"spring整合rabbitmq需要的jar包(spring版本4.2.0)"中,提到了几个核心的库文件,它们分别是: 1. **spring-rabbit-1.5.1.RELEASE.jar**:这是Spring对RabbitMQ的官方支持模块,它提供了与RabbitMQ集成的API和...
通过Spring AMQP,我们可以利用Spring的依赖注入和配置能力来管理RabbitMQ的连接和信道,简化了消息系统的集成。 在"spring+springmvc+rabbitmq实例代码"中,你可能会看到以下几个关键组件: 1. **RabbitTemplate*...
RabbitMQ则是一个广泛使用的开源消息代理和队列服务器,它允许应用程序之间进行异步通信,实现了消息的可靠传递。本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,...
在本项目"cloud-stream-rabbitmq-test"中,我们将探讨如何将Spring Cloud Stream与RabbitMQ整合,创建自定义的消息通道,以实现双向通信——既能发送消息,也能接收消息。 1. **Spring Cloud Stream基本概念**: -...
该项目为基于Spring Boot框架的Java语言开发的RabbitMQ消息队列设计与实现源码,包含43个文件,包括20个Java源文件、5个PNG图片、4个CSS样式表、3个JavaScript脚本、2个映射文件、1个Git忽略规则文件、1个Markdown...
2. **Spring框架**:Spring框架提供了丰富的支持来集成各种消息队列,如RabbitMQ、ActiveMQ、Kafka等。Spring的`@EnableJms`注解可以开启JMS(Java Message Service)支持,`JmsTemplate`则为发送和接收消息提供了...