在项目中引入RabbitMQ通常会考虑它会带来的好处:解耦应用程序,实现不同编程语言之间的互通,解除对特定通信协议的依赖,解除应用程序在时序上执行的依赖(异步).落实到代码层面就是两种常用应用模式:"发后即忘"(fire-and-forget)和RPC.
fire-and-forget
RabbitMQ解决的是应用程序之间互联(connect)和规模(scale)的问题,消息发送和接收是隔离,发送方不知道消息最终由谁接收,接收方也不必关心消息是谁步发出的;发送和接收是隔离的,消息本质上就是异步的.这种隔离也就解耦了应用程序之间的依赖.RabbitMQ的角色就是应用程序中间的路由器.对于消息的发布方来讲这是一种"发后即忘"(fire-and_forget)的发布方式.
fire-and-forget模式发送消息,消息的发送方和接收方彼此隔离
RPC
RPC需要双向通信,或者说RPC Server需要明确知道要把消息发送给谁.我们可以在payload的数据部分附加 "发给谁" 这种EndPoint信息. RabbitMQ提供的解决方案:在每一个AMQP的消息头上有一个reply_to字段.这样消息的producer就可以指定Queue name,RPC Server接受到消息检查reply_to字段,创建一个消息包含Response并把queue name作为routing key,订阅了这个队列的Client就拿到了消息.
这里有两件事情要保证:
1.要为队列创建随机Name
2.即使Name随机还是有可能冲突,还需要保证消息通信的独占性,看看RabbitMQ是怎么满足这两点的:
(1)如果创建的队列不指定queue name,rabbitMQ就会创建一个随机的Name
(2)独占只需要exclusive参数即可
总而言之,需要做的就是Client创建一个temporary,exclusive,anonymou的queue,并把queue name设置在RPC消息的reply_to字段即可.注意这里RPC Server已经知道要投递到哪个Queue,所以不需要指Exchange
RabbitMq RPC与传统RPC区别
传统的RPC调用Client和Server紧密依赖,客户端连接上服务器,发送一个请求然后阻塞等待服务器响应.这样的做的特点是客户端和服务器端是知道对方的.如果RPC Server崩溃掉,客户端需要重连,如果Server彻底崩掉就要重新找一个提供同样服务的Server,然后客户端重连过去.
用RabbitMQ来实现RPC,依然保持Client Server信息隐藏的特点,Client依赖的不是特定的Server而是特定的消息,在有多个等效Server的情况下,一个Server的状态是否正常不会影响到客户端的状态.
总结一下,使用RabbitMQ是先RPC,客观上还实现了下面的效果:
- 容错 一个Server崩溃不影响 Client
- 解耦了对特定通信协议和接口的依赖,统一走AMQP消息.
- 在多个RPC Server之间的负载均衡由RabbitMQ完成
生产者代码
public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(),"UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc!= null) { try { fibonacciRpc.close(); } catch (Exception ignore) {} } } } }
消费者代码
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n ==0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); try { String message = new String(delivery.getBody(),"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e){ System.out.println(" [.] " + e.toString()); response = ""; } finally { channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } } } }
相关推荐
【标题】:“rabbitMQ入门” 在IT行业中,消息队列是一种常见的中间件技术,用于解耦应用程序组件,提高系统的可扩展性和可靠性。RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中。本篇文章将带...
【RabbitMQ入门操作手册】提供了全面的RabbitMQ学习指南,从基础概念到实际操作,帮助初学者快速掌握这个强大的消息队列系统。RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)的开源消息代理,其核心...
最后,"远程过程调用"(RPC)章节讲述的是如何使用RabbitMQ实现基于消息的RPC。在这种模式下,客户端发送请求消息,然后等待服务器返回响应。RabbitMQ提供了一种可靠的机制来确保请求和响应的对应关系,这对于分布式...
使用安装依赖项: composer install消费消息通过消费者下载消息: php app/console rabbitmq:consumer bar启动 RPC 服务器要使用 RPC,您需要启动服务器: php app/console rabbitmq:rpc-server bar运行和测试要运行...
这是RabbitMQ入门的经典示例,它展示了最基础的消息发布与消费过程。生产者发送一个简单的"Hello, World!"消息到RabbitMQ服务器,然后消费者从队列中取出并打印这个消息。这个例子帮助我们理解RabbitMQ的基本工作...
对于初学者来说,首先需要了解什么是消息队列,然后是安装和配置RabbitMQ,接着能够编写入门级别的程序,最后能够熟练掌握RabbitMQ的5种模式特征,并能够将SpringBoot与RabbitMQ进行整合。 消息队列的实现方式主要...
Dubbo是阿里巴巴开源的一个高性能、轻量级的Java RPC框架,它提供了丰富的服务治理功能,如服务注册与发现、调用链跟踪、负载均衡、容错机制等。在本项目中,Dubbo可能被用于实现微服务间的远程调用,使得系统模块...
【RabbitMQ基础入门】 RabbitMQ是一款广泛应用的消息中间件,基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中实现可靠的消息传递。它允许应用程序之间解耦,提高系统的灵活性和可扩展性。...
- **适用场景**:适用于大型分布式系统的RPC服务治理。 #### 2. Dubbo 面试常见问题 - 包括但不限于服务注册与发现机制、服务路由策略、负载均衡算法等内容。 ### 六、Zookeeper 技术要点 #### 1. Zookeeper ...
一旦安装了两个项目,首先按照尼克的指示启动rabbitmq-proto。 您需要在 config 目录中创建一个 default.js 配置文件(出于安全原因,这是 gitignore 的),其中包含以下行并填充了值: exports.rpc_user = ''; /...
3-6 Docker入门基础文档.mp4 3-6 本章小结.mp4 3-7 Ubuntu中通过Docker安装配置MySQL主从节点.mp4 第04章 “云存储”系统之基于用户系统实现的资源隔离及鉴权 4-1 帐号系统介绍与用户表设计.mp4 4-2 编码实战:...
它旨在用于构建事件驱动的应用程序,启用事件源,消息之上的RPC,sagas以及您想到的其他任何东西。 您可以使用传统的pub / sub实现,例如Kafka或RabbitMQ,也可以使用HTTP或MySQL binlog(如果适合您的用例)。目标...
README.md文件是项目的介绍文档,通常包含项目概述、安装指南、使用示例等,是了解项目快速入门的关键。最后,“micro”文件可能是可执行文件,用于在命令行中运行和管理go-micro的服务。 go-micro框架的核心特性...
NATS,NSQ,RabbitMQ,Kafka 客户RPC客户端; gRPC,HTTP 编解码器消息编码; 水星BSON 微微型工具包插件登记处服务发现; Etcd,八卦,NATS 选择器负载均衡; 标签,缓存,静态服务器RPC服务器; gRPC,HTTP 运输...
Netty 与 RPC 网络 日志 Zookeeper Kafka RabbitMQ 数据库 一致性算法 JAVA 算法 Spark 集合 多线程并发 设计模式 负载均衡 数据结构 加密算法 分布式缓存 机器学习 云计算 JVM Hbase MongoDB Cassandra ...
python入门到高级全栈工程师培训视频学习资料;本资料仅用于学习,请查看后24小时之内删除。 【课程内容】 第1章 01 计算机发展史 02 计算机系统 03 小结 04 数据的概念 05 进制转换 06 原码补码反码 07 物理层和...
【描述】中的内容表明,这个压缩包提供了丰富的实例代码,覆盖了多个功能模块,包括基础的"Hello World"入门项目,文件上传与下载功能,MongoDB数据库的集成,邮件服务,RabbitMQ消息中间件的使用,以及Shiro安全...
在微服务的自动发现与负载均衡方面,文档讲述了Kubernetes的快速入门和在微服务中的应用,强调了服务发现机制对于服务之间高效交互的重要性。通过服务发现,微服务可以在运行时动态地定位其他服务,而负载均衡则确保...