- 浏览: 853904 次
文章分类
- 全部博客 (365)
- java (124)
- spring mvc (21)
- spring (22)
- struts2 (6)
- jquery (27)
- javascript (24)
- mybatis/ibatis (8)
- hibernate (7)
- compass (11)
- lucene (26)
- flex (0)
- actionscript (0)
- webservice (8)
- rabbitMQ/Socket (15)
- jsp/freemaker (5)
- 数据库 (27)
- 应用服务器 (21)
- Hadoop (1)
- PowerDesigner (3)
- EJB (0)
- JPA (0)
- PHP (2)
- C# (0)
- .NET (0)
- html (2)
- xml (5)
- android (7)
- flume (1)
- zookeeper (0)
- 证书加密 (2)
- maven (1)
- redis (2)
- cas (11)
最新评论
-
zuxianghuang:
通过pom上传报错 Artifact upload faile ...
nexus上传了jar包.通过maven引用当前jar,不能取得jar的依赖 -
流年末年:
百度网盘的挂了吧???
SSO单点登录系列3:cas-server端配置认证方式实践(数据源+自定义java类认证) -
953434367:
UfgovDBUtil 是什么类
Java发HTTP POST请求(内容为xml格式) -
smilease:
帮大忙了,非常感谢
freemaker自动生成源代码 -
syd505:
十分感谢作者无私的分享,仔细阅读后很多地方得以解惑。
Nginx 反向代理、负载均衡、页面缓存、URL重写及读写分离详解
到http://github.com/momania/spring-rabbitmq下载其示例程序
实行远程接口调用,主要在com.rabbitmq.spring.remoting下几个类:
发布服务端(Server):RabbitInvokerServiceExporter.java
接口调用客户端(Client):RabbitInvokerProxyFactoryBean.java,RabbitInvokerClientInterceptor.java,
发布服务端(Server)——RabbitInvokerServiceExporter.java说明:
package com.rabbitmq.spring.remoting;
import static java.lang.String.format;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationBasedExporter;
import org.springframework.remoting.support.RemoteInvocationResult;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.RpcServer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.spring.ExchangeType;
import com.rabbitmq.spring.InvalidRoutingKeyException;
import com.rabbitmq.spring.channel.RabbitChannelFactory;
public class RabbitInvokerServiceExporter extends RemoteInvocationBasedExporter
implements InitializingBean, DisposableBean, ShutdownListener {
private final Log log = LogFactory
.getLog(RabbitInvokerServiceExporter.class);
private RabbitChannelFactory channelFactory;
private String exchange;
private ExchangeType exchangeType;
private String queueName;
private String routingKey;
private Object proxy;
private List<RpcServer> rpcServerPool;
private int poolsize = 1;
public void afterPropertiesSet() {
// 检查exchange type类型不能为fanout
if (exchangeType.equals(ExchangeType.FANOUT)) {
throw new InvalidRoutingKeyException(String.format(
"Exchange type %s not allowed for service exporter",
exchangeType));
}
exchangeType.validateRoutingKey(routingKey);
// 调用org.springframework.remoting.support.RemoteExporter的getProxyForService(),得到代理对象
proxy = getProxyForService();
// 初始化rpcServer池
rpcServerPool = new ArrayList<RpcServer>(poolsize);
// 初始化RpcServer,并开始接收请求
startRpcServer();
}
// 初始化RpcServer,并开始接收请求
private void startRpcServer() {
try {
log.info("Creating channel and rpc server");
// 创建临时的channel,用来定义queue,exchange,并进行bind
// 这里有两个用处:
// 1:在服务端也定义queue,避免因为先开服务端而出现queue没被定义的错误
// 2:这里先用一个channel定义一下qeueue,后面的for循环里面就不用每个都去定义了
Channel tmpChannel = channelFactory.createChannel();
tmpChannel.getConnection().addShutdownListener(this);
tmpChannel.queueDeclare(queueName, false, false, false, true, null);
if (exchange != null) {
tmpChannel.exchangeDeclare(exchange, exchangeType.toString());
tmpChannel.queueBind(queueName, exchange, routingKey);
}
// 创建poolsize个RpcServer,每个RpcServer使用一个单独的channel,并且分别使用单独的线程去接收请求,提升接收速度
for (int i = 1; i <= poolsize; i++) {
try {
// 每次都创建一个新的channel,因为一个channel在多个线程中使用是会有问题的(官方文档和channel的JavaDoc上是这样说的)
Channel channel = channelFactory.createChannel();
String format = "Starting rpc server %d on exchange [%s(%s)] - queue [%s] - routingKey [%s]";
log.info(String.format(format, i, exchange, exchangeType,
queueName, routingKey));
// 使用当前的channel创建一个RpcServer去处理请求
final RpcServer rpcServer = createRpcServer(channel);
rpcServerPool.add(rpcServer);
// 创建一个线程让当前的RpcServer去处理请求
Runnable main = new Runnable() {
@Override
public void run() {
try {
// rpcServer开始处理请求
throw rpcServer.mainloop();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
// 线程开始
new Thread(main).start();
} catch (IOException e) {
log.warn("Unable to create rpc server", e);
}
}
} catch (Exception e) {
log.error("Unexpected error trying to start rpc servers", e);
}
}
// 创建RpcServer对象
private RpcServer createRpcServer(Channel channel) throws IOException {
return new RpcServer(channel, queueName) {
// 重写处理接收到的消息的方法
public byte[] handleCall(byte[] requestBody,
AMQP.BasicProperties replyProperties) {
// 因为在客户端调用方法的时候,是将客户端调用的方法的信息封装成一个RemoteInvocation对象,然后序列化成一个byte数据再使用RpcClient发送到服务端的
// 所以在这里(服务端接收消息),将消息(requestBody)反序列化成RemoteInvocation对象
RemoteInvocation invocation = (RemoteInvocation) SerializationUtils
.deserialize(requestBody);
// 根据RemoteInvocation的信息,服务端使用代理对象执行相应的方法,并得到执行结果
RemoteInvocationResult result = invokeAndCreateResult(
invocation, proxy);
// 将执行结果序列化为byte数据,然后返回给客户端
return SerializationUtils.serialize(result);
}
};
}
public void setChannelFactory(RabbitChannelFactory channelFactory) {
this.channelFactory = channelFactory;
}
@Required
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public Object getProxy() {
return proxy;
}
@Override
public void destroy() throws Exception {
clearRpcServers();
}
// 清除所有的RpcServer
private void clearRpcServers() {
if (log.isInfoEnabled()) {
log.info(format("Closing %d rpc servers", rpcServerPool.size()));
}
for (RpcServer rpcServer : rpcServerPool) {
try {
// 中止处理请求
rpcServer.terminateMainloop();
rpcServer.close();
} catch (Exception e) {
log.warn("Error termination rpcserver loop", e);
}
}
rpcServerPool.clear();
if (log.isInfoEnabled()) {
log.info("Rpc servers closed");
}
}
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
if (log.isInfoEnabled()) {
log.info(String.format("Channel connection lost for reason [%s]",
cause.getReason()));
log.info(String.format("Reference [%s]", cause.getReference()));
}
if (cause.isInitiatedByApplication()) {
if (log.isInfoEnabled()) {
log.info("Sutdown initiated by application");
}
} else if (cause.isHardError()) {
log
.error("Shutdown is a hard error, trying to restart the RPC server...");
startRpcServer();
}
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
@Required
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public void setPoolsize(int poolsize) {
this.poolsize = poolsize;
}
@Required
public void setExchangeType(ExchangeType exchangeType) {
this.exchangeType = exchangeType;
}
}
发表评论
-
rabbitmq 学习-13- 发送接收消息示例-2
2012-07-06 17:17 1757Basic RPC As a programming con ... -
rabbitmq 学习-12- 发送接收消息示例-1
2012-07-06 17:17 14457这里是同步发送消息,异步接收消息接收有两种方式:http:// ... -
rabbitmq 学习-积累
2012-07-06 17:17 13441,temporary queue(由server自动命名)在 ... -
rabbitmq 学习-11- 几个发送接收消息的重要类
2012-07-06 17:17 16761,ChannelbasicPublish() 用来发送消息, ... -
rabbitmq 学习-10-channel 说明
2012-07-06 17:17 5285rabbitmq java api 关于消息处理的一个重要的类 ... -
rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
2012-06-30 16:44 3059本身使用RpcClient发送消息与同步接收消息的代码是很 ... -
rabbitmq 学习-8- Exchange Queue RoutingKey关系说明
2012-06-30 16:44 3121String queue = channel.queueDec ... -
rabbitmq 学习-7-rabbitmq 支持场景
2012-06-30 16:43 1270What messaging scenarios are su ... -
rabbitmq 学习-6-rabbitmq基础
2012-06-30 16:43 1753rabbitmq的中文资料真少,和同事lucas经过两周的 ... -
rabbitmq 学习-4-初试2
2012-06-29 14:32 1079RpcClient,RpcServer同步发送接收消息Chan ... -
rabbitmq 学习-3-初试1
2012-06-29 14:32 1064本例是一个简单的异步 ... -
rabbitmq 学习-1-AMQP介绍
2012-06-29 14:30 1393Windows 1,下载下载erlang:erlang. ... -
rabbitmq学习1:hello world
2012-06-29 14:27 1372rabbitMQ是一个在AMQP基础上完整的,可服用的企业消息 ... -
rabbitmq操作命令
2012-06-14 13:54 201581.必需掌握的指令 添加用户: ...
相关推荐
NULL 博文链接:https://wubin850219.iteye.com/blog/1076093
2. **环境变量**:安装过程中可能需要设置环境变量,例如添加RabbitMQ的bin目录到PATH变量中,以便于命令行调用相关工具。 3. **服务启动与管理**:安装完成后,RabbitMQ服务将作为Windows服务运行。用户可以通过...
3. 安装RabbitMQ:按照官方指南进行安装,并确保服务器启动并运行。 4. 配置RabbitMQ:可以通过管理界面或者配置文件来设置用户、虚拟主机和权限。 接下来,我们学习RabbitMQ的队列。队列是RabbitMQ的核心概念,它...
RabbitMQ和Kafka是消息中间件,面试中可能讨论它们的使用场景、特性以及如何在Java中集成。Zookeeper在分布式系统中的角色,如配置管理、服务发现等,也是考察点。 【网络编程】 网络编程的基础,如TCP/IP协议、...
RabbitMQ作为一款流行的开源消息中间件,广泛应用于Spring Boot项目中。本教程将详细介绍如何在Spring Boot应用中结合RabbitMQ实现消息确认、消息持久化、备用交换机以及死信交换机等功能。 首先,让我们理解这些...
这个项目可能还涉及到微服务间的通信,如使用RabbitMQ或Kafka作为消息中间件,或者利用Spring Cloud Stream进行消息传递。此外,可能会有Dockerfile和docker-compose.yml文件,用于将应用容器化部署。 总之,...
本项目"spring-cloud-learning:Springcloud + rabbitmq + kafka + slueth + zipkin + lcn + redis"是针对Spring Cloud的一系列实践教程,涉及到的关键技术包括: 1. **Spring Cloud Eureka**:注册与发现服务,它是...
2.5 RPC模式:远程过程调用,通过RabbitMQ实现客户端与服务端的异步通信。 三、RabbitMQ实战应用 3.1 Spring整合:使用Spring框架集成RabbitMQ,包括配置、生产者和消费者组件的创建,以及消息模板的使用。 3.2 ...
在微服务之间进行远程调用时,通常有两种方式:RESTful API和Feign。RESTful API是通过HTTP协议进行调用,而Feign是Spring Cloud提供的声明式Web服务客户端,它使得编写Web服务客户端变得非常简单,只需要定义一个...
1. **spring-rabbit-1.5.1.RELEASE.jar**:这是Spring对RabbitMQ的官方支持模块,它提供了与RabbitMQ集成的API和配置。这个库包含了Spring AMQP项目的核心功能,如连接工厂、模板类、监听容器等,使得开发者能够方便...
当我们结合Spring与RabbitMQ时,可以创建一个高效且灵活的异步通信和远程过程调用(RPC)系统。下面,我们将深入探讨"spring-rabbitmq-demo"项目中的关键知识点。 首先,`pom.xml`文件是Maven项目的核心配置文件,...
Sleuth Stream 模块则利用 Spring Cloud Stream 实现服务间的通信,通过消息代理(如 RabbitMQ)传递追踪数据,确保高可用性和可伸缩性。 2. **MyBatis**: MyBatis 是一个优秀的持久层框架,它支持自定义 SQL、存储...
标题 "springCloud+rabbitMq" 暗示了这篇内容是关于如何在Spring Cloud框架中集成并使用RabbitMQ的教程。Spring Cloud是微服务架构中的一个组件集合,提供了服务发现、配置管理、断路器等核心功能,而RabbitMQ则是一...
它允许开发者以类似接口的方式定义服务调用,降低了远程调用的复杂性。 6. **Config Server**:Spring Cloud Config 用于实现分布式系统中的配置管理,支持 Git 存储配置,允许服务在运行时动态获取或更新配置。 7...
本文将深入探讨如何使用Spring、Spring MVC和RabbitMQ进行集成,以及它们在实际项目中的应用。 首先,Spring AMQP是Spring框架的一个模块,它提供了一种与AMQP(Advanced Message Queuing Protocol)兼容的抽象层,...
总结一下,Spring RabbitMQ RPC的核心在于利用RabbitMQ作为中间人,通过定义交换机、队列和绑定,实现在客户端和服务端之间进行异步的远程调用。这种方式可以很好地扩展系统,同时保持组件间的解耦。通过配置和编程...
为了验证项目是否正常运行,我们可以创建一个简单的REST接口,通过调用`MessageProducer`发送消息。在`Controller`类中添加以下代码: ```java import org.springframework.beans.factory.annotation.Autowired; ...
基于Vue+SpringCloud博客的设计与实现---微服务基础版本组件1.0版本 博客采用Vue+SpringCloud前后分离的方式。博客采用了高可用Eureka(可以替换成其他微服务组件)以及高可用Zuul,使用以Es搜索引擎作为Zpkin的存储...
标题中的“rabbitmq学习10:使用spring-amqp发送消息及异步接收消息”表明了本次讨论的主题,即如何在Spring框架中利用Spring AMQP组件与RabbitMQ进行交互,实现消息的发送和异步接收。RabbitMQ是一个开源的消息代理...