spring大家太熟,就不多说了
rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684
本文侧重介绍如何将rabbitmq整合到项目中
ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..
1.首先是生产者配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns:context = "http://www.springframework.org/schema/context"
xmlns:rabbit = "http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
< rabbit:connection-factory id = "connectionFactory" host = "localhost" username = "guest"
password = "guest" port = "5672" />
< rabbit:admin connection-factory = "connectionFactory" />
<!-- queue 队列声明-->
< rabbit:queue id = "queue_one" durable = "true" auto-delete = "false" exclusive = "false" name = "queue_one" />
<!-- exchange queue binging key 绑定 -->
< rabbit:direct-exchange name = "my-mq-exchange" durable = "true" auto-delete = "false" id="my-mq-exchange<span></ span >">
< rabbit:bindings >
< rabbit:binding queue = "queue_one" key = "queue_one_key" />
</ rabbit:bindings >
</ rabbit:direct-exchange >
<-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
< bean id = "jsonMessageConverter" class = "mq.convert.FastJsonMessageConverter" ></ bean >
<-- spring template声明-->
< rabbit:template exchange = "my-mq-exchange" id = "amqpTemplate" connection-factory = "connectionFactory" message-converter = "jsonMessageConverter" />
</ beans >
|
2.fastjson messageconver插件实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
// 这里本人换成fastjson的jsonobject实现
import fe.json.FastJson;
public class FastJsonMessageConverter extends AbstractMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter. class );
public static final String DEFAULT_CHARSET = "UTF-8" ;
private volatile String defaultCharset = DEFAULT_CHARSET;
public FastJsonMessageConverter() {
super ();
//init();
}
public void setDefaultCharset(String defaultCharset) {
this .defaultCharset = (defaultCharset != null ) ? defaultCharset
: DEFAULT_CHARSET;
}
public Object fromMessage(Message message)
throws MessageConversionException {
return null ;
}
public <T> T fromMessage(Message message,T t) {
String json = "" ;
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) FastJson.fromJson(json, t.getClass());
}
protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte [] bytes = null ;
try {
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes( this .defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content" , e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding( this .defaultCharset);
if (bytes != null ) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
} |
3.生产者端调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import java.util.List;
import org.springframework.amqp.core.AmqpTemplate;
public class MyMqGatway {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend( "queue_one_key" , obj);
}
} |
4.消费者端配置(与生产者端大同小异)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns:context = "http://www.springframework.org/schema/context"
xmlns:rabbit = "http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
< rabbit:connection-factory id = "connectionFactory" host = "localhost" username = "guest"
password = "guest" port = "5672" />
< rabbit:admin connection-factory = "connectionFactory" />
<!-- queue 队列声明-->
< rabbit:queue id = "queue_one" durable = "true" auto-delete = "false" exclusive = "false" name = "queue_one" />
<!-- exchange queue binging key 绑定 -->
< rabbit:direct-exchange name = "my-mq-exchange" durable = "true" auto-delete = "false" id = "my-mq-exchange" >
< rabbit:bindings >
< rabbit:binding queue = "queue_one" key = "queue_one_key" />
</ rabbit:bindings >
</ rabbit:direct-exchange >
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 taskExecutor这个需要自己实现一个连接池 按照官方说法 除非特别大的数据量 一般不需要连接池-->
< rabbit:listener-container connection-factory = "connectionFactory" acknowledge = "auto" task-executor = "taskExecutor" >
< rabbit:listener queues = "queue_one" ref = "queueOneLitener" />
</ rabbit:listener-container >
</ beans >
|
5.消费者端调用
1
2
3
4
5
6
7
8
9
|
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class QueueOneLitener implements MessageListener{
@Override
public void onMessage(Message message) {
System.out.println( " data :" + message.getBody());
}
} |
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
相关推荐
Spring boot整合消息队列RabbitMQ
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
将RabbitMQ与Spring整合,可以方便地在Spring应用中使用消息队列,实现异步通信和任务调度。 本实例主要介绍如何在Spring应用中集成RabbitMQ,构建一个完整的消息传递系统。首先,你需要确保已经安装了RabbitMQ...
70、秒杀系统高并发之消息队列RabbitMQ和代码编写中,主要介绍了如何在系统中引入RabbitMQ,以及如何编写相应的代码来发布和消费消息。生产者通常是在用户发起秒杀请求时,将请求信息封装成消息并发送到RabbitMQ的...
本文将详细介绍如何整合Spring与RabbitMQ,以实现高效的消息传递。 首先,我们要理解Spring对RabbitMQ的支持主要体现在Spring AMQP项目中,它为RabbitMQ提供了一套高级抽象层,使得开发者能够更加便捷地使用...
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
在"spring整合rabbitmq需要的jar包(spring版本4.2.0)"中,提到了几个核心的库文件,它们分别是: 1. **spring-rabbit-1.5.1.RELEASE.jar**:这是Spring对RabbitMQ的官方支持模块,它提供了与RabbitMQ集成的API和...
消息队列RabbitMQ是一种广泛使用的开源消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中实现可靠的消息传递。在面试中,掌握RabbitMQ的相关知识是至关重要的,因为它是现代...
springcloud bus rabbitmq 分布式队列 http://knight-black-bob.iteye.com/blog/2356839
1. 引入依赖:在项目中添加RabbitMQ的Spring整合依赖,如`spring-amqp`库。 2. 配置RabbitMQ:在Spring的配置文件中,定义连接工厂、信道配置以及RabbitMQ服务器的相关属性。 3. 创建消息模板:使用`RabbitTemplate`...
8. **Spring整合**:在Java应用中,Spring框架提供了与RabbitMQ集成的模块,方便开发者在Spring应用中轻松地使用消息队列。 9. **监控与管理**:RabbitMQ提供了Web管理界面,可以查看服务器状态、队列信息、消息...
将RabbitMQ与Spring整合,可以更好地管理和处理消息传递。 本示例代码基于Java和Maven构建,展示了如何在Spring项目中集成RabbitMQ。以下是集成过程的关键步骤和知识点: 1. **依赖管理**:首先,在Maven的`pom....
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...
在本项目"cloud-stream-rabbitmq-test"中,我们将探讨如何将Spring Cloud Stream与RabbitMQ整合,创建自定义的消息通道,以实现双向通信——既能发送消息,也能接收消息。 1. **Spring Cloud Stream基本概念**: -...
在Spring Cloud框架中,整合RabbitMQ或Kafka作为消息驱动是常见的微服务间通信方式。这两种技术都是流行的消息中间件,用于实现异步处理、解耦和扩展性。下面将详细阐述它们在Spring Cloud中的应用。 首先,...
"Spring整合RabbitMQ实现商品数据同步" Spring框架整合RabbitMQ实现商品数据同步是一个非常重要的知识点。在本文中,我们将详细介绍如何使用Spring框架整合RabbitMQ实现商品数据同步。 首先,让我们了解什么是...