RabbitMQ默认的consumer为异步监听,RPC应用需要实现consumer的同步,可以使用QueueingConsumer(继承与DefaultConsumer,定义了一个堵塞队列LinkedBlockingQueue)实现同步
实际上就是服务端、客户端各定义一个消息队列,相互发送消息,客户端发送消息后同步等待结果返回
发送消息时指定回复队列、corrId:
//发送消息,指定回复消息所在队列
BasicProperties props=new BasicProperties.Builder().correlationId(corrId).replyTo(queueName).build();
channel.basicPublish("", "rpc_queue", props, SerializationUtils.serialize(mes));
监听队列,等待服务端反馈(堵塞):
//监听队列,接收服务端回复消息
while(true){
QueueingConsumer.Delivery delivery=consumer.nextDelivery();//堵塞
//do something
break;
}
服务端接收消息处理后回写处理结果:
channel.basicPublish("", properties.getReplyTo() ,new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build(),
SerializationUtils.serialize("response:"+mes));
服务端开启手动应答:
//关闭自动应答机制,默认开启;这时候需要手动进行应该
channel.basicConsume("rpc_queue", false, consumer);
channel.basicAck(envelope.getDeliveryTag(), false);
限制服务端最大处理量:
channel.basicQos(1);
package com.demo.mq.rabbitmq.example06;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* RPC服务端
* @author sheungxin
*
*/
public class RPCServer{
/**
* RPC服务端,声明一个服务处理队列,接收消息处理后,把结果回写给客户端
* 需要开启手动应答机制,确保服务执行完成
* @param object 消息主体
* @throws IOException
*/
public static void sendAToB(Serializable object) throws Exception{
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
channel.queueDeclare("rpc_queue", false, false, false, null);
channel.basicQos(1);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
String mes=SerializationUtils.deserialize(body);
System.out.println(envelope.getRoutingKey()+":Received :'"+mes+"' done");
//接收到消息后,向发送方回写消息:指定发送方所在队列(properties.getReplyTo())、correlationId
channel.basicPublish("", properties.getReplyTo() ,
new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build(),
SerializationUtils.serialize("response:"+mes));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//关闭自动应答机制,默认开启;这时候需要手动进行应该
channel.basicConsume("rpc_queue", false, consumer);
System.out.println("*********8");
}
public static void main(String[] args) throws Exception {
sendAToB("Hello World !");
}
}
package com.demo.mq.rabbitmq.example06;
import java.util.UUID;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* RPC客户端
* @author sheungxin
*
*/
@SuppressWarnings("deprecation")
public class RPCClient {
/**
* RPC客户端:向服务端处理队列发送消息,发送时指定correlationId、回复队列名称,同时在回复队列上建立consumer,进行监听接收回复消息
* @param mes
* @throws Exception
*/
public static String call(String mes) throws Exception{
String response=null;
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
//创建一个临时队列,用于接收回复消息
String queueName=channel.queueDeclare().getQueue();
QueueingConsumer consumer=new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
String corrId=UUID.randomUUID().toString();
//发送消息,指定回复消息所在队列
BasicProperties props=new BasicProperties.Builder().correlationId(corrId).replyTo(queueName).build();
channel.basicPublish("", "rpc_queue", props, SerializationUtils.serialize(mes));
//监听队列,接收服务端回复消息
while(true){
QueueingConsumer.Delivery delivery=consumer.nextDelivery();
if(delivery.getProperties().getCorrelationId().equals(corrId)){
response=SerializationUtils.deserialize(delivery.getBody());
break;
}
}
return response;
}
public static void main(String[] args) throws Exception {
System.out.println(call("hello world"));
System.out.println("waiting for rpc is over");
}
}
分享到:
相关推荐
RabbitMQ学习实践二:MQ的安装
RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送...
消息队列:RabbitMQ:RabbitMQ高级特性:Exchange类型.docx
rabbitmq-server-3.9.11.exe
**RabbitMQ 消息中间件在抢购场景中的应用** 在现代互联网应用中,抢购活动已经成为一种常见的促销手段,吸引大量用户在同一时间参与。为了处理这种高并发的业务场景,开发人员需要采取有效的方式来分发和处理大量...
【RabbitMQ集群搭建指南】 在企业IT架构中,消息队列(MQ)扮演着至关重要的角色,它能够实现异步处理、解耦系统组件、提高系统吞吐量。RabbitMQ作为开源MQ的佼佼者,凭借其高效、稳定和易扩展的特性,广泛应用于...
消息队列:RabbitMQ:RabbitMQ高级特性:死信队列技术教程.docx
消息队列:RabbitMQ:RabbitMQ高级特性:Routing键与绑定.docx
然而,有时候我们可能需要利用消息中间件来实现类似的RPC功能,比如使用RabbitMQ。本篇文章将深入探讨如何利用RabbitMQ实现类似Dubbo的RPC调用。 首先,我们要理解RabbitMQ是一个开源的消息队列系统,基于AMQP...
rabbitmq-3.10.6:management
【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...
本文档为 RabbitMq 使用手册,介绍了 RabbitMq 的应用场景和开发指导。RabbitMq 是一个由 Erlang 开发的 AMQP(Advanced Message Queue)流行的开源消息队列系统。RabbitMq 的结构图如下: RabbitMq 几个概念说明:...
RabbitMQ作为一款强大的开源消息队列系统,被广泛用于实现异步处理、解耦系统组件以及RPC(远程过程调用)等场景。本篇文章将深入探讨如何使用Java实现基于RabbitMQ的RPC。 首先,我们要理解RabbitMQ的基本概念。...
rabbitmq-3.7.28-management-alpine 离线镜像安装包
1. 拉取 RabbitMQ 镜像:使用 Docker 的 pull 命令拉取 RabbitMQ 镜像,例如:`docker pull rabbitmq:3.9.8-management`。 2. 启动 RabbitMQ 容器:使用 Docker 的 run 命令启动 RabbitMQ 容器,例如:`docker run -...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,专注于帮助读者理解和掌握如何在实际项目中高效地部署和使用这个强大的消息中间件。RabbitMQ作为开源的消息代理和队列服务器,广泛应用...
基于rabbitmq官方教程go语言rpc篇的升级版改造,加
rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:...
Spring RabbitMQ RPC(远程过程调用)是一种使用RabbitMQ实现客户端与服务器间通信的方式,它允许客户端发送请求到服务器,然后服务器处理请求并返回结果。在这个场景中,RabbitMQ作为一个消息中间件,帮助解耦应用...