`

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

阅读更多

本身使用RpcClient发送消息与同步接收消息的代码是很简单的,如下:

RpcClient client = new RpcClient(channel, exchange, routingKey);

String msg = "hello world!";

byte[] result = client.primitiveCall(msg.getBytes());

这里的primitiveCall调用后,当前线程会进行同步等待,等待消息接收端给自己的回复消息

一个完整的发送消息与接收回复消息的图例:

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理 - micro sun - 学无止境

整个流程详解:

 

  • l  创建RpcClient实例

 

RpcClient client = new RpcClient(channel, exchange, routingKey);

创建RpcClient时会做两件事:

A:创建一个回复queue,接收当前RpcClient发送的消息的消息接收人会将回复消息发到这个replyQueue上供当前RpcClient去接收回复消息

_replyQueue = setupReplyQueue();

   

protected String setupReplyQueue() throws IOException {

return _channel.queueDeclare("", false, false, true, true, null).getQueue();

//这里实际上是由rabbitmq server去定义一个唯一的queue(因为queueName是空的,所以是由server去生成queueName),最后返回这个queueNamequeueName是由server生成的,使用的是以下这个方法:

Queue.DeclareOk queueDeclare(String queueName, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,

                                 Map<String, Object> arguments)

}

 

B:创建一个接收回复消息的consumer

_consumer = setupConsumer();

 

protected DefaultConsumer setupConsumer() throws IOException {

//创建一个接收消息的DefaultConsumer实例

DefaultConsumer consumer = new DefaultConsumer(_channel) {

    @Override //发生shutdown的时候回调

    public void handleShutdownSignal(String consumerTag,

                ShutdownSignalException signal) {

synchronized (_continuationMap) {

    for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {

    entry.getValue().set(signal);

    }

    _consumer = null;

}

    }

 

    @Override //处理消息交付

    public void handleDelivery(String consumerTag,

               Envelope envelope,

               AMQP.BasicProperties properties,

               byte[] body)

    throws IOException {

//这部分就是和下面的代码一起协作来实现将异步接收强制变成同步接收

synchronized (_continuationMap) {

    String replyId = properties.getCorrelationId();

    BlockingCell<Object> blocker = _continuationMap.get(replyId);

    _continuationMap.remove(replyId);

    blocker.set(body);

}

    }

};

//让接收消息的consumerreplyQueue上去接收消息,这个过程对于主线程来说是异步进行的,只要replyQueue上有消息了,consumer就会去replyQueue上去接收消息,并回调它的handleDelivery方法

_channel.basicConsume(_replyQueue, true, consumer);

return consumer;

}

 

 

  • l  发送消息

 

byte[] result = rpcClient.primitiveCall(msg.getBytes());

使用rpcClientprimitiveCall发送消息,看看是怎么做的

public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException {

return primitiveCall(null, message);

}

继续跟踪,核心方法是这个

public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) throws IOException, ShutdownSignalException{

//检查consumer是否为空,若为空,抛出异常

checkConsumer();

 

BlockingCell<Object> k = new BlockingCell<Object>();

synchronized (_continuationMap) {

    _correlationId++;

    String replyId = "" + _correlationId;

//如果props不为空,则将上一步骤创建的replyQueue设置到props上去,还有replyId

    if (props != null) {

        props.setCorrelationId(replyId);

        props.setReplyTo(_replyQueue);

    }

    else {

//如果props为空,则创建一个,并将replyIdreplyQueue都设置到props

        props = new AMQP.BasicProperties(null, null, null, null,

                    null, replyId,

                    _replyQueue, null, null, null,

                    null, null, null, null);

    }

    _continuationMap.put(replyId, k);

}

//使用上面的props发送消息,这样replyQueuereplyId就跟着传递到了接收消息的那一方去了,接收消息的clientprops上去取到replyQueue,它就知道了它接收的消息的回复queue,然后它会将回复消息发送到replyQueue上去,而在上一步骤我们已经指定了一个consumerreplyQueue上去取消息,所以整个发送和接收消息的所有client是有条不紊的进行着

publish(props, message);  //这行代码执行完后,只是将消息发送出去了,接收回复消息是异步的,由上一步骤的consumer去接收回复消息

//这里就是进行同步等待接收回复消息,将异步接收变成同步回复接收的核心就在这里

Object reply = k.uninterruptibleGet();

if (reply instanceof ShutdownSignalException) {

    ShutdownSignalException sig = (ShutdownSignalException) reply;

    ShutdownSignalException wrapper =

    new ShutdownSignalException(sig.isHardError(),

                   sig.isInitiatedByApplication(),

                   sig.getReason(),

                   sig.getReference());

    wrapper.initCause(sig);

    throw wrapper;

} else {

    return (byte[]) reply;

}

}


完整描述
  • 创建RpcClient实例:
1,定义一个Map,用于存放每个消息的相关信息:
    private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
    Key是一个correlationId,相当于当前rpcClient实例发送消息的一个计数器,初始化时是0,每发送一个消息时,加1
    Value是一个com.rabbitmq.utility.BlockingCell对象,它是在发送消息前创建,并和当前的correlationId进行关联,放进来
    _continuationMap.put(correlationId, blockingCell);   
2,correlationId初始化为0
3,创建一个回复queue,replyQueue=channel.queueDeclare("", false, false, true, true, null).getQueue();
4,创建一个接收回复消息的consumer
5,指定consumer接收replyQueue上的消息,channel.basicConsume(replyQueue, true, consumer);

  • RpcClient发送消息:
1,创建一个BlockingCell<Object>对象blockingCell
1,correlationId++
2,创建BasicProperties对象,并将correlationId,replyQueue设置到它上面,发送消息时,它会被传递到接收方
3,以correlationId为Key,将blockingCell放入到_continuationMap中
4,发送消息:channel.basicPublish(exchange,  routingKey,  上面 步骤得到的BasicProperties对象,  message);
5,获取回复消息,Object reply = blockingCell.uninterruptibleGet();这里就是同步等待回复消息

  • RpcServer接收消息:
1,接收消息
2,从request中获取BasicProperties对象requestProperties,requestProperties=request.getProperties()
3,从requestProperties中得到correlationId,replyQueue
4,创建一个回复消息用的BasicProperties对象replyProperties,并将correlationId设置到它上面
4,发送回复消息:channel.basicPublish("", replyQueue, replyProperties, replyMessage);

  • RpcClient接收回复:
1,replyQueue一有消息,consumer就会接收到并回调consumer的handleDelivery方法
2,获取传递过来的BasicProperties获取correlationId
3,根据correlationId去continuationMap中取BlockingCell对象,BlockingCell<Object> blocker = continuationMap.get(correlationId);
4,从continuationMap中删除,continuationMap.remove(correlationId);
5,将回复消息设置到blocker对象里面,blocker.set(replyMessage);

  • 同步等待回复消息:
1,【RpcClient发送消息】第4步主线程,发送消息后,第5步就去获取回复消息
2,【RpcClient发送消息】第5步主线程,blockingCell.uninterruptibleGet(),如果blockingCell没有被set(value)过,那么让当前主线程处于等待wait(),等待状态
3,【RpcClient接收回复】第5步blocker.set(replyMessage);这里的blocker其实就是上面主线程创建的blockingCell,因为它是根据correlationId去continuationMap中取的,set(replyMessage),blocker会用一个属性将replyMessage保存起来,供get的时候去返回这个属性,然后调用notify();唤醒处于等待的主线程(当前这步所在的线程和上一步主线程是在两个线程,所以主线程的等待是可以被这个线程唤醒的),主线程被唤醒后,get()就会取到replyMessage,最终整个步骤实现了将异步接收强制转换为同步等待接收

  • BlockingCell类
public class BlockingCell<T> {

    private boolean _filled = false;
    private T _value;
   
    private static final long NANOS_IN_MILLI = 1000 * 1000;
    private static final long INFINITY = -1;

    public BlockingCell() {
    }

    public synchronized T get() throws InterruptedException {
        while (!_filled) {    //如果value没有被设置过
            wait();  //让当前线程处于等待,直到其它线程调用当前对象的notify()或notifyAll()为止
        }
        return _value;
    }
   
    //带超时的get
    public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
        if (timeout < 0 && timeout != INFINITY)
            throw new AssertionError("Timeout cannot be less than zero");
        if (!_filled && timeout != 0) {
            wait(timeout == INFINITY ? 0 : timeout);
        }
        if (!_filled)
            throw new TimeoutException();
        return _value;
    }
   
    //无限制的等待,直到取到值为止
    public synchronized T uninterruptibleGet() {
        while (true) {
            try {
                return get();
            } catch (InterruptedException ex) {
            }
        }
    }
   
    public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
        long now = System.nanoTime() / NANOS_IN_MILLI;
        long runTime = now + timeout;
        do {
            try {
                return get(runTime - now);
            } catch (InterruptedException e) {
            }
        } while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
        throw new TimeoutException();
    }

    public synchronized void set(T newValue) {
        if (_filled) {
            throw new AssertionError("BlockingCell can only be set once");
        }
        _value = newValue;
        _filled = true;
        notify(); //唤醒当前线程(处于等待状态)
    }

    //保证只能被set(value)一次
    public synchronized boolean setIfUnset(T newValue) {
        if (_filled) {
            return false;
        }
        set(newValue);
        _filled = true;
        return true;
    }
}
分享到:
评论

相关推荐

    rabbitmq-c-master.rar_RabbitMQ c lib_cmake编译_rabbitmq_rabbitmq-c

    - 消息发布和消费:发布消息到队列,订阅队列以接收消息。 - AMQP命令:支持完整的AMQP命令集,包括事务、确认模式等。 在使用`rabbitmq-c`时,开发者需要理解AMQP协议的基础知识,包括交换机、队列、绑定和消息...

    rabbitmq-java-client-bin-3.3.4.zip

    在"rabbitmq-java-client-bin-3.3.4.zip"这个压缩包中,包含的是RabbitMQ的Java客户端库,这是与RabbitMQ服务器通信的一个关键组件。RabbitMQ提供了多种语言的客户端,Java客户端则是针对Java开发者设计的,使得Java...

    RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包

    RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包都在这里面,主要报卡一下软件:socat-1.7.3.2.tar.gz、rabbitmq-server-3.6.12-1.el6.noarch.rpm、rabbitmq-release-signing-key.asc、otp_...

    rabbitmq-server-3.8.3安装包

    7. **应用集成**:根据开发语言选择对应的RabbitMQ客户端库,如Python的pika,Java的rabbitmq-client等,编写代码进行消息的生产和消费。 在实际使用过程中,还需要了解如何创建交换机、队列,设置路由规则,以及...

    rabbitmq-server-3.5.4.tar.gz

    RabbitMQ提供了一个平台,使得应用程序可以发送和接收消息,而无需两者同时在线。这种异步通信模式有助于提高系统的可扩展性和可靠性。消息通过交换器(Exchanges)分发到绑定(Bindings)的队列(Queues)中,队列...

    rabbitmq-server-mac-standalone-3.5.7.tar.gz

    RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,它提供了一种标准的方式来发送和接收消息。消息在生产者(发布者)和消费者之间通过队列进行中转,确保即使在系统间通信不稳定时也能可靠地传递数据。...

    rabbitmq-server-mac-standalone-3.5.3

    **RabbitMQ for Mac 安装指南** RabbitMQ 是一个开源的消息代理和队列服务器,广泛用于...同时,配合详细的安装手册,可以更好地理解和掌握RabbitMQ的使用,确保在本地开发环境中顺利地进行消息队列的实践和学习。

    rabbitmq-server-3.8.3.exe和erlang22.2.exe 64位压缩文件

    标签中的"rabbitmq-server"指的是RabbitMQ服务端,它是整个消息队列系统的主体部分,负责接收、存储和转发消息。"rabbitmq下载"提示这是一个下载相关的操作,用户可以从这里获取RabbitMQ的安装包。"erlang新版本"则...

    rabbitmq-server-generic-unix-3.5.7.tar.rar下载,rabbitmq安装包

    RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,...请确保按照步骤操作,同时保持对RabbitMQ的学习,理解其核心概念和最佳实践,以便更好地利用这一强大的消息中间件。

    rabbitmq-perf-test-2.7.0-bin.tar.gz

    在进行性能测试时,它能够模拟大量生产者和消费者,通过发送和接收消息来测量系统的性能瓶颈。 在RabbitMQ的性能测试中,有几个重要的概念需要了解: 1. **生产者(Producer)**:生产者是向RabbitMQ发送消息的...

    rabbitmq-java-client-bin-3.3.4

    开发者需要创建一个Connection对象,然后通过这个连接发送和接收消息。 2. **通道(Channel)**:在RabbitMQ中,所有的操作都是通过通道进行的,它是一种轻量级的连接,降低了频繁打开和关闭连接的开销。通过...

    rabbitmq-java (2).zip

    在使用RabbitMQ时,开发者会创建生产者(Producer)来发送消息,以及消费者(Consumer)来接收消息。RabbitMQ提供了多种交换机类型(如Direct、Fanout、Topic、Headers),它们决定了消息如何路由到队列。此外,还...

    rabbitmq-server-3.11.13

    rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-3.11.13rabbitmq-server-...

    RabbitMQ安装包,版本:rabbitmq-server-3.9.11.exe

    rabbitmq-server-3.9.11.exe

    rabbitmq-client-1.3.0.jar

    这个版本的客户端库为开发者提供了丰富的API,以便在Java程序中轻松地发送和接收消息。通过这个库,开发者可以实现生产者(Producer)和消费者(Consumer)的角色,分别用于发布消息到队列和从队列中消费消息。 1. ...

    rabbitmq-server-windows-3.6.10.zip

    9. **高可用性**:通过镜像队列和集群,RabbitMQ可以实现高可用性,确保即使在一个节点失败时,消息仍然可以被正确处理。 10. **插件扩展**:RabbitMQ有强大的插件机制,允许开发者根据需求添加自定义功能。 在...

    rabbitmq-server-3.7.7.exe

    - 客户端库:RabbitMQ支持多种编程语言的客户端库,如Python、Java、JavaScript等,方便在应用程序中发送和接收消息。 在分布式系统中,RabbitMQ常用于实现解耦、异步处理和负载均衡等功能。例如,当一个系统模块...

    RabbitMQ-c源码

    - **回调机制**: 它使用异步回调模型处理事件,如接收消息或连接状态变化。 5. **关键API介绍** - **rabbitmq_connection_init**: 初始化连接结构体,准备建立到RabbitMQ服务器的连接。 - **rabbitmq_channel_...

    rabbitmq-server-generic-unix-2.7.0.tar.gz

    - **Consumer**:消费者是从RabbitMQ接收消息的组件。 - **AMQP协议**:RabbitMQ遵循AMQP协议,它定义了消息的格式和交换过程。 在实际应用中,开发者可以根据需求创建不同类型的交换机(如Direct、Fanout、Topic...

    rabbitmq 3.6.5-1离线安装

    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm systemctl status rabbitmq-server systemctl start rabbitmq-server systemctl stop rabbitmq-server systemctl restart rabbitmq-server systemctl enable ...

Global site tag (gtag.js) - Google Analytics