`

RabbitMQ学习(六)之远程过程调用(RPC)

 
阅读更多
在一般使用RabbitMQ做RPC很容易。客户端发送一个请求消息然后服务器回复一个响应消息。为了收到一个响应,我们需要发送一个'回调'的请求的队列地址。我们可以使用默认队列(在Java客户端除外)。

AMQP协议给消息定义了14个属性。大部分的属性很少使用,除了下面几个:
  deliveryMode: 将消息标记为持久(值为2)或瞬态(任何其他值)。你可能记得在第二个教程中使用了这个属性。
  contentType:用来设置mime类型。例如经常使用的JSON格式数据,就需要将此属性设置为:application/json。
  replyTo: 通常用来命名一个回调队列.
   correlationId: 用来关联RPC请求的响应.


RPC工作流程:



1)、客户端启动时,创建了一个匿名的回调队列。
2)、在一个RPC请求中,客户端发送一个消息,它有两个属性:1.REPLYTO,用来设置回调队列名;2.correlationId,对于每个请求都被设置成唯一的值。
3)、请求被发送到rpc_queue队列.
4)、RPC工作者(又名:服务器)等待接收该队列的请求。当收到一个请求,它就会处理并把结果发送给客户端,使用的队列是replyTo字段指定的。
5)、客户端等待接收回调队列中的数据。当接到一个消息,它会检查它的correlationId属性。如果它和设置的相匹配,就会把响应返回给应用程序。


1、RPC服务器的RPCServer.Java,接收消息调用rpc并返回结果

package cn.slimsmart.rabbitmq.demo.rpc;  

import java.security.MessageDigest;  
  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.AMQP.BasicProperties;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
//RPC调用服务端  
public class RPCServer {  
    private static final String RPC_QUEUE_NAME = "rpc_queue";  
    public static void main(String[] args) throws Exception {  
        //• 先建立连接、通道,并声明队列  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.36.217");  
        factory.setUsername("admin");  
        factory.setPassword("admin");  
        factory.setPort(AMQP.PROTOCOL.PORT);  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  
        //•可以运行多个服务器进程。通过channel.basicQos设置prefetchCount属性可将负载平均分配到多台服务器上。  
        channel.basicQos(1);  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        //打开应答机制autoAck=false  
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  
        System.out.println(" [x] Awaiting RPC requests");  
        while (true) {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            BasicProperties props = delivery.getProperties();  
            BasicProperties replyProps = new BasicProperties.Builder()  
                    .correlationId(props.getCorrelationId()).build();  
            String message = new String(delivery.getBody());  
            System.out.println(" [.] getMd5String(" + message + ")");  
            String response = getMd5String(message);  
            //返回处理结果队列  
            channel.basicPublish("", props.getReplyTo(), replyProps,  
                    response.getBytes());  
            //发送应答   
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
        }  
    }  
    // 模拟RPC方法 获取MD5字符串  
    public static String getMd5String(String str) {  
        MessageDigest md5 = null;  
        try {  
            md5 = MessageDigest.getInstance("MD5");  
        } catch (Exception e) {  
            System.out.println(e.toString());  
            e.printStackTrace();  
            return "";  
        }  
        char[] charArray = str.toCharArray();  
        byte[] byteArray = new byte[charArray.length];  
  
        for (int i = 0; i < charArray.length; i++)  
            byteArray[i] = (byte) charArray[i];  
        byte[] md5Bytes = md5.digest(byteArray);  
        StringBuffer hexValue = new StringBuffer();  
        for (int i = 0; i < md5Bytes.length; i++) {  
            int val = ((int) md5Bytes[i]) & 0xff;  
            if (val < 16)  
                hexValue.append("0");  
            hexValue.append(Integer.toHexString(val));  
        }  
        return hexValue.toString();  
    }  
}  


2.客户端RPCClient.java,发送rpc调用消息,接收结果
package cn.slimsmart.rabbitmq.demo.rpc;  
  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
import com.rabbitmq.client.AMQP.BasicProperties;  
  
//RPC调用客户端  
public class RPCClient {  
    private Connection connection;  
    private Channel channel;  
    private String requestQueueName = "rpc_queue";  
    private String replyQueueName;  
    private QueueingConsumer consumer;  
  
    public RPCClient() throws Exception {  
        //• 先建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.36.217");  
        factory.setUsername("admin");  
        factory.setPassword("admin");  
        factory.setPort(AMQP.PROTOCOL.PORT);  
        connection = factory.newConnection();  
        channel = connection.createChannel();  
        //• 注册'回调'队列,这样就可以收到RPC响应  
        replyQueueName = channel.queueDeclare().getQueue();  
        consumer = new QueueingConsumer(channel);  
        channel.basicConsume(replyQueueName, true, consumer);  
    }  
  
    //发送RPC请求  
    public String call(String message) throws Exception {  
        String response = null;  
        String corrId = java.util.UUID.randomUUID().toString();  
        //发送请求消息,消息使用了两个属性:replyto和correlationId  
        BasicProperties props = new BasicProperties.Builder()  
                .correlationId(corrId).replyTo(replyQueueName).build();  
        channel.basicPublish("", requestQueueName, props, message.getBytes());  
        //等待接收结果  
        while (true) {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            //检查它的correlationId是否是我们所要找的那个  
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
                response = new String(delivery.getBody());  
                break;  
            }  
        }  
        return response;  
    }  
    public void close() throws Exception {  
        connection.close();  
    }  
}  


3、运行client主函数RPCMain.java
package cn.slimsmart.rabbitmq.demo.rpc;  
 
public class RPCMain {  
  
    public static void main(String[] args) throws Exception {  
        RPCClient rpcClient = new RPCClient();  
        System.out.println(" [x] Requesting getMd5String(abc)");     
       String response = rpcClient.call("abc");  
        System.out.println(" [.] Got '" + response + "'");  
        rpcClient.close();  
    }  
}  


先运行服务端,再运行RPCMain,发送消息调用RPC。
这里介绍的是该设计不是实现RPC服务的唯一可能,但它有一些重要的优点:
1)如果RPC服务器速度太慢,你可以通过运行多个RPC服务器。尝试在一个新的控制台上运行第二RPCServer。
2)RPC客户端只发送和接收一个消息。不需要queueDeclare那样要求同步调用。因此,RPC客户端只需要在一个网络上发送和接收为一个单一的RPC请求。

转自:http://blog.csdn.net/qq397709884/article/details/51918314

http://blog.csdn.net/xiaoxian8023/article/details/48826857
分享到:
评论

相关推荐

    c# 实现远程调用(rpc) remoting

    "c# 实现远程调用(rpc) remoting"是C#的一个关键特性,它允许对象在不同的进程甚至不同的计算机之间进行通信,仿佛它们是在同一个内存空间内操作。这个主题涉及到分布式系统开发,对于理解跨进程通信和提升系统的可...

    RabbitMQ 实现类似Dubbo的RPC调用.rar

    在Java领域,Dubbo是一款广泛使用的RPC框架,它提供了高性能、透明化的远程服务调用能力。然而,有时候我们可能需要利用消息中间件来实现类似的RPC功能,比如使用RabbitMQ。本篇文章将深入探讨如何利用RabbitMQ实现...

    spring rabbitmq rpc 测试代码

    总结一下,Spring RabbitMQ RPC的核心在于利用RabbitMQ作为中间人,通过定义交换机、队列和绑定,实现在客户端和服务端之间进行异步的远程调用。这种方式可以很好地扩展系统,同时保持组件间的解耦。通过配置和编程...

    rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用

    在本主题"rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用"中,我们将深入探讨如何使用RabbitMQ作为消息中间件,结合Spring-AMQP库实现RPC模式。 RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP...

    RabbitMQ (四)实现类似Dubbo的RPC调用

    在分布式系统中,远程过程调用(RPC)是一种常见的通信方式,它允许一个服务直接调用另一个服务的方法,仿佛两个服务是在同一个进程中运行一样。在微服务架构中,Dubbo 是一个广泛使用的 RPC 框架。本篇我们将探讨...

    rabbitmq RPC java 实现

    RabbitMQ作为一款强大的开源消息队列系统,被广泛用于实现异步处理、解耦系统组件以及RPC(远程过程调用)等场景。本篇文章将深入探讨如何使用Java实现基于RabbitMQ的RPC。 首先,我们要理解RabbitMQ的基本概念。...

    基于rabbitmq的rpc调用的3中方式实战-rabbitmq-rpc.zip

    在本实战教程中,我们将探讨基于 RabbitMQ 的远程过程调用(RPC,Remote Procedure Call)的三种实现方式。 1. **基于工作者模式的RPC** 在这种模式下,客户端发送请求到RabbitMQ的特定交换器,然后这个请求被路由...

    spring集成rabbitmq实现rpc

    本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,我们需要在项目中引入Spring AMQP库,这是Spring对RabbitMQ的官方支持。可以通过在pom.xml文件中添加以下Maven依赖...

    rpc远程调用实例,包含源代码以及实验报告

    通过深入学习这个RPC远程调用实例,我们可以掌握RPC的基本原理和实际应用,这对于理解分布式系统的工作方式、提升软件开发技能具有重要的意义。同时,通过源代码的阅读和实验报告的学习,我们可以更直观地了解RPC...

    基于springboot的两个项目之间的远程调用

    - **RabbitMQ**:Spring Boot可以通过集成RabbitMQ实现消息队列的远程调用,适用于异步处理和解耦。 - **RESTful API**:基于HTTP协议,Spring Boot提供Spring MVC和WebFlux来创建RESTful服务,简单易用且跨平台。...

    RabbitMQ RPC

    RabbitMQ RPC(远程过程调用)是一种利用消息队列系统实现客户端与服务端之间异步通信的技术。RabbitMQ,作为一款开源的消息代理和队列服务器,广泛应用于分布式系统中,用于解耦组件间的直接依赖,提高系统的可靠性...

    RabbitmqDemo:RabbitMQ相关示例

    RabbitMQ示例6:远程过程调用RPC Pom.xml &lt;? xml version = " 1.0 " encoding = " UTF-8 " ?&gt; &lt; project xmlns = " http://maven.apache.org/POM/4.0.0 " xmlns : xsi = " ...

    RabbitMq学习笔记1

    5. **RPC(远程过程调用)**: - 通过消息传递实现客户端和服务器之间的请求-响应交互。 理解这些工作模式和交换器类型是有效利用RabbitMQ的关键,可以根据实际需求选择合适的模式来设计消息传递流程。 **总结** ...

    Python RabbitMQ消息队列实现rpc

    在本文中,我们将深入探讨如何使用Python和RabbitMQ实现远程过程调用(RPC)。RabbitMQ是一种流行的消息中间件,它允许分布式系统中的组件通过消息传递进行通信。RPC模式在Python中通过RabbitMQ实现,可以使得客户端...

    rabbitmq-rpc-consumer-go:用于RabbitMQ RPC的Go模块(消费者)

    在分布式系统中,RabbitMQ 被广泛用来处理异步任务、解耦组件以及实现远程过程调用(RPC)。本文将深入探讨 `rabbitmq-rpc-consumer-go`,这是一个专门用于RabbitMQ RPC的Go语言模块,主要关注其作为消费者的实现。 ...

    rabbitMq入门

    这种模式类似于远程过程调用(RPC)。 - `RPCClient.java`会创建连接,发送请求消息到服务端指定的队列,并设置一个回调函数来接收服务端的响应。一旦接收到响应,客户端可以继续执行后续操作。 在实际应用中,`...

    rabbitmq代码.zip

    - **RPC(远程过程调用)**:使用RabbitMQ实现简单的RPC模式,客户端发送请求消息,服务端接收到后处理并返回响应。 - **Topology设计**:合理的交换机、队列和绑定设计对于系统性能和可维护性至关重要。 压缩包中...

    Laravel开发-rabbitmq-client

    在本文中,我们将深入探讨如何在 Laravel 开发环境中使用 `rabbitmq-client`,这是一个基于 AMQP 协议的 RabbitMQ 客户端,为 Laravel 和 Lumen 提供了异步处理和 Direct RPC(远程过程调用)接口。RabbitMQ 是一个...

    rabbitmq.net 各种实例

    八、RPC(远程过程调用) RabbitMQ还可以实现RPC模式,通过发送请求消息,等待响应消息。这需要创建一个专门的应答队列,并使用`BasicGet`或`BasicConsume`来获取响应。 九、事务与确认模式 为了保证消息的可靠性...

    rabbitMQ 消息队列 Demo

    6. **06RPC(远程过程调用)** RPC示例展示了如何使用RabbitMQ实现异步RPC。生产者发送一个请求到队列,然后等待响应。RabbitMQ中的消费者接收到请求后处理,并将结果发送回一个特定的响应队列,生产者可以从这个...

Global site tag (gtag.js) - Google Analytics