`
yugouai
  • 浏览: 499260 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

rabbitmq入门-RPC

 
阅读更多

在项目中引入RabbitMQ通常会考虑它会带来的好处:解耦应用程序,实现不同编程语言之间的互通,解除对特定通信协议的依赖,解除应用程序在时序上执行的依赖(异步).落实到代码层面就是两种常用应用模式:"发后即忘"(fire-and-forget)和RPC.

 

fire-and-forget

RabbitMQ解决的是应用程序之间互联(connect)和规模(scale)的问题,消息发送和接收是隔离,发送方不知道消息最终由谁接收,接收方也不必关心消息是谁步发出的;发送和接收是隔离的,消息本质上就是异步的.这种隔离也就解耦了应用程序之间的依赖.RabbitMQ的角色就是应用程序中间的路由器.对于消息的发布方来讲这是一种"发后即忘"(fire-and_forget)的发布方式.

 

 fire-and-forget模式发送消息,消息的发送方和接收方彼此隔离

 

RPC

     RPC需要双向通信,或者说RPC Server需要明确知道要把消息发送给谁.我们可以在payload的数据部分附加 "发给谁" 这种EndPoint信息. RabbitMQ提供的解决方案:在每一个AMQP的消息头上有一个reply_to字段.这样消息的producer就可以指定Queue name,RPC Server接受到消息检查reply_to字段,创建一个消息包含Response并把queue name作为routing key,订阅了这个队列的Client就拿到了消息.

    这里有两件事情要保证:

    1.要为队列创建随机Name

    2.即使Name随机还是有可能冲突,还需要保证消息通信的独占性,看看RabbitMQ是怎么满足这两点的:

(1)如果创建的队列不指定queue name,rabbitMQ就会创建一个随机的Name

(2)独占只需要exclusive参数即可

    总而言之,需要做的就是Client创建一个temporary,exclusive,anonymou的queue,并把queue name设置在RPC消息的reply_to字段即可.注意这里RPC Server已经知道要投递到哪个Queue,所以不需要指Exchange



 RabbitMq RPC与传统RPC区别

  传统的RPC调用Client和Server紧密依赖,客户端连接上服务器,发送一个请求然后阻塞等待服务器响应.这样的做的特点是客户端和服务器端是知道对方的.如果RPC Server崩溃掉,客户端需要重连,如果Server彻底崩掉就要重新找一个提供同样服务的Server,然后客户端重连过去.

   用RabbitMQ来实现RPC,依然保持Client Server信息隐藏的特点,Client依赖的不是特定的Server而是特定的消息,在有多个等效Server的情况下,一个Server的状态是否正常不会影响到客户端的状态.

  总结一下,使用RabbitMQ是先RPC,客观上还实现了下面的效果:

  1. 容错 一个Server崩溃不影响 Client
  2. 解耦了对特定通信协议和接口的依赖,统一走AMQP消息.
  3. 在多个RPC Server之间的负载均衡由RabbitMQ完成

生产者代码

public class RPCClient {
    
  private Connection connection;
  private Channel channel;
  private String requestQueueName = "rpc_queue";
  private String replyQueueName;
  private QueueingConsumer consumer;
    
  public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
  }
  
  public String call(String message) throws Exception {     
    String response = null;
    String corrId = UUID.randomUUID().toString();
    
    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();
    
    channel.basicPublish("", requestQueueName, props, message.getBytes());
    
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      if (delivery.getProperties().getCorrelationId().equals(corrId)) {
        response = new String(delivery.getBody(),"UTF-8");
        break;
      }
    }

    return response; 
  }
    
  public void close() throws Exception {
    connection.close();
  }
  
  public static void main(String[] argv) {
    RPCClient fibonacciRpc = null;
    String response = null;
    try {
      fibonacciRpc = new RPCClient();
  
      System.out.println(" [x] Requesting fib(30)");   
      response = fibonacciRpc.call("30");
      System.out.println(" [.] Got '" + response + "'");
    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (fibonacciRpc!= null) {
        try {
          fibonacciRpc.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
}

 

消费者代码

public class RPCServer {
  
  private static final String RPC_QUEUE_NAME = "rpc_queue";
  
  private static int fib(int n) {
    if (n ==0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
  }
    
  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
  
      connection = factory.newConnection();
      channel = connection.createChannel();
      
      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
  
      channel.basicQos(1);
  
      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
  
      System.out.println(" [x] Awaiting RPC requests");
  
      while (true) {
        String response = null;
        
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        
        BasicProperties props = delivery.getProperties();
        BasicProperties replyProps = new BasicProperties
                                         .Builder()
                                         .correlationId(props.getCorrelationId())
                                         .build();
        
        try {
          String message = new String(delivery.getBody(),"UTF-8");
          int n = Integer.parseInt(message);
  
          System.out.println(" [.] fib(" + message + ")");
          response = "" + fib(n);
        }
        catch (Exception e){
          System.out.println(" [.] " + e.toString());
          response = "";
        }
        finally {  
          channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
  
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
      }
    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }                            
  }
}

 

  • 大小: 14.4 KB
分享到:
评论

相关推荐

    rabbitMq入门

    【标题】:“rabbitMQ入门” 在IT行业中,消息队列是一种常见的中间件技术,用于解耦应用程序组件,提高系统的可扩展性和可靠性。RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中。本篇文章将带...

    RabbitMQ入门操作手册.pdf

    【RabbitMQ入门操作手册】提供了全面的RabbitMQ学习指南,从基础概念到实际操作,帮助初学者快速掌握这个强大的消息队列系统。RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)的开源消息代理,其核心...

    RabbitMQ中文文档-.rar

    最后,"远程过程调用"(RPC)章节讲述的是如何使用RabbitMQ实现基于消息的RPC。在这种模式下,客户端发送请求消息,然后等待服务器返回响应。RabbitMQ提供了一种可靠的机制来确保请求和响应的对应关系,这对于分布式...

    poc-rabbitmq:POC RabbitMQ

    使用安装依赖项: composer install消费消息通过消费者下载消息: php app/console rabbitmq:consumer bar启动 RPC 服务器要使用 RPC,您需要启动服务器: php app/console rabbitmq:rpc-server bar运行和测试要运行...

    rabbitMQ 消息队列 Demo

    这是RabbitMQ入门的经典示例,它展示了最基础的消息发布与消费过程。生产者发送一个简单的"Hello, World!"消息到RabbitMQ服务器,然后消费者从队列中取出并打印这个消息。这个例子帮助我们理解RabbitMQ的基本工作...

    RabbitMQ 讲义.pdf

    对于初学者来说,首先需要了解什么是消息队列,然后是安装和配置RabbitMQ,接着能够编写入门级别的程序,最后能够熟练掌握RabbitMQ的5种模式特征,并能够将SpringBoot与RabbitMQ进行整合。 消息队列的实现方式主要...

    dubbo+rabbitmq+springMvc+maven简单demo

    Dubbo是阿里巴巴开源的一个高性能、轻量级的Java RPC框架,它提供了丰富的服务治理功能,如服务注册与发现、调用链跟踪、负载均衡、容错机制等。在本项目中,Dubbo可能被用于实现微服务间的远程调用,使得系统模块...

    rabbit入门

    【RabbitMQ基础入门】 RabbitMQ是一款广泛应用的消息中间件,基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中实现可靠的消息传递。它允许应用程序之间解耦,提高系统的灵活性和可扩展性。...

    Java架构技术揭秘:Redis+Nginx+Dubbo+面试题+视频.docx

    - **适用场景**:适用于大型分布式系统的RPC服务治理。 #### 2. Dubbo 面试常见问题 - 包括但不限于服务注册与发现机制、服务路由策略、负载均衡算法等内容。 ### 六、Zookeeper 技术要点 #### 1. Zookeeper ...

    node-powershell-interop:演示 nodeJS 和 Powershell 之间的互操作性

    一旦安装了两个项目,首先按照尼克的指示启动rabbitmq-proto。 您需要在 config 目录中创建一个 default.js 配置文件(出于安全原因,这是 gitignore 的),其中包含以下行并填充了值: exports.rpc_user = ''; /...

    GO语言进阶.docx

    3-6 Docker入门基础文档.mp4 3-6 本章小结.mp4 3-7 Ubuntu中通过Docker安装配置MySQL主从节点.mp4 第04章 “云存储”系统之基于用户系统实现的资源隔离及鉴权 4-1 帐号系统介绍与用户表设计.mp4 4-2 编码实战:...

    watermill:在Go中轻松构建事件驱动的应用程序

    它旨在用于构建事件驱动的应用程序,启用事件源,消息之上的RPC,sagas以及您想到的其他任何东西。 您可以使用传统的pub / sub实现,例如Kafka或RabbitMQ,也可以使用HTTP或MySQL binlog(如果适合您的用例)。目标...

    micro-v3.0.2-darwin-amd64.tar.gz

    README.md文件是项目的介绍文档,通常包含项目概述、安装指南、使用示例等,是了解项目快速入门的关键。最后,“micro”文件可能是可执行文件,用于在命令行中运行和管理go-micro的服务。 go-micro框架的核心特性...

    go-plugins:转到微型插件。 移至go-microplugins

    NATS,NSQ,RabbitMQ,Kafka 客户RPC客户端; gRPC,HTTP 编解码器消息编码; 水星BSON 微微型工具包插件登记处服务发现; Etcd,八卦,NATS 选择器负载均衡; 标签,缓存,静态服务器RPC服务器; gRPC,HTTP 运输...

    世上最全的java面试复习知识汇总,从基础到高阶,从八股文到实践,从入门到入土

    Netty 与 RPC 网络 日志 Zookeeper Kafka RabbitMQ 数据库 一致性算法 JAVA 算法 Spark 集合 多线程并发 设计模式 负载均衡 数据结构 加密算法 分布式缓存 机器学习 云计算 JVM Hbase MongoDB Cassandra ...

    python入门到高级全栈工程师培训 第3期 附课件代码

    python入门到高级全栈工程师培训视频学习资料;本资料仅用于学习,请查看后24小时之内删除。 【课程内容】 第1章 01 计算机发展史 02 计算机系统 03 小结 04 数据的概念 05 进制转换 06 原码补码反码 07 物理层和...

    dubbo多种小案例

    【描述】中的内容表明,这个压缩包提供了丰富的实例代码,覆盖了多个功能模块,包括基础的"Hello World"入门项目,文件上传与下载功能,MongoDB数据库的集成,邮件服务,RabbitMQ消息中间件的使用,以及Shiro安全...

    从0到1实战微服务架构.pdf

    在微服务的自动发现与负载均衡方面,文档讲述了Kubernetes的快速入门和在微服务中的应用,强调了服务发现机制对于服务之间高效交互的重要性。通过服务发现,微服务可以在运行时动态地定位其他服务,而负载均衡则确保...

Global site tag (gtag.js) - Google Analytics