RabbitMQ(8)Java Client - Remote Procedure Call(RPC)
We need to run a function on a remote computer and wait for the result. This pattern is commonly known as Remote Procedure Call or RPC.
Callback Queue
In order to receive a response we need to send a 'callback' queue address with the request.
String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
        .Builder()
        .replyTo(callbackQueueName)
        .build();
channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));
The callback queue name is sent to the server in the replyTo message property.
Correlation Id
Creating a callback queue for every RPC request is inefficient, so we create a single callback queue per client.
That raises a new problem: when a response arrives in the shared callback queue, it is not clear which request it belongs to. To solve this, we set the correlationId property to a unique value for every request. Later, when a message arrives in the callback queue, we look at this property and use it to match the response with its request.
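The matching step can be sketched without any broker at all. The class below is a hypothetical helper (PendingReplies is not part of the RabbitMQ client or of the original post) that keeps in-flight requests in a map keyed by correlation id and completes the right one when a reply shows up. It assumes replies with an unknown or missing id should simply be dropped.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: tracks in-flight requests by correlation id. */
final class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns the unique correlation id to send with it. */
    String register(CompletableFuture<String> future) {
        String corrId = UUID.randomUUID().toString();
        pending.put(corrId, future);
        return corrId;
    }

    /** Completes the matching request; returns false for unknown or missing ids. */
    boolean onReply(String corrId, String body) {
        if (corrId == null) {
            return false; // ConcurrentHashMap does not accept null keys
        }
        CompletableFuture<String> future = pending.remove(corrId);
        if (future == null) {
            return false; // stale or duplicate reply: drop it
        }
        future.complete(body);
        return true;
    }

    /** Number of requests still waiting for a reply. */
    int inFlight() {
        return pending.size();
    }
}
```

With a structure like this, one callback queue can serve many outstanding requests, because each reply carries enough information to find its caller.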
Summary
Client ------> rpc_queue ------> Server
Client <------ reply_to  <------ Server
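To make the round trip concrete, here is a minimal in-memory sketch that uses plain java.util.concurrent queues in place of RabbitMQ. Everything in it (InMemoryRpcDemo, Message, serveOne, roundTrip) is made up for illustration; it only mirrors the shape of the flow: the request carries a reply-to queue and a correlation id, and the server publishes its answer to that queue with the same id.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the rpc_queue / reply_to flow; no broker involved. */
final class InMemoryRpcDemo {

    /** A message body plus the two properties the pattern relies on. */
    static final class Message {
        final String body;
        final String correlationId;
        final BlockingQueue<Message> replyTo; // null on replies

        Message(String body, String correlationId, BlockingQueue<Message> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    /** Iterative Fibonacci, used here as the remote function. */
    static int fib(int n) {
        int a = 0;
        int b = 1;
        for (int i = 0; i < n; i++) {
            int next = a + b;
            a = b;
            b = next;
        }
        return a;
    }

    /** Server side: take one request and publish the result to its replyTo queue. */
    static void serveOne(BlockingQueue<Message> rpcQueue) throws InterruptedException {
        Message request = rpcQueue.take();
        String result = String.valueOf(fib(Integer.parseInt(request.body)));
        request.replyTo.put(new Message(result, request.correlationId, null));
    }

    /** Client side: send a request, then wait for the reply with the same id. */
    static String roundTrip(String body) {
        BlockingQueue<Message> rpcQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Message> callbackQueue = new LinkedBlockingQueue<>();
        Thread server = new Thread(() -> {
            try {
                serveOne(rpcQueue);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
        try {
            String corrId = UUID.randomUUID().toString();
            rpcQueue.put(new Message(body, corrId, callbackQueue));
            Message reply = callbackQueue.poll(5, TimeUnit.SECONDS);
            if (reply == null || !corrId.equals(reply.correlationId)) {
                throw new IllegalStateException("no matching reply");
            }
            return reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("10")); // prints 55
    }
}
```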
Putting it all together
Server Side:
package com.sillycat.easytalker.rabbitmq.rpc;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// Note: QueueingConsumer belongs to the older Java client API (it was removed in amqp-client 5.0).
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";
    private static final String SERVER_HOST = "www.neptune.com";

    // Naive recursive Fibonacci; negative input is rejected instead of recursing without end.
    private static int fib(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        if (n == 0) {
            return 0;
        }
        if (n == 1) {
            return 1;
        }
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(SERVER_HOST);
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            // Hand out at most one unacknowledged request to this server at a time
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // autoAck = false: each request is acknowledged manually after the reply is sent
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");
            while (true) {
                // An empty reply signals that the request could not be processed
                String response = "";
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                // Echo the correlation id so the client can match the reply to its request
                BasicProperties replyProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId()).build();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    // send back the result to the queue named in replyTo
                    channel.basicPublish("", props.getReplyTo(), replyProps,
                            response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                            false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}
Client Side:
package com.sillycat.easytalker.rabbitmq.rpc;

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// Note: QueueingConsumer belongs to the older Java client API (it was removed in amqp-client 5.0).
public class RPCClient1 {

    private static final String SERVER_HOST = "www.neptune.com";

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient1() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(SERVER_HOST);
        connection = factory.newConnection();
        channel = connection.createChannel();
        // One server-named callback queue per client, reused for every call
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        // autoAck = true: replies do not need a manual acknowledgement
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        // Unique id for this request, echoed back by the server
        String corrId = UUID.randomUUID().toString();
        BasicProperties props = new BasicProperties.Builder()
                .correlationId(corrId).replyTo(replyQueueName).build();
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // corrId goes first so a reply without a correlation id is skipped instead of causing an NPE
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                // the ids match, so this is the response to our request
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return response;
    }

    public void close() throws Exception {
        connection.close();
    }

    public static void main(String[] argv) {
        RPCClient1 fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient1();
            System.out.println(" [x] Requesting fib(31)");
            response = fibonacciRpc.call("31");
            System.out.println(" [.] Got '" + response + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}
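One thing to keep in mind: the wait loop in call() blocks until a matching reply arrives, so a client would hang if the server never answers. A possible way to bound the wait is sketched below with a plain BlockingQueue instead of the RabbitMQ consumer. ReplyWaiter, Reply and awaitReply are hypothetical names for illustration only, and returning null on timeout is just one possible convention.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: wait for a matching reply, but only up to a deadline. */
final class ReplyWaiter {

    /** Minimal stand-in for a delivery: correlation id plus body. */
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    /**
     * Returns the body of the first reply whose correlation id matches corrId,
     * or null if none arrives within timeoutMillis. Non-matching replies are dropped.
     */
    static String awaitReply(BlockingQueue<Reply> queue, String corrId, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null; // deadline passed
                }
                Reply reply = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (reply == null) {
                    return null; // nothing arrived in time
                }
                if (corrId.equals(reply.correlationId)) {
                    return reply.body;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Reply> queue = new LinkedBlockingQueue<>();
        queue.add(new Reply("stale-id", "0"));
        queue.add(new Reply("abc", "1346269"));
        System.out.println(awaitReply(queue, "abc", 1000)); // prints 1346269
    }
}
```

The deadline is computed once, so skipping stale replies does not extend the total waiting time.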
references:
http://www.rabbitmq.com/tutorials/tutorial-six-java.html