`
sillycat
  • 浏览: 2539920 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

RabbitMQ(8)Java Client - Remote Procedure Call(RPC)

 
阅读更多
RabbitMQ(8)Java Client - Remote Procedure Call(RPC)

We need to run a function on a remote computer and wait for the result. This pattern is commonly known as Remote Procedure Call or RPC.

Callback Queue
In order to receive a response we need to send a 'callback' queue address with the request.
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());

Send callback queue via properties.

Correlation Id
It is inefficient to create a callback queue for every RPC request. We will create a single callback queue per client.

CorrelationId, we are going to set it to a unique value for every request. Later, when we receive a message in the callback queue we'll look at this property, and based on that we will be able to match a response with a request.

Summary
Client  ------> rpc_queue --------> Server
         <-----  reply_to <----------- 

Putting it all together

Server Side:
package com.sillycat.easytalker.rabbitmq.rpc;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private final static String SERVER_HOST = "www.neptune.com";

private static int fib(int n) {
if (n == 0){
return 0;
}
if (n == 1){
return 1;
}
return fib(n - 1) + fib(n - 2);
}

public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(props.getCorrelationId()).build();
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
//send back the result
channel.basicPublish("", props.getReplyTo(), replyProps,
response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}

Client Side
package com.sillycat.easytalker.rabbitmq.rpc;

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class RPCClient1 {

private Connection connection;
private Channel channel;

private String requestQueueName = "rpc_queue";

private String replyQueueName;
private QueueingConsumer consumer;

private final static String SERVER_HOST = "www.neptune.com";

public RPCClient1() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
connection = factory.newConnection();
channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message.getBytes());

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
//if the id is the same, we will get the response
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}

public void close() throws Exception {
connection.close();
}

public static void main(String[] argv) {
RPCClient1 fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient1();
System.out.println(" [x] Requesting fib(31)");
response = fibonacciRpc.call("31");
System.out.println(" [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}

references:
http://www.rabbitmq.com/tutorials/tutorial-six-java.html

分享到:
评论

相关推荐

    rabbitmq-java-client-bin-3.3.4.zip

    在"rabbitmq-java-client-bin-3.3.4.zip"这个压缩包中,包含的是RabbitMQ的Java客户端库,这是与RabbitMQ服务器通信的一个关键组件。RabbitMQ提供了多种语言的客户端,Java客户端则是针对Java开发者设计的,使得Java...

    rabbitmq-java-client-bin-3.3.4

    在“rabbitmq-java-client-bin-3.3.4”这个压缩包中,包含了该版本的Java客户端库及相关文档,为Java应用提供了可靠的异步通信支持。 首先,RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议的开源...

    rabbitmq-java (2).zip

    在这个名为"rabbitmq-java (2).zip"的压缩包中,我们可以看到几个关键文件,它们构成了一个使用Java与RabbitMQ交互的项目。 1. `rabbitmq-java.iml`:这是IntelliJ IDEA项目文件,包含了项目的模块设置和依赖关系。...

    rabbitmq-java-client-bin-2.7.0.zip

    在这个"rabbitmq-java-client-bin-2.7.0.zip"压缩包中,我们主要关注的是RabbitMQ的Java客户端库。 RabbitMQ Java客户端是Java开发者与RabbitMQ服务器进行交互的主要工具,允许程序发送和接收消息。2.7.0是这个...

    rabbitmq-java-client-3.4.1.zip

    在这个"rabbitmq-java-client-3.4.1.zip"压缩包中,包含的是RabbitMQ Java客户端库的3.4.1版本,这个版本的客户端可以让你在Java应用中轻松地与RabbitMQ服务器进行交互。 **RabbitMQ核心概念:** 1. **消息**:是...

    rabbitmq-java-client-bin-3.0.4.zip

    在这个场景中,"rabbitmq-java-client-bin-3.0.4.zip"是一个包含RabbitMQ Java客户端库的压缩包,适用于Android应用和后台服务器之间的通信。 首先,我们来了解一下RabbitMQ Java客户端。这个客户端库允许Java...

    RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包

    RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包都在这里面,主要报卡一下软件:socat-1.7.3.2.tar.gz、rabbitmq-server-3.6.12-1.el6.noarch.rpm、rabbitmq-release-signing-key.asc、otp_...

    rabbitmq-java-client-master.zip

    【标题】:“rabbitmq-java-client-master.zip”是一个包含RabbitMQ Java客户端库源代码的压缩文件,用于在Java应用程序中与RabbitMQ消息队列进行交互。 【描述】:“rabbitmq-server-3.7.9.exe”是RabbitMQ服务器...

    rabbitmq-client-1.3.0.jar

    本文将详细探讨RabbitMQ的客户端库——rabbitmq-client-1.3.0.jar,以及它在Java应用程序中的应用。 首先,`rabbitmq-client-1.3.0.jar`是RabbitMQ官方提供的Java客户端库,用于与RabbitMQ服务器进行通信。这个版本...

    rabbitmq-dotnet-client-3.5.0

    《RabbitMQ Dotnet客户端3.5.0详解》 在分布式系统中,消息队列作为重要的组件之一,被广泛用于解耦系统、提高可扩展性和处理异步任务。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定、高效和易用性受到了...

    rabbitMQ实战java版-rabbitMQ-demo.zip

    《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

    RabbitMQ练习(Remote procedure call (RPC)).pcapng

    RabbitMQ练习(Remote procedure call (RPC)).pcapng

    rabbitmq-java-client-javadoc-2.7.0.zip

    《RabbitMQ Java客户端2.7.0版API文档详解》 RabbitMQ Java客户端库是用于Java开发者与RabbitMQ消息代理进行交互的核心工具。RabbitMQ是一种开源的消息代理和队列服务器,广泛应用于分布式系统中的异步处理、任务...

    rabbitmq-dotnet-client-3.6.4-dotnet-4.6.1.rar

    RabbitMQ-dotnet-client-3.6.4-dotnet-4.6.1.rar是一个包含RabbitMQ .NET客户端库的压缩包,适用于.NET Framework 4.6.1环境。这个压缩包提供了一个演示如何在WCF(Windows Communication Foundation)服务中使用...

    rabbitmq-client.jar

    rabbitmq的javaClient库,导入到项目中便可使用

    Java Rabbitmq-client

    Java 客户端库 RabbitMQ 遵循AMQP协议,那是一个开放的,并且通用的消息协议。java Android RabbitMQ可以用来发送和接收消息

    rabbitmq-java-client-2.7.0.zip

    《RabbitMQ Java客户端2.7.0详解》 RabbitMQ Java客户端2.7.0是用于与RabbitMQ消息代理进行交互的Java库,它提供了丰富的API,使得Java开发者能够轻松地在应用程序中集成消息队列功能。RabbitMQ作为一款开源的消息...

    rabbitmq-server-generic-unix-3.5.7.tar.rar下载,rabbitmq安装包

    在您提供的资源中,“rabbitmq-server-generic-unix-3.5.7.tar.rar”是一个针对Linux平台的RabbitMQ服务器的离线安装包。这个版本为3.5.7,您需要在Windows环境下解压后再用于Linux系统。下面将详细介绍RabbitMQ的...

    rabbitmq-server-3.10.5-1.el8.noarch.rpm

    rabbitmq-server-3.10.5-1.el8.noarch.rpm

    RabbitMQ使用jar包-amqp-client-5.2.0.jar

    使用JAVA进行运用了RabbitMQ的程序时所需的源码包,此包导入工程之后便可使用。

Global site tag (gtag.js) - Google Analytics