本身使用RpcClient发送消息与同步接收消息的代码是很简单的,如下:
RpcClient client = new RpcClient(channel, exchange, routingKey);
String msg = "hello world!";
byte[] result = client.primitiveCall(msg.getBytes());
这里的primitiveCall调用后,当前线程会进行同步等待,等待消息接收端给自己的回复消息
一个完整的发送消息与接收回复消息的图例:
整个流程详解:
- l 创建RpcClient实例
RpcClient client = new RpcClient(channel, exchange, routingKey);
创建RpcClient时会做两件事:
A:创建一个回复queue,接收当前RpcClient发送的消息的消息接收人会将回复消息发到这个replyQueue上供当前RpcClient去接收回复消息
_replyQueue = setupReplyQueue();
protected String setupReplyQueue() throws IOException {
return _channel.queueDeclare("", false, false, true, true, null).getQueue();
//这里实际上是由rabbitmq server去定义一个唯一的queue(因为queueName是空的,所以是由server去生成queueName),最后返回这个queueName,queueName是由server生成的,使用的是以下这个方法:
Queue.DeclareOk queueDeclare(String queueName, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments)
}
B:创建一个接收回复消息的consumer
_consumer = setupConsumer();
protected DefaultConsumer setupConsumer() throws IOException {
//创建一个接收消息的DefaultConsumer实例
DefaultConsumer consumer = new DefaultConsumer(_channel) {
@Override //发生shutdown的时候回调
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException signal) {
synchronized (_continuationMap) {
for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {
entry.getValue().set(signal);
}
_consumer = null;
}
}
@Override //处理消息交付
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
//这部分就是和下面的代码一起协作来实现将异步接收强制变成同步接收
synchronized (_continuationMap) {
String replyId = properties.getCorrelationId();
BlockingCell<Object> blocker = _continuationMap.get(replyId);
_continuationMap.remove(replyId);
blocker.set(body);
}
}
};
//让接收消息的consumer去replyQueue上去接收消息,这个过程对于主线程来说是异步进行的,只要replyQueue上有消息了,consumer就会去replyQueue上去接收消息,并回调它的handleDelivery方法
_channel.basicConsume(_replyQueue, true, consumer);
return consumer;
}
- l 发送消息
byte[] result = rpcClient.primitiveCall(msg.getBytes());
使用rpcClient的primitiveCall发送消息,看看是怎么做的
public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException {
return primitiveCall(null, message);
}
继续跟踪,核心方法是这个
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) throws IOException, ShutdownSignalException{
//检查consumer是否为空,若为空,抛出异常
checkConsumer();
BlockingCell<Object> k = new BlockingCell<Object>();
synchronized (_continuationMap) {
_correlationId++;
String replyId = "" + _correlationId;
//如果props不为空,则将上一步骤创建的replyQueue设置到props上去,还有replyId
if (props != null) {
props.setCorrelationId(replyId);
props.setReplyTo(_replyQueue);
}
else {
//如果props为空,则创建一个,并将replyId和replyQueue都设置到props上
props = new AMQP.BasicProperties(null, null, null, null,
null, replyId,
_replyQueue, null, null, null,
null, null, null, null);
}
_continuationMap.put(replyId, k);
}
//使用上面的props发送消息,这样replyQueue和replyId就跟着传递到了接收消息的那一方去了,接收消息的client去props上去取到replyQueue,它就知道了它接收的消息的回复queue,然后它会将回复消息发送到replyQueue上去,而在上一步骤我们已经指定了一个consumer去replyQueue上去取消息,所以整个发送和接收消息的所有client是有条不紊的进行着
publish(props, message); //这行代码执行完后,只是将消息发送出去了,接收回复消息是异步的,由上一步骤的consumer去接收回复消息
//这里就是进行同步等待接收回复消息,将异步接收变成同步回复接收的核心就在这里
Object reply = k.uninterruptibleGet();
if (reply instanceof ShutdownSignalException) {
ShutdownSignalException sig = (ShutdownSignalException) reply;
ShutdownSignalException wrapper =
new ShutdownSignalException(sig.isHardError(),
sig.isInitiatedByApplication(),
sig.getReason(),
sig.getReference());
wrapper.initCause(sig);
throw wrapper;
} else {
return (byte[]) reply;
}
}
完整描述
- 创建RpcClient实例:
1,定义一个Map,用于存放每个消息的相关信息:
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
Key是一个correlationId,相当于当前rpcClient实例发送消息的一个计数器,初始化时是0,每发送一个消息时,加1
Value是一个com.rabbitmq.utility.BlockingCell对象,它是在发送消息前创建,并和当前的correlationId进行关联,放进来
_continuationMap.put(correlationId, blockingCell);
2,correlationId初始化为0
3,创建一个回复queue,replyQueue=channel.queueDeclare("", false, false, true, true, null).getQueue();
4,创建一个接收回复消息的consumer
5,指定consumer接收replyQueue上的消息,channel.basicConsume(replyQueue, true, consumer);
- RpcClient发送消息:
1,创建一个BlockingCell<Object>对象blockingCell
1,correlationId++
2,创建BasicProperties对象,并将correlationId,replyQueue设置到它上面,发送消息时,它会被传递到接收方
3,以correlationId为Key,将blockingCell放入到_continuationMap中
4,发送消息:channel.basicPublish(exchange, routingKey, 上面 步骤得到的BasicProperties对象, message);
5,获取回复消息,Object reply = blockingCell.uninterruptibleGet();这里就是同步等待回复消息
- RpcServer接收消息:
1,接收消息
2,从request中获取BasicProperties对象requestProperties,requestProperties=request.getProperties()
3,从requestProperties中得到correlationId,replyQueue
4,创建一个回复消息用的BasicProperties对象replyProperties,并将correlationId设置到它上面
4,发送回复消息:channel.basicPublish("", replyQueue, replyProperties, replyMessage);
- RpcClient接收回复:
1,replyQueue一有消息,consumer就会接收到并回调consumer的handleDelivery方法
2,获取传递过来的BasicProperties获取correlationId
3,根据correlationId去continuationMap中取BlockingCell对象,BlockingCell<Object> blocker = continuationMap.get(correlationId);
4,从continuationMap中删除,continuationMap.remove(correlationId);
5,将回复消息设置到blocker对象里面,blocker.set(replyMessage);
- 同步等待回复消息:
1,【RpcClient发送消息】第4步主线程,发送消息后,第5步就去获取回复消息
2,【RpcClient发送消息】第5步主线程,blockingCell.uninterruptibleGet(),如果blockingCell没有被set(value)过,那么让当前主线程处于等待wait(),等待状态
3, 【RpcClient接收回复】第5步blocker.set(replyMessage);这里的blocker其实就是上面主线程创建的 blockingCell,因为它是根据correlationId去continuationMap中取 的,set(replyMessage),blocker会用一个属性将replyMessage保存起来,供get的时候去返回这个属性,然后调用 notify();唤醒处于等待的主线程(当前这步所在的线程和上一步主线程是在两个线程,所以主线程的等待是可以被这个线程唤醒的),主线程被唤醒 后,get()就会取到replyMessage,最终整个步骤实现了将异步接收强制转换为同步等待接收
- BlockingCell类
public class BlockingCell<T> {
private boolean _filled = false;
private T _value;
private static final long NANOS_IN_MILLI = 1000 * 1000;
private static final long INFINITY = -1;
public BlockingCell() {
}
public synchronized T get() throws InterruptedException {
while (!_filled) { //如果value没有被设置过
wait(); //让当前线程处于等待,直到其它线程调用当前对象的notify()或notifyAll()为止
}
return _value;
}
//带超时的get
public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
if (timeout < 0 && timeout != INFINITY)
throw new AssertionError("Timeout cannot be less than zero");
if (!_filled && timeout != 0) {
wait(timeout == INFINITY ? 0 : timeout);
}
if (!_filled)
throw new TimeoutException();
return _value;
}
//无限制的等待,直到取到值为止
public synchronized T uninterruptibleGet() {
while (true) {
try {
return get();
} catch (InterruptedException ex) {
}
}
}
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
long now = System.nanoTime() / NANOS_IN_MILLI;
long runTime = now + timeout;
do {
try {
return get(runTime - now);
} catch (InterruptedException e) {
}
} while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
throw new TimeoutException();
}
public synchronized void set(T newValue) {
if (_filled) {
throw new AssertionError("BlockingCell can on
}
_value = newValue;
_filled = true;
notify(); //唤醒当前线程(处于等待状态)
}
//保证只能被set(value)一次
public synchronized boolean setIfUnset(T newValue) {
if (_filled) {
return false;
}
set(newValue);
_filled = true;
return true;
}
}
相关推荐
在代码实现上,通常会使用RabbitMQ的客户端库,如Java的`rabbitmq-client`,Python的`pika`等,它们提供了方便的API来创建连接、通道、声明交换器和队列,以及发送和接收消息。 理解并掌握这些RPC实现方式有助于...
- **打开通道**:在连接上打开一个Channel,这是实际发送和接收消息的地方。 - **声明队列**:在点对点和应答模式中,需要声明队列。在发布/订阅模式中,订阅者也需要声明订阅的队列。 - **发送消息**:使用...
- **RabbitMQ**:AMQP标准的消息代理和队列服务器,适用于复杂的路由场景。 #### 五、并发与多线程 - **Synchronized和ReentrantLock的区别** - **Synchronized**:内置锁,使用关键字修饰方法或代码块。 - **...
25. **Consumer**:在微服务或消息队列中,消费者是指接收和处理消息的应用程序。 26. **Reference**:在编程中,引用通常指代对象的一个别名,可以用来访问和操作该对象。 27. **Registry**:注册表在Windows操作...
少儿编程scratch项目源代码文件案例素材-绝地求生.zip
嵌入式八股文面试题库资料知识宝典-文思创新面试题2010-04-08.zip
一种基于剪切波和特征信息检测的太阳斑点图融合算法.pdf
内容概要:本文详细介绍了并联型有源电力滤波器(APF)在Matlab/Simulink环境下的仿真研究。主要内容涵盖三个关键技术点:一是dq与αβ坐标系下的谐波和无功检测,利用dq变换和FBD技术实现实时检测;二是两相旋转坐标系(dq)与两相静止坐标系(αβ)下的PI控制,通过调整比例和积分环节实现精准控制;三是SVPWM调制方式的应用,通过优化开关时序提升系统效率和性能。文中还提供了详细的仿真介绍文档,包括模型搭建、参数设定以及结果分析。 适合人群:从事电力电子、自动化控制领域的研究人员和技术人员,尤其是对电力滤波器仿真感兴趣的读者。 使用场景及目标:适用于需要深入了解并联型APF工作原理和实现方式的研究人员,旨在通过仿真工具掌握谐波和无功检测、PI控制及SVPWM调制的具体应用。 其他说明:本文不仅提供了理论知识,还结合了实际操作步骤,使读者能够通过仿真模型加深对APF的理解。
Arduino KEY实验例程,开发板:正点原子EPS32S3,本人主页有详细实验说明可供参考。
嵌入式八股文面试题库资料知识宝典-嵌入式C语言面试题汇总(66页带答案).zip
.archivetempdebug.zip
嵌入式系统开发_CH551单片机_USB_HID复合设备模拟_基于CH551单片机的USB键盘鼠标复合设备模拟器项目_用于通过CH551微控制器模拟USB键盘和鼠标输入设备_实现硬
少儿编程scratch项目源代码文件案例素材-剑客冲刺.zip
少儿编程scratch项目源代码文件案例素材-火影.zip
内容概要:本文详细介绍了两极式单相光伏并网系统的组成及其仿真优化方法。前级采用Boost电路结合扰动观察法(P&O)进行最大功率点跟踪(MPPT),将光伏板输出电压提升至并网所需水平;后级利用全桥逆变加L型滤波以及电压外环电流内环控制,确保并网电流与电网电压同频同相,实现高效稳定的并网传输。文中还提供了具体的仿真技巧,如开关频率设置、L滤波参数计算和并网瞬间软启动等,最终实现了98.2%的系统效率和低于0.39%的总谐波失真率(THD)。 适合人群:从事光伏并网系统研究、设计和开发的技术人员,特别是对Boost电路、MPPT算法、逆变技术和双环控制系统感兴趣的工程师。 使用场景及目标:适用于希望深入了解两极式单相光伏并网系统的工作原理和技术细节的研究人员和工程师。目标是在实际项目中应用这些理论和技术,提高光伏并网系统的效率和稳定性。 其他说明:文中提供的仿真技巧和伪代码有助于读者更好地理解和实现相关算法,在实践中不断优化系统性能。同时,注意电网电压跌落时快速切换到孤岛模式的需求,确保系统的安全性和可靠性。
矢量边界,行政区域边界,精确到乡镇街道,可直接导入arcgis使用
嵌入式八股文面试题库资料知识宝典-嵌入式c面试.zip
嵌入式八股文面试题库资料知识宝典-I2C总线.zip
内容概要:本文详细介绍了三种注浆模型——随机裂隙网络注浆模型、基于两相达西定律的注浆模型、基于层流和水平集的注浆扩散模型。首先,随机裂隙网络注浆模型基于地质学原理,模拟裂隙网络发育的实际地质情况,在不同注浆压力下进行注浆作业,以增强地基稳定性和提高承载能力。其次,基于两相达西定律的注浆模型利用数学公式模拟裂隙网络中的流体输送过程,适用于裂隙网络地质条件下的注浆效果分析。最后,基于层流和水平集的注浆扩散模型通过引入层流特性和水平集方法,更准确地模拟注浆过程中的扩散过程。文中还讨论了不同注浆压力对注浆效果的影响,并提出了优化建议。 适合人群:从事岩土工程、地基加固等相关领域的工程师和技术人员。 使用场景及目标:①帮助工程师选择合适的注浆模型和注浆压力;②为实际工程项目提供理论支持和技术指导;③提升地基加固的效果和效率。 其他说明:文章强调了在实际应用中需要结合地质条件、裂隙网络特点等因素进行综合分析,以达到最佳注浆效果。同时,鼓励不断创新注浆工艺和方法,以满足日益增长的地基加固需求。
内容概要:本文详细比较了COMSOL Multiphysics软件5.5和6.0版本在模拟Ar棒板粗通道流注放电现象方面的异同。重点探讨了不同版本在处理电子密度、电子温度、电场强度以及三维视图等方面的优缺点。文中不仅介绍了各版本特有的操作方式和技术特点,还提供了具体的代码实例来展示如何进行精确的仿真设置。此外,文章还讨论了网格划分、三维数据提取和电场强度后处理等方面的技术难点及其解决方案。 适合人群:从事等离子体物理研究的专业人士,尤其是熟悉COMSOL Multiphysics软件并希望深入了解其最新特性的研究人员。 使用场景及目标:帮助用户选择合适的COMSOL版本进行高效、精确的等离子体仿真研究,特别是在处理复杂的Ar棒板粗通道流注放电现象时提供指导。 其他说明:文章强调了在实际应用中,选择COMSOL版本不仅要考虑便捷性和视觉效果,还需兼顾仿真精度和可控性。