`
yimeng528
  • 浏览: 188289 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rabbitmq学习6:RPC

阅读更多

在《rabbitmq学习2:Work Queues 》中我们已经知道了在多个worker如何分配耗时的任务。如果我现在要在远程的机器上运行然后得到结果,那应当怎么做呢?那就要用到RPC(Remote Procedure Call or RPC )了!

   关于RPC的介绍请参考百度百科里的关于RPC的介绍:http://baike.baidu.com/view/32726.htm#sub32726

   现在来看看来看看Rabbitmq中RPC吧!RPC的工作示意图如下:


   上图中的C代表客户端,S表示服务器端;Rabbitmq中的RPC流程如下:

1、首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;

2、服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致

3、客户端从回调Queue中得到先前corrention_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。

 

  对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类;而消息 属性在AMQP的协议中规定有14个;而很多大部分我们没有用到。常用的几个属性有:

English代码  收藏代码
  1. Message properties  
  2. The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:  
  3.   
  4. delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.   
  5. content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.   
  6. reply_to: Commonly used to name a callback queue.   
  7. correlation_id: Useful to correlate RPC responses with requests.   

 

 delivery_mode : 标记消息是持久性消息还是瞬态信息。在前面的“Work Queue”中我们已经提到过;   

  content_type : 用来描述MIME的类型。如把其类型设定为JSON;

  reply_to : 用于命名一个回调Queue;

  correlation_id : 用于与相关联的请求的RPC响应.

  现在我们就开始RPC的程序吧!

client的代码如下:

Java代码  收藏代码
  1. package com.abin.rabbitmq;  
  2.   
  3. import java.util.UUID;  
  4.   
  5. import com.rabbitmq.client.AMQP.BasicProperties;  
  6. import com.rabbitmq.client.Channel;  
  7. import com.rabbitmq.client.Connection;  
  8. import com.rabbitmq.client.ConnectionFactory;  
  9. import com.rabbitmq.client.QueueingConsumer;  
  10.   
  11. public class RPCClient {  
  12.     private Connection connection;  
  13.     private Channel channel;  
  14.     private String requestQueueName = "rpc_queue";  
  15.     private String replyQueueName;  
  16.     private QueueingConsumer consumer;  
  17.   
  18.     public RPCClient() throws Exception {  
  19.         ConnectionFactory factory = new ConnectionFactory();  
  20.         factory.setHost("localhost");  
  21.         connection = factory.newConnection();  
  22.         channel = connection.createChannel();  
  23.   
  24.         replyQueueName = channel.queueDeclare().getQueue();  
  25.         consumer = new QueueingConsumer(channel);  
  26.         channel.basicConsume(replyQueueName, true, consumer);  
  27.     }  
  28.   
  29.     public String call(String message) throws Exception {  
  30.         String response = null;  
  31.         String corrId = UUID.randomUUID().toString();  
  32.   
  33.         BasicProperties props = new BasicProperties();  
  34.         props.setReplyTo(replyQueueName);  
  35.         props.setCorrelationId(corrId);  
  36.   
  37.         channel.basicPublish("", requestQueueName, props, message.getBytes());  
  38.   
  39.         while (true) {  
  40.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  41.             if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
  42.                 response = new String(delivery.getBody(), "UTF-8");  
  43.                 break;  
  44.             }  
  45.         }  
  46.   
  47.         return response;  
  48.     }  
  49.   
  50.     public void close() throws Exception {  
  51.         connection.close();  
  52.     }  
  53.   
  54.     public static void main(String[] argv) {  
  55.         RPCClient fibonacciRpc = null;  
  56.         String response = null;  
  57.         try {  
  58.             fibonacciRpc = new RPCClient();  
  59.   
  60.             System.out.println(" [x] Requesting fib(30)");  
  61.             response = fibonacciRpc.call("30");  
  62.             System.out.println(" [.] Got '" + response + "'");  
  63.             System.out.println(" [x] Requesting fib(-1)");  
  64.             response = fibonacciRpc.call("-1");  
  65.             System.out.println(" [.] Got '" + response + "'");  
  66.             System.out.println(" [x] Requesting fib(a)");  
  67.             response = fibonacciRpc.call("a");  
  68.             System.out.println(" [.] Got '" + response + "'");  
  69.         } catch (Exception e) {  
  70.             e.printStackTrace();  
  71.         } finally {  
  72.             if (fibonacciRpc != null) {  
  73.                 try {  
  74.                     fibonacciRpc.close();  
  75.                 } catch (Exception ignore) {  
  76.                 }  
  77.             }  
  78.         }  
  79.     }  
  80. }  

 

 server的代码如下:

Java代码  收藏代码
  1. package com.abin.rabbitmq;  
  2.   
  3. import com.rabbitmq.client.AMQP.BasicProperties;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8.   
  9. public class RPCServer {  
  10.     private static final String RPC_QUEUE_NAME = "rpc_queue";  
  11.   
  12.     private static int fib(int n) {  
  13.         if (n > 1)  
  14.             return fib(n - 1) + fib(n - 2);  
  15.         else  
  16.             return n;  
  17.     }  
  18.   
  19.     public static void main(String[] argv) {  
  20.         Connection connection = null;  
  21.         Channel channel = null;  
  22.         try {  
  23.             ConnectionFactory factory = new ConnectionFactory();  
  24.             factory.setHost("localhost");  
  25.   
  26.             connection = factory.newConnection();  
  27.             channel = connection.createChannel();  
  28.   
  29.             channel.queueDeclare(RPC_QUEUE_NAME, falsefalsefalsenull);  
  30.   
  31.             channel.basicQos(1);  
  32.   
  33.             QueueingConsumer consumer = new QueueingConsumer(channel);  
  34.             channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  
  35.   
  36.             System.out.println(" [x] Awaiting RPC requests");  
  37.   
  38.             while (true) {  
  39.                 String response = null;  
  40.   
  41.                 QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  42.   
  43.                 BasicProperties props = delivery.getProperties();  
  44.                 BasicProperties replyProps = new BasicProperties();  
  45.                 replyProps.setCorrelationId(props.getCorrelationId());  
  46.   
  47.                 try {  
  48.                     String message = new String(delivery.getBody(), "UTF-8");  
  49.                     int n = Integer.parseInt(message);  
  50.   
  51.                     System.out.println(" [.] fib(" + message + ")");  
  52.                     response = "" + fib(n);  
  53.                 } catch (Exception e) {  
  54.                     System.out.println(" [.] " + e.toString());  
  55.                     response = "";  
  56.                 } finally {  
  57.                     channel.basicPublish("", props.getReplyTo(), replyProps,  
  58.                             response.getBytes("UTF-8"));  
  59.   
  60.                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(),  
  61.                             false);  
  62.                 }  
  63.             }  
  64.         } catch (Exception e) {  
  65.             e.printStackTrace();  
  66.         } finally {  
  67.             if (connection != null) {  
  68.                 try {  
  69.                     connection.close();  
  70.                 } catch (Exception ignore) {  
  71.                 }  
  72.             }  
  73.         }  
  74.     }  
  75. }  

 

先运行服务器端,运行结果如下:

Java代码  收藏代码
  1. [x] Awaiting RPC requests  

   再运行运行客户端,运行结果如下:

Java代码  收藏代码
  1. [x] Requesting fib(30)  
  2. [.] Got '832040'  
  3. [x] Requesting fib(-1)  
  4. [.] Got '-1'  
  5. [x] Requesting fib(a)  
  6. [.] Got ''  

   在服务器还可以出现:

Java代码  收藏代码
  1. [.] fib(30)  
  2. [.] fib(-1)  
  3. [.] java.lang.NumberFormatException: For input string: "a"  

 

 

 

分享到:
评论

相关推荐

    rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用

    在本主题"rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用"中,我们将深入探讨如何使用RabbitMQ作为消息中间件,结合Spring-AMQP库实现RPC模式。 RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP...

    java队列源码-rabbitmq-repository:RabbitMQ消息队列学习的源码记录

    rabbitmq-java-rpc rabbitmq PRC通信示例 rabbitmq-spring-helloworld spring boot 使用rabbitmq的第一个demo rabbitmq-spring-work-queue spring boot使用rabbitmq的队列示例 rabbitmq-spring-fanout spring boot...

    rabbitMQ开发教程-中文翻译

    在实际应用中,RabbitMQ能够支持多种消息模式和交换类型,包括工作队列(Work queues)、发布/订阅(Publish/Subscribe)、路由(Routing)、Topics类型交换机和远程过程调用(RPC)等,这些模式在RabbitMQ中通过...

    RabbitMQ学习案例Demo

    **RabbitMQ学习案例Demo详解** RabbitMQ是一款开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于解耦应用程序,提高系统的可扩展性和可靠性。在本...

    RabbitMq学习笔记1

    在本篇学习笔记中,我们将首先了解RabbitMQ的安装过程。 1. **Erlang的安装**: Erlang是RabbitMQ的基础,因为RabbitMQ是用Erlang编写的。安装Erlang可以通过以下命令完成: ```bash sudo apt-get install ...

    使用RabbitMQ实现RPC

    作为中间件实现的RPC模式,希望对您的学习有所帮助。RabbitMQ RabbitMQ是基于AMQP协议实现的一个消息队列(MessageQueue),Message Queue是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和...

    rabbitmq.net 各种实例

    RabbitMQ 是一个开源的消息队列系统,基于 AMQP(Advanced Message Queuing Protocol)协议实现,广泛用于...通过实例学习和实践,我们可以灵活地利用RabbitMQ解决各种消息通信问题,提高系统的可靠性和可扩展性。

    RabbitMQ培训PPT

    4. **RPC(远程过程调用)**: 通过RabbitMQ实现客户端和服务器之间的请求-响应模式。 这个PPT培训材料详细介绍了RabbitMQ的各个方面,不仅涵盖了基础概念,还深入到了实践应用和高级特性,对于理解并掌握RabbitMQ...

    dubbo+rabbitmq+springMvc+maven简单demo

    Dubbo是阿里巴巴开源的一个高性能、轻量级的Java RPC框架,它提供了丰富的服务治理功能,如服务注册与发现、调用链跟踪、负载均衡、容错机制等。在本项目中,Dubbo可能被用于实现微服务间的远程调用,使得系统模块...

    rabbitmq和erl集合最新版.zip

    7. **工作模型**:RabbitMQ支持多种工作模型,如发布/订阅、点对点通信、RPC(远程过程调用)等。 接下来,我们要谈谈Erlang语言在RabbitMQ中的作用: 1. **并发性**:Erlang的轻量级进程和强大的并发机制使得...

    rabbitMq入门

    在实际应用中,`RPCServer.java`和`RPCClient.java`的实现会涉及到RabbitMQ的Java客户端库(如`rabbitmq-client`),包括创建连接工厂、创建通道、声明队列、发送和接收消息等操作。 了解了基础概念后,你可以深入...

    rpc-xiuyuan.rar

    深入研究这个示例,我们可以学习到如何设计和实现RPC服务,包括服务的注册与发现、服务调用的流程、错误处理策略以及性能优化等方面的知识。这对于理解分布式系统的工作原理和提升系统开发能力具有重要意义。

    基于RabbitMQ的远程主机管理系统

    基于RabbitMQ rpc实现的主机管理 可以对指定机器异步的执行多个命令 例子: >>:run "df -h" --hosts 192.168.3.55 10.4.3.4 task id: 45334 >>: check_task 45334 >>: 注意,每执行一条命令,即立刻生成一个任务ID,...

    rabbitmq面试题.pdf

    **6. 解释RabbitMQ中的消息确认机制。** - **确认机制:** 消费者接收到消息后会向RabbitMQ发送确认信号,确保消息被正确处理。 - **手动确认:** 消费者显式地告知RabbitMQ已成功处理消息。 - **自动确认:** 消费...

    rpc远程调用实例,包含源代码以及实验报告

    在这个实例中,我们将会探讨RPC的基本原理、实现方式以及如何通过源代码和实验报告来理解和学习RPC。 首先,我们要理解RPC的核心概念。RPC调用的流程通常包括以下步骤:客户端发起请求,服务器接收到请求后执行相应...

    rabbitMQ实战

    ### RabbitMQ实战知识点详解 #### 一、RabbitMQ简介及背景 - **RabbitMQ开发语言**: Erlang,一种面向并发的编程...通过本文的学习,您可以掌握RabbitMQ的基本使用方法和核心概念,为后续深入学习打下坚实的基础。

    rabbitMQ 消息队列 Demo

    在"RabbitMQTest"这个压缩包中,可能包含了上述所有示例的源代码,你可以通过运行这些代码来实际操作并学习RabbitMQ的各种功能。了解并熟练掌握这些示例,将有助于你更好地在实际项目中运用RabbitMQ,实现高效、可靠...

    rabbitmq.net各种实例

    通过学习这些实例,开发者可以深入理解RabbitMQ在.NET环境中的用法,掌握如何在自己的项目中使用RabbitMQ进行高效的消息通信,实现任务队列、解耦系统组件、处理高并发等情况,提升系统的稳定性和可扩展性。...

    rabbitmq-demo.rar

    RabbitMQ原生最详细demo,深度理解RabbitMQ原理,...RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。 但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同

    RabbitMQ Cookbook

    RabbitMQ Cookbook中包含的实践配方超过70个,覆盖了从基础到高级的主题,例如消息的发布和订阅、路由、消息持久化、死信队列、工作队列、RPC通信、发布/订阅模式、消息确认机制、权限控制、集群部署和故障转移等。...

Global site tag (gtag.js) - Google Analytics