RPC
/* Routing Model /->(Request repQueue=123 cid=abc) [...](request Queue) \ RPCClient RPCServer <-\(Reply correlation_id=abc) [...](reply Queue:123) / */
远程过程调用的应用场景也很广,我们来看看如何用rabbitmq实现rpc编程
RemoteService rs = new RemoteService(); String result = rs.speak("2012"); //block until the answer is received System.out.println("Goodbye " + result);
关于RPC的一点看法,尽管它是一种常用的编程模型,却饱受批评,开发人员没法知道自己调用的程序是本地程序还是远程程序,可能会造成很多困惑,增加调试的复杂程度。(可以考虑使用异步流水线替代阻塞调用)
callback queue
客户端将消息和响应队列信息一并发给服务器端
RPCClient.java
package test; import java.io.IOException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class RPCClient { private Connection connection; private Channel channel; public RPCClient() throws IOException{ // 创建一个连接连接服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } public void close() throws IOException{ channel.close(); connection.close(); } public String call(String message) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { String response; String replyQueueName = channel.queueDeclare().getQueue(); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); channel.basicPublish("", "request_queue", props, message.getBytes()); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if(delivery.getProperties().getCorrelationId().equals(corrId)){ response = new String(delivery.getBody()); break; } } return response; } public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { RPCClient client = new RPCClient(); String res = new String(client.call("test")); System.out.println(res); client.close(); } }
RPCServer.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { // 创建一个连接接收数据 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("request_queue", false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("request_queue", false, consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).replyTo(props.getCorrelationId()).build(); String response = new String("Hello" + new String(delivery.getBody())); channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
分别运行RPCClient和RPCServer,可以得到结果hellotest
相关推荐
本篇文章将深入探讨如何使用Java实现基于RabbitMQ的RPC。 首先,我们要理解RabbitMQ的基本概念。RabbitMQ是一个开源的消息代理,它遵循AMQP(Advanced Message Queuing Protocol)协议,负责接收和转发消息。在RPC...
Spring RabbitMQ RPC(远程过程调用)是一种使用RabbitMQ实现客户端与服务器间通信的方式,它允许客户端发送请求到服务器,然后服务器处理请求并返回结果。在这个场景中,RabbitMQ作为一个消息中间件,帮助解耦应用...
学习RabbitMQ的学习笔记
《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...
RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...
RabbitMQ RPC(远程过程调用)是一种利用消息队列系统实现客户端与服务端之间异步通信的技术。RabbitMQ,作为一款开源的消息代理和队列服务器,广泛应用于分布式系统中,用于解耦组件间的直接依赖,提高系统的可靠性...
本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,我们需要在项目中引入Spring AMQP库,这是Spring对RabbitMQ的官方支持。可以通过在pom.xml文件中添加以下Maven依赖...
第11周-第10章节-Python3.5-RabbitMQ rpc实现.mp4
本文将深入探讨 `rabbitmq-rpc-consumer-go`,这是一个专门用于RabbitMQ RPC的Go语言模块,主要关注其作为消费者的实现。 在Go语言中,`rabbitmq-rpc-consumer-go` 模块为开发者提供了方便的方式来创建RabbitMQ的...
本项目是一款基于Java语言的RabbitMQ学习与实践设计源码,共计78个文件,涵盖29个Java源文件、14个Markdown文档、8个XML配置文件、5个属性文件、4个Git忽略规则文件、4个命令行脚本文件、3个JAR包文件、2个PNG图片...
在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,提供了高效、可靠的异步通信能力。在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要...
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许...在这个简单的demo中,我们学习了如何创建连接、声明交换机和队列、发送和接收消息,这些都是RabbitMQ的基础操作,为更高级的用法打下了基础。
首先,`MQSubscribeService.java`代表的是订阅者服务,它是接收和处理来自RabbitMQ的消息的组件。在封装订阅者线程时,通常会包括以下关键知识点: 1. **创建连接和通道**:使用`ConnectionFactory`创建与RabbitMQ...
它涵盖了连接管理、消息发送与接收的基本流程,对于学习和理解RabbitMQ在Java环境下的应用非常有帮助。在实际开发中,你可以根据项目需求调整这些示例,例如增加错误处理、消息确认机制、使用工作队列模型等,以实现...
rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...
【Java使用RabbitMQ服务】 RabbitMQ是一款开源的消息队列系统,广泛应用于分布式系统中的消息传递。本文将简要介绍如何在Java环境中使用RabbitMQ,包括安装、基本结构、消息发送模式以及高级特性。 ### 1. 安装 在...
在代码实现上,通常会使用RabbitMQ的客户端库,如Java的`rabbitmq-client`,Python的`pika`等,它们提供了方便的API来创建连接、通道、声明交换器和队列,以及发送和接收消息。 理解并掌握这些RPC实现方式有助于...
在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于高效地处理异步任务和解耦系统组件。`RabbitmqUtil` 是一个专门为Java开发者设计的工具类,...
Java版本的RabbitMQ实例是...这个Java版本的RabbitMQ实例是学习如何在Java项目中集成消息队列的宝贵资源。通过它,开发者可以掌握RabbitMQ的核心概念,以及如何在实际场景中利用这些概念构建健壮的、可扩展的应用程序。