`

rabbitmq 学习-13- 发送接收消息示例-2

阅读更多

Basic RPC

As a programming convenience, the Java client API offers a class RpcClient which uses a temporary reply queue to provide simple RPC-style communication facilities via AMQP.

The class doesn't impose any particular format on the RPC arguments and return values. It simply provides a mechanism for sending a message to a given exchange with a particular routing key, and waiting for a response on a reply queue.

实现Rpc形式的发送与接收消息

/**
 * 使用RpcClient发送消息
 *
 * @author sunjun
 * @createtime 2010-4-27 上午11:21:05
 */
public class TestSender2 extends TestBase {

    /**
     * 发送一个消息
     *
     * @throws IOException
     */
    public void sendMsg() throws IOException {
        // 创建一个channel
        Channel channel = getChannel();

        // 设置return listener,用于监听回复(发生异常时)
        channel.setReturnListener(new ReturnListener() {

            @Override
            public void handleBasicReturn(int replyCode, String replyText,
                    String exchange, String routingKey,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out
                        .println("-----------get exception reply message---------");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("ContentEncoding: "
                        + properties.getContentEncoding());
                System.out.println("content type: "
                        + properties.getContentType());
                System.out.println("Expiration: " + properties.getExpiration());
                System.out.println("type: " + properties.getType());
                System.out.println("reply to: " + properties.getReplyTo());
                System.out.println("body: " + new String(body));
            }

        });

        // 定义一个queue
        String queue = "test.queue";
        channel.queueDeclare(queue);

        // 定义一个exchange
        String exchange = "test.exchange";
        channel.exchangeDeclare(exchange, "direct");

        // bind queue到exchange上
        String routingKey = "test.routingkey";
        channel.queueBind(queue, exchange, routingKey);

        RpcClient rpcClient = new RpcClient(channel, exchange, routingKey);
        try {
            // 发送消息
            String msg = "hello world!";
            byte[] result = rpcClient.primitiveCall(msg.getBytes());
            System.out.println(new String(result));
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        }

        System.out.println("send message success.");

        // close
        // close(channel);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        TestSender2 testSender = new TestSender2();
        try {
            testSender.sendMsg();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

 


 

/**
 * 使用RpcServer接收消息,并发送回复消息
 *
 * @author sunjun
 * @createtime 2010-4-27 下午03:14:31
 */
public class TestReceiver3 extends TestBase {

    /**
     * 接收消息
     *
     * @throws IOException
     */
    public void receive() throws IOException {
        // 获取channel
        Channel channel = getChannel();

        // 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
        String queue = "test.queue";
        channel.queueDeclare(queue);

        System.out.println("receive message started.");

        // 循环取消息
        while (true) {
            RpcServer rpcServer = new RpcServer(channel, queue) {
                @Override
                public byte[] handleCall(Delivery request,
                        BasicProperties replyProperties) {
                    // 接收消息
                    byte[] message = request.getBody();
                    // 发送返回消息
                    return super.handleCall(request, replyProperties);
                }
            };
        }

    }

    public static void main(String[] args) throws IOException {
        new TestReceiver3().receive();
    }
}

分享到:
评论

相关推荐

    rabbitmq-java-client-bin-3.3.4.zip

    1. **生产者(Producer)**:生产者是向RabbitMQ发送消息的应用程序。在Java客户端中,生产者通过创建一个连接到RabbitMQ服务器的Channel,然后定义交换机(Exchange)和路由键(Routing Key),最后发布消息到指定的...

    rabbitmq-java-client-bin-3.3.4

    6. **发布/消费(Publish/Consume)**:发布者通过交换器将消息发送到队列,消费者则从队列中接收并处理消息。RabbitMQ支持同步和异步消费模式,可以根据应用需求选择合适的方式。 在“rabbitmq-java-client-bin-...

    RabbitMQ实战-随书源码

    随书源码“RabbitMQ实战-随书源码”和“RabbitMQ in action 和 RabbitMQ实战-高效部署分布式消息队列”的配套代码,提供了丰富的示例,帮助读者深入理解RabbitMQ的使用和实践。 1. **RabbitMQ基础** - **AMQP协议*...

    MQ示例+otp_win64_22.2.exe+rabbitmq-server-3.8.1

    用户解压后,可以查看其中的代码、配置文件或其他文档,以了解如何设置和操作 RabbitMQ 服务器,或者如何编写客户端应用程序来发送和接收消息。 综合以上信息,我们可以知道这个压缩包旨在帮助用户学习和使用 ...

    rabbitmq-dotnet-client-3.6.4-dotnet-4.6.1.rar

    2. **创建通道**:通过`IConnection`对象的`CreateModel()`方法创建一个新的通道,通道是AMQP中的工作单元,用于发送和接收消息。 3. **声明交换机和队列**:使用`channel.ExchangeDeclare()`和`channel....

    rabbitmq-server-3.7.14.zip

    - **交换机(Exchange)**:交换机负责接收生产者发送的消息,并根据预定义的路由规则将其分发到相应的队列。 - **队列(Queue)**:队列是存储消息的地方,它是持久化的,可以有多个消费者同时监听。 - **绑定...

    rabbitmq学习10:使用spring-amqp发送消息及异步接收消息

    标题中的“rabbitmq学习10:使用spring-amqp发送消息及异步接收消息”表明了本次讨论的主题,即如何在Spring框架中利用Spring AMQP组件与RabbitMQ进行交互,实现消息的发送和异步接收。RabbitMQ是一个开源的消息代理...

    Windows c++中RabbitMQ-c特殊消息队列的保姆教程(csdn)————程序.pdf

    - 建立通道(channel),这是RabbitMQ中发送和接收消息的实体。 - 定义交换机(exchange)和队列(queue),决定消息如何路由。 - 发布消息到交换机,RabbitMQ会根据交换机类型和绑定规则将消息放入队列。 - 如果...

    rabbitmq-c.rar

    在实际开发中,你可以创建一个简单的生产者示例,发送一条消息到RabbitMQ服务器,然后创建一个消费者示例来接收并处理该消息。这涉及到了连接建立、通道打开、交换机选择、队列声明、消息发送和消费等一系列步骤。 ...

    rabbitmq-java-client-bin-3.0.4.zip

    这个客户端库允许Java开发者轻松地与RabbitMQ服务器进行交互,发送和接收消息。在"rabbitmq-java-client-3.0.4"目录下,通常会包含JAR文件和其他必要的资源,如文档、示例代码等。开发者可以将这些JAR文件添加到他们...

    rabbitMQ实战java版-rabbitMQ-demo.zip

    本文将围绕RabbitMQ的Java实践,通过分析"rabbitMQ-demo.zip"中的示例项目"rabbitMQ-demo-main",深入讲解RabbitMQ的核心概念、工作模式以及如何在Java环境中进行集成和应用。 一、RabbitMQ核心概念 1. **Broker**...

    rabbitmq-c rabbitmq amqp c++接口库

    9. **示例代码**:使用`rabbitmq-c++`,开发者可以编写简洁的C++代码来发送和接收消息,例如: ```cpp RabbitMQConnection conn("localhost", 5672, "guest", "guest"); RabbitMQChannel channel(conn); channel....

    RabbitMQ入门-实战-RabbitMQ.zip

    2. **交换器(Exchange)**:负责接收生产者发送的消息,并根据预定义的路由规则将消息分发到相应的队列。 3. **队列(Queue)**:存储消息的容器,可以理解为消息的缓冲区,多个消费者可以共享同一个队列。 4. **...

    rabbitmq-server-2.8.5.tar.gz

    2. **发布与消费消息**:生产者使用`publish`方法发布消息到交换器,消费者通过`consume`方法订阅队列并接收消息。 3. **交换器类型**:包括Direct、Fanout、Topic和Headers四种,根据业务需求选择合适的类型进行...

    RabbitMQ发送和接收

    本文将深入探讨C#环境下如何利用RabbitMQ实现消息的发送与接收,以及相关的关键概念和技术。 首先,我们要理解RabbitMQ的基本工作原理。RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,它定义了消息...

    android上RabbitMQ发送简单例子

    在这个示例中,我们创建了一个名为`test_queue`的队列,并发送了一条消息"Hello, RabbitMQ!"。注意,实际应用中需要替换默认的guest用户和端口,因为它们只适用于本地开发。 最后,"ZZB_net01"可能是一个包含...

    rabbitmq-c官方源代码,内涵examples,tools,tests

    **RabbitMQ-C:C语言实现的RabbitMQ客户端库** RabbitMQ-C是一个开源的、用C语言编写的RabbitMQ客户端库...通过源代码分析、示例学习和测试运行,开发者可以深入理解RabbitMQ-C的工作原理,并在自己的项目中灵活运用。

    rabbitmq-server-3.8.3.zip

    2. **交换机(Exchange)**:负责接收生产者发送的消息,并根据预设的路由规则将其分发到不同的队列中。 3. **队列(Queue)**:存储消息的容器,多个消费者可以同时从一个队列中获取消息,但每个消息只会被一个消费者...

    RabbitMQ-Day1-Code

    这个目录下的代码可能包含了如何连接到RabbitMQ服务器,创建通道(Channel),定义交换机(Exchange)和队列(Queue),并发送消息的示例。交换机是RabbitMQ中的一种路由机制,它根据预定义的规则(Binding)将消息...

    RabbitMQ实战演练-RabbitMQ-Action.zip

    RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于处理异步任务、解耦组件以及实现消息传递。本实战演练将带你深入理解RabbitMQ的核心概念...

Global site tag (gtag.js) - Google Analytics