一、RabbitMQ 简介
官网:http://www.rabbitmq.com/
rabbitMQ 是 AMQP 用 Erlang 实现的 MQ AMQP 主要是由金融领域的软件专家们贡献的创意,而联合了通讯和软件方面的力量,一起打造出来的规范。只要遵循 AMQP 的协议,任何一种语言都可以开发消息组件乃至中间件本身。我们之前使用的activeMQ是实现了jms接口,只能在java环境使用。
参考博文:http://langyu.iteye.com/blog/759663
AMQP协议是一种二进制协议,提供客户端应用与消息中间件之间异步、安全、高效地交互。从整体来看,AMQP协议可划分为三层:
消息中间件的主要功能是消息的路由(Routing)和缓存(Buffering)。在AMQP中提供类似功能的两种域模型:Exchange 和 Message queue。
二、基本概念
如上图所示rabbitmq主要包括四部分组成
1,P代表生产者,C 代表消费者,X exchange 交换器,Q红色的表示队列
2,P和C都是在客户端,X和Q在服务器端。
3,发送接收的大概流程是,P发送消息至交换器。
4,C先声明一个队列,然后将队列和交换器绑定,接着接受消息。
5,换句话说,P和C互相不知道对方存在。
三、RabbitMQ Server 安装
1,下载地址:http://www.rabbitmq.com/download.html ,支持多种操作系统:
* Windows: With installer (recommended) | Manual
* Linux / Unix: Debian / Ubuntu | Fedora / RHEL | Generic Unix | Solaris
* Mac OS X: Standalone | Generic Unix | Macports | Homebrew
* Cloud: EC2
三、RabbitMQ原生代码实例
1,交换器类型:direct, topic, headers和 fanout
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout
所有bind到此exchange的queue都可以接收消息
direct
通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic
所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
Headers
类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。
参考博文:http://www.cnblogs.com/haoxinyue/archive/2012/09/28/2707041.html
实例内容:点对点,工作队列,订阅/发布,路由,主题,RPC
官网实例:http://www.rabbitmq.com/getstarted.html
中文翻译:http://adamlu.net/dev/2011/09/rabbitmq-get-started/
四,spring-amqp
官网:http://projects.spring.io/spring-amqp/
Spring AMQP 是基于 Spring 框架的 AMQP 消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO。
maven引用:
<dependencies> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.2.0.RELEASE</version> </dependency></dependencies>
实例:http://docs.spring.io/spring-amqp/reference/html/sample-apps.html
五、tbss与spring整合的日志记录模块
1,虚拟机安装了rabbitMQ,ip:192.168.17.130 默认端口:5672
2,log项目结构,如下图:
图1:原来的facade接口,对外供webservice的接口;
图2:增加基于rabbitmq队列的日志消费者服务项目
图3:集成单元测试:主要是LogProducter与spring中的log-rabbit.xml配置为模拟日志生产者
3,采用rabbitmq重构的目的
3.1 建立星形的中心处理结构,解决原来webservice的网状部署分布;
3.2 日志服务利用rabbitmq buffer定义可进行持久,对于日志记录(消费者)状态不关心,即透明,不会因为日志记录服务器宕机而导致主业务回滚或者日志丢失;
4,源代码分析
4.1 log-server
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>log-parent</artifactId>
<groupId>com.yolema.log</groupId>
<version>1.0.0.20120828</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>log-server</artifactId>
...
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
...
</project>
LogConsumer.java
package com.yolema.log.server;
import com.yolema.log.core.biz.manager.LoginLogManager;
import com.yolema.log.core.biz.manager.OperateLogManager;
import com.yolema.log.core.model.entity.LoginLog;
import com.yolema.log.core.model.entity.OperateLog;
import com.yolema.log.core.model.enums.LogResultEnum;
import com.yolema.log.core.model.exceptions.LogException;
import com.yolema.log.core.model.results.GenericsResult;
import com.yolema.log.core.vo.operate.LogQueryVo;
import com.yolema.log.ext.facade.order.LogQueryOrder;
import com.yolema.log.ext.facade.results.LogExtResult;
import com.yolema.log.ext.facade.sdo.LoginLogSdo;
import com.yolema.log.ext.facade.sdo.OperateLogSdo;
import com.youlema.tools.jee.beanutils.BeanGeneralConvertor;
import com.youlema.tools.jee.converter.BeanConverter;
import com.youlema.tools.jee.pages.PageList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 日志记录消费者
* <p/>
* User: sys53
* Date: 13-12-4 下午4:03
* version $Id: LogProducter.java, v 0.1 Exp $
*/
public class LogConsumer {
private final static Logger LOG = LoggerFactory.getLogger(LogConsumer.class);
@Autowired
private LoginLogManager loginLogManager;
@Autowired
private OperateLogManager operateLogManager;
/**
* 记录一条登录日志
*
* @return
*/
public void recordLoginLog(LoginLogSdo loginLogSdo) {
try {
if (loginLogSdo == null) {
LOG.info("记录登录日志参数不能为null");
throw new LogException(LogResultEnum.ARGS_CAN_NOT_BE_NULL);
}
GenericsResult<LoginLog> result = loginLogManager.insert(BeanConverter.convert(
new LoginLog(), loginLogSdo));
if (!result.isSuccess()) {
throw new LogException(result.getResultCode(), result.getResultMsg());
}
} catch (LogException e) {
LOG.error("记录登录日志失败,出现已知异常", e);
} catch (Exception e) {
LOG.error("记录登录日志失败,出现未知异常", e);
}
}
/**
* 记录一条操作日志
*/
public void recordOperateLog(OperateLogSdo operateLogSdo) {
try {
if (operateLogSdo == null) {
LOG.info("记录操作日志参数不能为null");
throw new LogException(LogResultEnum.ARGS_CAN_NOT_BE_NULL);
}
GenericsResult<OperateLog> result = operateLogManager.insert(BeanConverter.convert(
new OperateLog(), operateLogSdo));
if (!result.isSuccess()) {
throw new LogException(result.getResultCode(), result.getResultMsg());
}
} catch (LogException e) {
LOG.error("记录操作日志失败,出现已知异常", e);
} catch (Exception e) {
LOG.error("记录操作日志失败,出现未知异常", e);
}
}
/**
* 查询操作日志列表
* @param logQueryOrder
* @return
*/
public LogExtResult queryOperateLog(LogQueryOrder logQueryOrder){
LogExtResult extResult = new LogExtResult(true);
try {
GenericsResult<PageList<OperateLog>> result = operateLogManager
.queryPageList(BeanConverter.convert(new LogQueryVo(),
logQueryOrder));
if (!result.isSuccess()) {
throw new LogException(result.getResultCode(), result.getResultMsg());
}
extResult.setOperateLogList(BeanGeneralConvertor.convertBeanList2BeanList(
OperateLogSdo.class, result.getResultData()));
} catch (LogException e) {
LOG.error("查询操作日志失败,出现已知异常", e);
extResult.setSuccess(false);
extResult.setResultCode(e.getResultCode());
extResult.setResultMsg(e.getResultMsg());
} catch (Exception e) {
LOG.error("查询操作日志失败,出现未知异常", e);
extResult.setSuccess(false);
extResult.setResultMsg(e.getMessage());
}
return extResult;
}
}
log-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<rabbit:connection-factory id="connectionFactory" host="192.168.17.130" username="guest" password="guest"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="logExchange" routing-key="log.service"/>
<rabbit:queue name="log.queue" />
<rabbit:topic-exchange name="logExchange">
<rabbit:bindings>
<rabbit:binding queue="log.queue" pattern="log.*" />
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="logConsumer" method="recordLoginLog" queue-names="log.queue" />
<rabbit:listener ref="logConsumer" method="recordOperateLog" queue-names="log.queue" />
<rabbit:listener ref="logConsumer" method="queryOperateLog" queue-names="log.queue" />
</rabbit:listener-container>
<bean id="logConsumer" class="com.yolema.log.server.LogConsumer" />
</beans>
在spring配置文件中定义由于主题交换类的监听设置
1,rabbit:connection-factory,定义:192.168.17.130 的connectionFactory
2,rabbit:template,定义rabbitTemplate,通过connectionFactory,交换机名为:logExchange,routing-key:log.service
3,定义队列名:log.queue
4,rabbit:topic-exchange 定义交机器,并rabbit:binding绑定在队列:log.queue,匹配所有routing-key是:log.开头的
5,bean id="logConsumer” 定义普通javabean,定义委托监听的执行方法
6,rabbit:listener-container定义在connectionFactory上的监听容器,rabbit:listener定义队列og.queue来的消息采用哪一个委托类方法;
(需要测试的问题,如果同一个队列中,不同的方法,参数是一样的话,可能会同时执行)
LogServer.java
package com.yolema.log.server;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* 日志服务
* <p/>
* User: sys53
* Date: 13-12-4 下午4:02
* version $Id: LogServer.java, v 0.1 Exp $
*/
public class LogServer {
public static void main(String[] args){
new ClassPathXmlApplicationContext("classpath:/log-*.xml");
}
}
直接调用spring架框就可以正常启动rabbitMQ监听容器
4.2生产者(junit单元测试代码)
LogProducter.java
package com.yolema.log.test.integration;
import com.yolema.log.ext.facade.order.LogQueryOrder;
import com.yolema.log.ext.facade.results.LogExtResult;
import com.yolema.log.ext.facade.sdo.LoginLogSdo;
import com.yolema.log.ext.facade.sdo.OperateLogSdo;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
/**
* Write class comments here
* <p/>
* User: sys53
* Date: 13-12-4 下午5:12
* version $Id: com.yolema.log.test.integration.LogProducter.java, v 0.1 Exp $
*/
public class LogProducter extends BaseTest{
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void recordLoginLog(){
LoginLogSdo sdo = new LoginLogSdo();
sdo.setClientArea("杭州");
sdo.setClientBrowser("ie");
sdo.setClientIp("127.0.0.1");
sdo.setClientOs("xp");
sdo.setLogContent("登录成功");
sdo.setLogTarget("2");
sdo.setLoginType("1");
sdo.setLogTime(new Date());
sdo.setLogResult("SUCCESS");
amqpTemplate.convertAndSend(sdo);
}
@Test
public void recordOperateLog(){
OperateLogSdo sdo = new OperateLogSdo();
sdo.setLogTarget("tbss");
sdo.setLogTime(new Date());
sdo.setLogLvl("1");
sdo.setUrl("http://localhost:8080/tbss");
sdo.setLogContent("rabbitmq测试增加用户");
sdo.setSystemCode("tbss-code");
sdo.setSystemName("tbss-all");
sdo.setModelCode("user");
sdo.setModelName("用户");
sdo.setClientIp("192.168.0.252");
sdo.setClientBrowser("IE");
sdo.setClientArea("杭州");
sdo.setClientOs("mac os x");
amqpTemplate.convertAndSend(sdo);
}
@Test
public void queryOperateLog(){
LogQueryOrder logQueryOrder = new LogQueryOrder();
LogExtResult logExtResult =(LogExtResult) amqpTemplate.convertSendAndReceive(logQueryOrder);
Assert.assertTrue(logExtResult.isSuccess());
System.out.println(logExtResult.getOperateLogList().size());
}
}
1,直接承继BaseTest的集成测试架构(默认定义spring配置)
2,定义三个测试类,发送三个不同参数
2.1recordLoginLog() 发送:amqpTemplate.convertAndSend(sdo)参数为LoginLogSdo 类
2.2recordOperateLog()发送:amqpTemplate.convertAndSend(sdo),参数为OperateLogSdo 类
2.3queryOperateLog(),同步等待队列返回结果amqpTemplate.convertSendAndReceive(logQueryOrder),参数LogQueryOrder 类
log-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<rabbit:connection-factory id="connectionFactory" host="192.168.17.130" username="guest" password="guest"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" queue="log.queue" exchange="logExchange" routing-key="log.service"/>
</beans>
rabbit:template 定义amqpTemplate,在LogProducter 中可以自动注入,connectionFactory同消费者端
六、TBSS系统中非核心业务的架构
分享到:
相关推荐
【基于微服务系统的数据可视化展示平台软件V11】是一款专为微服务架构设计的工具,旨在解决微服务系统中运维、迭代开发和模块管理的挑战。该软件以IDEA插件的形式存在,依赖于IDEA进行安装和使用。通过微服务的...
9. **RabbitMQ/ActiveMQ**:消息队列,用于异步处理和解耦系统组件。 10. **Git**:版本控制系统,用于代码管理和协作。 这些技术和工具的组合,构建出了一套完整的云端企业级应用解决方案,旨在提供高效、稳定、可...
7. 性能优化:通过缓存策略、数据库索引优化、代码重构等手段,提升系统整体性能,保证在高并发情况下也能稳定运行。 8. 日志管理:使用Log4j或Logback等日志框架,记录系统运行日志,便于故障排查和系统维护。 综...
这个压缩包文件"rabbitmq-rpc-master"似乎是一个基于Java实现的RPC框架,它经过了重构,旨在提高性能和可维护性。以下是对该框架关键特性的详细解释: 1. **Java反射机制**:Java反射是Java语言的一个强大功能,它...
同时,系统可以引入消息队列(如RabbitMQ或Kafka)来解耦服务间的同步调用,提高系统的响应能力和容错能力。 在功能模块上,微服务架构允许团队根据业务领域划分服务,例如支付服务、会员服务、订单服务等,每个...
在Exam++考试系统第一版的基础上,我们对ExamStack V2.0进行了大量代码重构,同时也对数据模型做了部分调整。为了减小学员考试交卷时大量并发带来的系统风险,我们尝试采用成熟的消息队列框架RabbitMQ来解决这一问题...
《基于SpringBoot 2.6.4的电商系统源码深度解析》 SpringBoot作为现代Java开发中的一个关键框架,以其简洁、快速的特性...通过对各个模块的拆解、重构和优化,可以提升系统的稳定性和效率,满足不断增长的业务需求。
【描述】该系统基于Spring Boot技术进行重构,Spring Boot简化了Spring应用的初始搭建以及开发过程,它集成了大量常用的第三方库配置,如JPA、Thymeleaf、RabbitMQ等,使得开发者可以快速地构建一个独立运行的应用。...
IntelliJ IDEA是JetBrains公司推出的Java开发集成环境,以其强大的代码自动补全、重构功能和丰富的插件支持而广受开发者喜爱。在开发过程中,IDEA提供了一流的编码辅助,包括智能代码提示、错误检测、快速修复等,极...
5. **RabbitMQ**:另一个消息队列系统,RabbitMQ允许应用之间解耦通信,提高系统的稳定性和可维护性。在pear-api中,它同样可以用于处理后台任务和批量数据操作。 **文件名称列表分析:** 提供的文件名 "pear-api-...
1、基于SpringBoot+Vue搭建的多功能体育场地智能管理系统源码+数据库+项目说明.zip 2、该资源包括项目的全部源码,下载可以直接使用! 3、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...
传统应用通常基于三层架构,依赖于物理或虚拟服务器,采用垂直扩展方式,即通过增强单个服务器的硬件能力来应对负载增长。它们往往具有紧密耦合的服务,数据备份和恢复过程复杂,且对基础设施的依赖性强。故障发生时...
2. **消息队列**:如RabbitMQ或Kafka,用于处理异步通信和解耦各个服务,降低系统间的依赖,提高系统的响应速度。 3. **WebSocket**:作为实时通信的技术基础,WebSocket提供双向通信,使得服务器可以主动推送消息...
当然,您还可以继续使用 wmq 和 wmq-adminWmq-Admin根据 WMQ 服务提供的后台管理系统实现了对 WMQ 服务的用户,节点,消息,消费,日志的统一操作管理WMQ基于 RabbitMQ 开发的消息队列服务,支持 http 协议fork:环境...
7. **其他选型**:还包括消息队列系统(如RabbitMQ、Kafka),分布式文件系统(如HDFS),站内搜索引擎(如Elasticsearch)等。 **总结** 构建LAMP架构需根据网站规模和需求逐步升级,选择合适的开源软件组合,...
该项目是一个基于IDEA集成开发环境,使用Maven作为构建工具,结合SSM(Spring、SpringMVC、MyBatis)框架以及MySQL数据库实现的高并发商品秒杀系统。以下是这个项目涉及的关键技术点: 1. **IDEA**: IntelliJ IDEA ...
- **高可维护性**:确保架构模块和服务易于替换,便于升级或重构。 #### 负载均衡器的选择与比较 负载均衡器的选择取决于具体的场景和需求。常见的选项包括: - **LVS**:工作在四层,性能非常高但配置较为复杂。...
3. **消息队列**:RabbitMQ或Kafka,用于解耦服务间的依赖,提高系统的响应性和扩展性,如异步处理订单支付、发送通知等。 4. **Docker与Kubernetes**:为了实现微服务的容器化部署和集群管理,项目可能包含...