`

Java学习——rabbitmq(rpc)

 
阅读更多

 

RPC

 

/* Routing Model
         /->(Request repQueue=123 cid=abc) [...](request Queue)   \
RPCClient                                                            RPCServer
         <-\(Reply correlation_id=abc)     [...](reply Queue:123) /
*/

远程过程调用的应用场景也很广,我们来看看如何用rabbitmq实现rpc编程

 

RemoteService rs = new RemoteService();
String result = rs.speak("2012"); //block until the answer is received
System.out.println("Goodbye " + result);

关于RPC的一点看法,尽管它是一种常用的编程模型,却饱受批评,开发人员没法知道自己调用的程序是本地程序还是远程程序,可能会造成很多困惑,增加调试的复杂程度。(可以考虑使用异步流水线替代阻塞调用)

callback queue

客户端将消息和响应队列信息一并发给服务器端

 

RPCClient.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class RPCClient {

	private Connection connection;
	private Channel channel;
	
	public RPCClient() throws IOException{
		// 创建一个连接连接服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		connection = factory.newConnection();
		channel = connection.createChannel();
	}
	
	public void close() throws IOException{
		channel.close();
		connection.close();	
	}
	
	public String call(String message) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		
		String response;
		String replyQueueName = channel.queueDeclare().getQueue();
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
		
		String corrId = java.util.UUID.randomUUID().toString();
		
		BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
		
		channel.basicPublish("", "request_queue", props, message.getBytes());
		
		while(true){
			QueueingConsumer.Delivery  delivery = consumer.nextDelivery();
			if(delivery.getProperties().getCorrelationId().equals(corrId)){
				response = new String(delivery.getBody());
				break;
			}
		}
		return response;
	}
	
	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
			RPCClient client = new RPCClient();
			String res = new String(client.call("test"));
			System.out.println(res);
			client.close();
	}
}

RPCServer.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {

	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 创建一个连接接收数据
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.queueDeclare("request_queue", false, false, false, null);
		channel.basicQos(1);
		
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume("request_queue", false, consumer);
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			
			BasicProperties props = delivery.getProperties();
			BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).replyTo(props.getCorrelationId()).build();
			String response = new String("Hello" + new String(delivery.getBody()));
			channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes());
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}
}

分别运行RPCClient和RPCServer,可以得到结果hellotest

分享到:
评论

相关推荐

    rabbitmq RPC java 实现

    本篇文章将深入探讨如何使用Java实现基于RabbitMQ的RPC。 首先,我们要理解RabbitMQ的基本概念。RabbitMQ是一个开源的消息代理,它遵循AMQP(Advanced Message Queuing Protocol)协议,负责接收和转发消息。在RPC...

    spring rabbitmq rpc 测试代码

    Spring RabbitMQ RPC(远程过程调用)是一种使用RabbitMQ实现客户端与服务器间通信的方式,它允许客户端发送请求到服务器,然后服务器处理请求并返回结果。在这个场景中,RabbitMQ作为一个消息中间件,帮助解耦应用...

    MQ消息队列之——RabbitMQ

    学习RabbitMQ的学习笔记

    rabbitMQ实战java版-rabbitMQ-demo.zip

    《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

    使用Java编写的RabbitMQ连接池方法

    RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...

    RabbitMQ RPC

    RabbitMQ RPC(远程过程调用)是一种利用消息队列系统实现客户端与服务端之间异步通信的技术。RabbitMQ,作为一款开源的消息代理和队列服务器,广泛应用于分布式系统中,用于解耦组件间的直接依赖,提高系统的可靠性...

    spring集成rabbitmq实现rpc

    本文将详细讲解如何在Spring项目中集成RabbitMQ,实现基于RPC(远程过程调用)的通信模式。 首先,我们需要在项目中引入Spring AMQP库,这是Spring对RabbitMQ的官方支持。可以通过在pom.xml文件中添加以下Maven依赖...

    第11周-第10章节-Python3.5-RabbitMQ rpc实现.mp4

    第11周-第10章节-Python3.5-RabbitMQ rpc实现.mp4

    rabbitmq-rpc-consumer-go:用于RabbitMQ RPC的Go模块(消费者)

    本文将深入探讨 `rabbitmq-rpc-consumer-go`,这是一个专门用于RabbitMQ RPC的Go语言模块,主要关注其作为消费者的实现。 在Go语言中,`rabbitmq-rpc-consumer-go` 模块为开发者提供了方便的方式来创建RabbitMQ的...

    基于Java语言的RabbitMQ学习与实践设计源码

    本项目是一款基于Java语言的RabbitMQ学习与实践设计源码,共计78个文件,涵盖29个Java源文件、14个Markdown文档、8个XML配置文件、5个属性文件、4个Git忽略规则文件、4个命令行脚本文件、3个JAR包文件、2个PNG图片...

    java rabbitmq动态注册,监听实现

    在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,提供了高效、可靠的异步通信能力。在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要...

    Java使用RabbitMq的一个简单demo

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许...在这个简单的demo中,我们学习了如何创建连接、声明交换机和队列、发送和接收消息,这些都是RabbitMQ的基础操作,为更高级的用法打下了基础。

    RabbitMQ工具类封装实现

    首先,`MQSubscribeService.java`代表的是订阅者服务,它是接收和处理来自RabbitMQ的消息的组件。在封装订阅者线程时,通常会包括以下关键知识点: 1. **创建连接和通道**:使用`ConnectionFactory`创建与RabbitMQ...

    RabbitMQ Java测试客户端

    它涵盖了连接管理、消息发送与接收的基本流程,对于学习和理解RabbitMQ在Java环境下的应用非常有帮助。在实际开发中,你可以根据项目需求调整这些示例,例如增加错误处理、消息确认机制、使用工作队列模型等,以实现...

    rabbitMQ学习笔记

    rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...

    java使用rabbitMq服务

    【Java使用RabbitMQ服务】 RabbitMQ是一款开源的消息队列系统,广泛应用于分布式系统中的消息传递。本文将简要介绍如何在Java环境中使用RabbitMQ,包括安装、基本结构、消息发送模式以及高级特性。 ### 1. 安装 在...

    基于rabbitmq的rpc调用的3中方式实战-rabbitmq-rpc.zip

    在代码实现上,通常会使用RabbitMQ的客户端库,如Java的`rabbitmq-client`,Python的`pika`等,它们提供了方便的API来创建连接、通道、声明交换器和队列,以及发送和接收消息。 理解并掌握这些RPC实现方式有助于...

    java版本RabbitMQ实例.rar.rar

    Java版本的RabbitMQ实例是...这个Java版本的RabbitMQ实例是学习如何在Java项目中集成消息队列的宝贵资源。通过它,开发者可以掌握RabbitMQ的核心概念,以及如何在实际场景中利用这些概念构建健壮的、可扩展的应用程序。

    Rabbitmq工具类,java工具类RabbitmqUtil

    在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于高效地处理异步任务和解耦系统组件。`RabbitmqUtil` 是一个专门为Java开发者设计的工具类,...

Global site tag (gtag.js) - Google Analytics