`

rabbitmq 学习-12- 发送接收消息示例-1

阅读更多

这里是同步发送消息,异步接收消息
接收有两种方式:http://www.rabbitmq.com/api-guide.html#getting

Retrieving individual messages(channel.basicGet)

To retrieve individual messages, use Channel.basicGet. The returned value is an instance of GetResponse, from which the header information (properties) and message body can be extracted

Retrieving messages by subscription(Consumer)

import com.rabbitmq.client.Consumer; import com.rabbitmq.client.QueueingConsumer;

Another way to receive messages is to set up a subscription using the Consumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively.

The easiest and safest supplied implementation of Consumer is the QueueingConsumer convenience class:


TestBase.java -- 基类
/**
 * @author sunjun
 * @createtime 2010-4-27 下午03:13:27
 */
public class TestBase implements ShutdownListener {

    private static Connection connection;

    static {
        ConnectionParameters parameters = new ConnectionParameters();
        parameters.setUsername("guest");
        parameters.setPassword("guest");
        parameters.setVirtualHost("/");
        // 默认情况下使用用户guest/guest,vhost=/
        ConnectionFactory factory = new ConnectionFactory(parameters);
        try {
            // 获取connection
            connection = factory.newConnection("192.168.18.24", AMQP.PROTOCOL.PORT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    protected Channel getChannel() throws IOException {
        return connection.createChannel();
    }

    protected void close(Channel channel) {
        try {
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    protected void printBasicProperties(BasicProperties properties) {
        System.out.println("properties.ContentEncoding: " + properties.getContentEncoding());
        System.out.println("properties.content type: " + properties.getContentType());
        System.out.println("properties.Expiration: " + properties.getExpiration());
        System.out.println("properties.type: " + properties.getType());
        System.out.println("properties.reply to: " + properties.getReplyTo());
        System.out.println("properties.appId: " + properties.getAppId());
        System.out.println("properties.classId: " + properties.getClassId());
        System.out.println("properties.className: " + properties.getClassName());
        System.out.println("properties.clusterId: " + properties.getClusterId());
    }

    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        System.out.println("-----------shutdown information----------");
        Object reason = cause.getReason();
        System.out.println(reason);
        Object reference = cause.getReference();
        System.out.println(reference);
        System.out.println(cause.isHardError());
        System.out.println(cause.isInitiatedByApplication());
    }
}



TestSender.java
/**
 * 发送消息
 *
 * @author sunjun
 * @createtime 2010-4-27 上午11:21:05
 */
public class TestSender extends TestBase {

    /**
     * 使用channel类basicPublish发送
     *
     * @throws IOException
     */
    public void sendMsg() throws IOException {
        // 创建一个channel
        Channel channel = getChannel();

        // 设置return listener,处理不能发送或未被送达的消息,如果发送时设置 强制发送 或 立即发送 ,但是消息没有被接收到,就会回调return listener
        // Handling unroutable or undelivered messages
        // If a message is published with the "mandatory" or "immediate" flags set, but cannot be
        // delivered, the broker will return it to the sending client (via a AMQP.Basic.Return
        // command).
        channel.setReturnListener(new ReturnListener() {

            @Override
            public void handleBasicReturn(int replyCode, String replyText, String exchange,
                    String routingKey, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("-----------get exception reply message---------");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("ContentEncoding: " + properties.getContentEncoding());
                System.out.println("content type: " + properties.getContentType());
                System.out.println("Expiration: " + properties.getExpiration());
                System.out.println("type: " + properties.getType());
                System.out.println("reply to: " + properties.getReplyTo());
                System.out.println("body: " + new String(body));
            }

        });

        // 定义一个由rabbitmq server自动生成queueName的queue, autodelete, non-durable
        // DeclareOk queueDeclare = channel.queueDeclare();
        // String queue = queueDeclare.getQueue();
        // System.out.println(queue);

        // 定义一个queue,non-autodelete, non-durable(即rabbitmq重启后会消失)
        String queueName = "test.queue";
        DeclareOk queueDeclare = channel.queueDeclare(queueName);

        // 定义一个queue,durable(即rabbitmq重启后也不会消失)
        // String queue = "test.queue";
        // DeclareOk queueDeclare = channel.queueDeclare(queue ,true);
        // qeuue名称,也就是test.queue
        System.out.println("queueName: " + queueDeclare.getQueue());
        // 消费者数量
        System.out.println("ConsumerCount: " + queueDeclare.getConsumerCount());
        // 未读取的消息数量
        System.out.println("MessageCount: " + queueDeclare.getMessageCount());

        // 定义一个exchange
        String exchange = "test.exchange";
        channel.exchangeDeclare(exchange, "direct");

        // bind queue到exchange上
        String routingKey = "test.routingkey";
        channel.queueBind(queueName, exchange, routingKey);

        // 发送一个非持久化文本消息
        String msg = "hello world!";
        // 异步发送消息,不用等待去接收消费者的回复消息
        channel.basicPublish(exchange, routingKey, MessageProperties.TEXT_PLAIN, msg.getBytes());

        System.out.println("send message success.");

        // close
        close(channel);
    }

    /**
     * main
     *
     * @param args
     */
    public static void main(String[] args) {
        try {
            new TestSender().sendMsg();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



TestReceiver.java
/**
 * 接收消息,使用channel.basicGet接收消息
 *
 * @author sunjun
 * @createtime 2010-4-27 下午03:14:31
 */
public class TestReceiver extends TestBase {

    /**
     * 接收消息
     *
     * @throws IOException
     */
    public void receive() throws IOException {
        // 获取channel
        Channel channel = getChannel();

        // 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
        String queue = "test.queue";
        channel.queueDeclare(queue);

        boolean noAck = true;
        // 获取到消息的计数器
        int counter = 0;
        // 循环从queue(test.queue)中获取消息
        while (true) {
            GetResponse response = channel.basicGet(queue, noAck);
            if (response != null) {
                System.out.println("----------- receive message " + (++counter) + "-----------");
                System.out.println("Message : " + new String(response.getBody()));

                Envelope envelope = response.getEnvelope();
                String exchange = envelope.getExchange();
                String routingKey = envelope.getRoutingKey();
                long deliveryTag = envelope.getDeliveryTag();

                BasicProperties properties = response.getProps();
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("MessageCount: " + response.getMessageCount());
                System.out.println("deliveryTag: " + deliveryTag);
                printBasicProperties(properties);

                System.out.println("receive success.\n\n");

                if (!noAck) // 发送回复
                    channel.basicAck(deliveryTag, false);
            }
        }
    }

    /**
     * main
     *
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        new TestReceiver().receive();
    }
}



TestReceiver2.java
/**
 * 接收消息,使用Consumer(QueueingConsumer)来取消息
 *
 * @author sunjun
 * @createtime 2010-4-27 下午03:14:31
 */
public class TestReceiver2 extends TestBase {

    /**
     * 接收消息
     *
     * @throws IOException
     */
    public void receive() throws IOException {
        // 获取channel
        Channel channel = getChannel();

        // 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
        String queue = "test.queue";
        channel.queueDeclare(queue);

        int counter = 0;
        boolean noAck = false;

        // 定义一个消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        String consumerTag = channel.basicConsume(queue, noAck,
                queueingConsumer);

        System.out.println("receive message started.");

        // 循环取消息
        while (true) {
            Delivery nextDelivery = null;
            try {
                // 取消息
                nextDelivery = queueingConsumer.nextDelivery();
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (nextDelivery != null) {
                System.out.println("----------- receive message " + (++counter)
                        + "-----------");
                System.out.println("Message : "
                        + new String(nextDelivery.getBody()));

                Envelope envelope = nextDelivery.getEnvelope();
                String exchange = envelope.getExchange();
                String routingKey = envelope.getRoutingKey();
                long deliveryTag = envelope.getDeliveryTag();

                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("deliveryTag: " + deliveryTag);

                BasicProperties properties = nextDelivery.getProperties();
                printBasicProperties(properties);

                System.out.println("receive success.\n\n");

                if (!noAck)
                    channel.basicAck(deliveryTag, false);

                // channel.basicCancel(queueingConsumer.getConsumerTag());
                // System.out.println("cancel consumer success.");

                break;
            }
        }
    }

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        TestReceiver2 test = new TestReceiver2();
        test.receive();
    }
}

分享到:
评论

相关推荐

    rabbitmq-java-client-bin-3.3.4.zip

    1. **生产者(Producer)**:生产者是向RabbitMQ发送消息的应用程序。在Java客户端中,生产者通过创建一个连接到RabbitMQ服务器的Channel,然后定义交换机(Exchange)和路由键(Routing Key),最后发布消息到指定的...

    rabbitmq-java-client-bin-3.3.4

    6. **发布/消费(Publish/Consume)**:发布者通过交换器将消息发送到队列,消费者则从队列中接收并处理消息。RabbitMQ支持同步和异步消费模式,可以根据应用需求选择合适的方式。 在“rabbitmq-java-client-bin-...

    RabbitMQ实战-随书源码

    随书源码“RabbitMQ实战-随书源码”和“RabbitMQ in action 和 RabbitMQ实战-高效部署分布式消息队列”的配套代码,提供了丰富的示例,帮助读者深入理解RabbitMQ的使用和实践。 1. **RabbitMQ基础** - **AMQP协议*...

    MQ示例+otp_win64_22.2.exe+rabbitmq-server-3.8.1

    用户解压后,可以查看其中的代码、配置文件或其他文档,以了解如何设置和操作 RabbitMQ 服务器,或者如何编写客户端应用程序来发送和接收消息。 综合以上信息,我们可以知道这个压缩包旨在帮助用户学习和使用 ...

    rabbitmq-dotnet-client-3.6.4-dotnet-4.6.1.rar

    这个压缩包提供了一个演示如何在WCF(Windows Communication Foundation)服务中使用RabbitMQ进行消息传递的示例。RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中的异步处理、解耦和负载均衡。 ...

    rabbitmq-server-3.7.14.zip

    - **交换机(Exchange)**:交换机负责接收生产者发送的消息,并根据预定义的路由规则将其分发到相应的队列。 - **队列(Queue)**:队列是存储消息的地方,它是持久化的,可以有多个消费者同时监听。 - **绑定...

    rabbitmq学习10:使用spring-amqp发送消息及异步接收消息

    标题中的“rabbitmq学习10:使用spring-amqp发送消息及异步接收消息”表明了本次讨论的主题,即如何在Spring框架中利用Spring AMQP组件与RabbitMQ进行交互,实现消息的发送和异步接收。RabbitMQ是一个开源的消息代理...

    Windows c++中RabbitMQ-c特殊消息队列的保姆教程(csdn)————程序.pdf

    - 建立通道(channel),这是RabbitMQ中发送和接收消息的实体。 - 定义交换机(exchange)和队列(queue),决定消息如何路由。 - 发布消息到交换机,RabbitMQ会根据交换机类型和绑定规则将消息放入队列。 - 如果...

    rabbitmq-c.rar

    在实际开发中,你可以创建一个简单的生产者示例,发送一条消息到RabbitMQ服务器,然后创建一个消费者示例来接收并处理该消息。这涉及到了连接建立、通道打开、交换机选择、队列声明、消息发送和消费等一系列步骤。 ...

    rabbitmq-java-client-bin-3.0.4.zip

    这个客户端库允许Java开发者轻松地与RabbitMQ服务器进行交互,发送和接收消息。在"rabbitmq-java-client-3.0.4"目录下,通常会包含JAR文件和其他必要的资源,如文档、示例代码等。开发者可以将这些JAR文件添加到他们...

    rabbitMQ实战java版-rabbitMQ-demo.zip

    1. **Broker**: RabbitMQ服务器,负责接收、存储和转发消息。 2. **Exchange**: 交换机,根据预设的路由规则,将消息分发到相应的队列。 3. **Queue**: 队列,消息的容器,遵循先进先出(FIFO)原则。 4. **Binding*...

    rabbitmq-c rabbitmq amqp c++接口库

    9. **示例代码**:使用`rabbitmq-c++`,开发者可以编写简洁的C++代码来发送和接收消息,例如: ```cpp RabbitMQConnection conn("localhost", 5672, "guest", "guest"); RabbitMQChannel channel(conn); channel....

    RabbitMQ入门-实战-RabbitMQ.zip

    **RabbitMQ 入门与实战** RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息...在 RabbitMQ-master 文件中,可能包含了示例代码和教程,可以帮助你进一步学习和掌握 RabbitMQ 的使用。

    rabbitmq-server-2.8.5.tar.gz

    2. **AMQP 0-9-1支持**:该版本完全支持AMQP 0-9-1,这是一个广泛接受的消息协议,使得与其他语言和系统的集成变得简单。 3. **多种插件**:RabbitMQ 2.8.5提供了一系列插件,如管理界面、监控工具等,便于用户管理...

    RabbitMQ发送和接收

    本文将深入探讨C#环境下如何利用RabbitMQ实现消息的发送与接收,以及相关的关键概念和技术。 首先,我们要理解RabbitMQ的基本工作原理。RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,它定义了消息...

    RabbitMQ-Day1-Code

    RabbitMQ-Day1-Code 是一个学习RabbitMQ基础的项目,包含了入门第一天的所有编程示例,主要分为两个部分:`rabbitmq-producer` 和 `rabbitmq-consumer`。这两个部分分别对应了消息队列中的生产者和消费者角色。 ...

    android上RabbitMQ发送简单例子

    在这个示例中,我们创建了一个名为`test_queue`的队列,并发送了一条消息"Hello, RabbitMQ!"。注意,实际应用中需要替换默认的guest用户和端口,因为它们只适用于本地开发。 最后,"ZZB_net01"可能是一个包含...

    rabbitmq-c官方源代码,内涵examples,tools,tests

    1. **examples** - 这个目录包含了各种示例程序,展示了如何使用RabbitMQ-C库来实现基本的消息传递功能。这些例子涵盖了创建连接、打开通道、声明交换机、绑定队列、发送和接收消息等操作,是初学者快速上手的好材料...

    rabbitmq-server-3.8.3.zip

    2. **交换机(Exchange)**:负责接收生产者发送的消息,并根据预设的路由规则将其分发到不同的队列中。 3. **队列(Queue)**:存储消息的容器,多个消费者可以同时从一个队列中获取消息,但每个消息只会被一个消费者...

    RabbitMQ实战演练-RabbitMQ-Action.zip

    1. **Broker**: RabbitMQ服务器本身,负责接收、存储和转发消息。 2. **Exchange**: 交换器是RabbitMQ的核心组件,它根据预定义的路由规则将消息分发到不同的队列。 3. **Queue**: 队列是消息的实际存储位置,消费者...

Global site tag (gtag.js) - Google Analytics