`
zhchx0827
  • 浏览: 194049 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

RabbitMQ入门学习——Publish/Subscribe 发布/订阅

 
阅读更多

 

http://www.rabbitmq.com/tutorials/tutorial-three-java.html


                   
 

在前面的章节中,我们创建了一个工作队列。工作队列假设每一个任务都交给一个工作者来处理。在这一章节,我们会处理一些完全不同的事情,我们会将消息发送给多个消费者。这种模式就叫做“publish/subscribe

为了说明这种模式,我们会建立一个简单的日志系统。它包括两个程序:一个发送日志,另一个接受然后将他们打印出来。

在我们的日志系统中,每一个运行这的接受者,都将会收到消息。通过这种方式,我们可以在同一时间使一个接受者将消息保存到硬盘上,另一个接受者将消息打印到控制台。

本质上,被发布的消息将会广播到每一个接受者上

1Exchanges(交换区)

在前面的章节中,我们都是将消息写到队列中,然后从队列中取出消息。现在是时候引入完整的Rabbit消息模型了

让我们回顾下前面章节都包括那些内容

一个producer发送消息的应用

一个queue缓存消息

一个consumer接受消息的应用

RabbitMQ消息模型的核心思想就是,生产者不把消息直接发送给队列。实际上,生产者在很多情况下都不知道消息是否会被发送到一个队列中。取而代之的是,生产者将消息发送到交换区。交换区是一个非常简单的东西,它一端接受生产者的消息,另一端将他们推送到队列中。交换区必须要明确的指导如何处理它接受到的消息。是放到一个队列中,还是放到多个队列中,亦或是被丢弃。这些规则可以通过交换区的类型来定义。

可用的交换区类型有:directtopicheadersfanout。下面将集中讨论最后一个:fanout。可以通过channel.exchangeDeclare(logs,fanout)来创建一个这种类型的交换区。它会把它接收到的所有消息,发送给所有和它绑定的队列。并且,这正是我们日志需要的。

可以通过客户端的rabbitmqctl list_exchanges查看所有的交换区可以通过rabbitmqctl list_exchanges列出所有的交换区。amp.*表示的是默认或没有命名的交换区。在前面的章节中,我们不知道交换区,但是我们还是可以发送消息到队列中。那是因为我们使用了默认的交换区,我们通过控制符串来制定。回想之前发布消息的代码channel.basicPublish(“”,hello,null,message.getBytes());第一个参数就是交换区的名字。空字符串表示这是默认的或者没有命名的交换区。消息通过指定的路由名routingkey被路由到队列中。

    现在我们可以将消息发送到交换区

channel.basicPublish( "logs", "", null, message.getBytes());

 

2Temporary queues 临时队列

之前使用的队列中有一个名字,给定一个名字对我们来说非常的重要,因为我们需要将消费者关联到队列。当我们希望在生产者和消费者之间共享队列时,指定队列的名字将变的非常重要。

但是,这不适用于我们的日志记录。我们希望记录所有的日志消息,而不是他们的一个子集。我们同时也对当前的信息感兴趣而不是老得信息。要解决这些问题,我们需要做如下两件事情:

首先:当我们连接到RabbitMQ时,我们需要一个新的,空的队列。为了解决这个问题,我们可以使用一个随机创建的名字;或者让RabbitMQ服务器随机帮我们创建一个队列名字。

其次:一旦我们断开连接,需要自动删除消费者队列

我们可以通过channel.queueDeclare()来创建一个非持久的队列名,并通过getQueue()来返回对列名。

3: Bindings(绑定)

我们已经创建了一个fanout交换区和一个队列,现在我们就可以让交换区来发送消息到我们的队列中来。交换区和队列的关系,可以通过调用channel.queueBind(queueName,logs,””);来绑定。

现在,所有的日志交换区将会将数据追加到我们的队列中

通过客户端Rabbitmqctl list_bindings命令,可以查看所有的绑定

4Putting it all together

发送消息的生产者程序,和之前的代码看起来没有多大的区别。最主要的变化就是,我们现在希望将消息发送给我们的logs交换区,而不是之前的默认交换区。发送时,我们需要提供一个routingkey路由名,它的值对于fanout类型的交换区来说,将会被忽略。下面是EmitLog.java的代码片段

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

 

public class EmitLog {

 

    private static final String EXCHANGE_NAME = "logs";

 

    public static void main(String[] argv)

                  throws java.io.IOException {

 

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

 

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

 

        String message = getMessage(argv);

 

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        System.out.println(" [x] Sent '" + message + "'");

 

        channel.close();

        connection.close();

    }

    //...

}

正如你所看到的,在建立到RabbitMQ的连接之后,我们就申明了一个交换区。这一步非常重要,因为发布消息到一个不存在的交换区是不被允许的。如果没有队列绑定到交换区,消息将会丢失。

ReceiveLogs.java

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.QueueingConsumer;

 

public class ReceiveLogs {

 

    private static final String EXCHANGE_NAME = "logs";

 

    public static void main(String[] argv)

                  throws java.io.IOException,

                  java.lang.InterruptedException {

 

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

 

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, "");

 

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

 

        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName, true, consumer);

 

        while (true) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            String message = new String(delivery.getBody());

 

            System.out.println(" [x] Received '" + message + "'");

        }

    }

}

使用rabbitmqctl list_bindings命令,你可以确认程序却是创建了bindingsqueue

  • 大小: 4.7 KB
分享到:
评论

相关推荐

    Sringboot整合RabbitMQ(三):发布订阅(Publish/Subscribe)

    这种模式被称为” 发布 / 订阅”。 交换器(Exchanges) 在本教程的前面部分,我们发送和接收到队列中的消息,现在是时候在 RabbitMQ 中引入完整的消息传递模式了。 让我们快速回顾一下之前了解的内容: 生产者...

    RabbitMQ练习(Publish Subscribe)

    RabbitMQ练习(Publish Subscribe)

    rabbitmq 7种队列实现java版

    文章目录rabbitmq7种实现方式搭建maven项目引入依赖创建连接简单队列消息生产者消息消费者work queues 工作队列生产者消费者能者多劳(公平分发):消费能力强则消费更多消息Publish/Subscribe 发布订阅模式生产者...

    rabbitmq简单demo

    在Publish/Subscribe模式中,生产者发布消息到一个特定的交换机,而消费者订阅一个或多个主题(即绑定到交换机)。交换机会根据预定义的路由键将消息路由到相应的队列。这种方式允许广播消息到多个消费者,每个消费...

    RabbitMQ入门操作手册.pdf

    3. **Publish/Subscribe模式**:发布者发送消息到主题,订阅者根据主题订阅消息。 4. **Routing模式**:消息基于路由键进行分发。 5. **Topics模式**:类似Routing,但路由键可以包含通配符,实现更灵活的匹配。 6. ...

    资源前后端分离式分布式微服务架构项目消息中间件RabbitMQ讲义+源码+视频

    页面发布需求分析 理解 理解Cms页面发布的流程 RabbitMQ介绍 理解 能够说出MQ的应用场景 RabbitMQ工作原理 理解 理解RabbitMQ的工作原理 "能够说出RabbitMQ基础结构组成部分能够完成RabbitMQ下载和安装能够完成...

    rabbitmq发布订阅

    【标题】:“rabbitmq发布订阅”是分布式消息传递中的一个重要概念,它允许生产者发送消息到RabbitMQ服务器,而多个消费者可以订阅这些消息并进行处理。RabbitMQ是一个开源的消息代理和队列服务器,使用AMQP...

    rabbitMQ订阅收发.zip

    在RabbitMQ中,订阅模式也称为发布/订阅模式(Publish/Subscribe)。在这种模式下,生产者发送消息到一个特定的交换机,该交换机会将消息路由到多个消费者,而无需知道消费者的任何信息。这种模式常用于广播或者一对...

    RabbitMQ源码学习资料

    4. **消息模型**:RabbitMQ采用发布/订阅(Publish/Subscribe)、路由(Routing)、主题(Topic)和直接(Direct)四种交换器类型,根据不同的路由策略将消息分发到相应的队列。队列是消息的存储单元,它接收来自生产者的...

    RabbitMQClient_订阅测试.zip_rabbitmq

    在实际使用中,RabbitMQ的订阅模式是基于发布/订阅(Publish/Subscribe)模型,其中生产者发送消息到一个交换机,然后由交换机根据预定义的路由规则将消息分发到一个或多个队列。消费者订阅这些队列,当有新消息到达...

    kettle rabbitmq 插件开发

    5. **发送数据到 RabbitMQ**:开发一个 Kettle 步骤(Step)或转换来将数据转换为适合 RabbitMQ 的格式,并通过 RabbitMQ 的 Publish/Subscribe 或 Direct 模式发送消息。 6. **从 RabbitMQ 接收数据**:创建对应的...

    RabbitMQ学习案例Demo

    1. **发布/订阅模式(Publish/Subscribe)** 在这种模式下,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅该主题,接收到所有发布到该主题的消息。这种模式适用于一对多的...

    springboot-mqtt简易demo

    MQTT 全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。 该协议将...

    Java课程实验 RabbitMQ 常用的工作模式

    3. 发布/订阅模式(Publish/Subscribe Mode): - 发布/订阅模式用于将消息广播到多个消费者。每个消费者都有自己的队列,并且订阅相同的交换机。生产者发送消息到交换机,然后交换机将消息广播到所有与之绑定的...

    RabbitMQ_MVC_Demo.rar

    本例为.net下使用EasyNetQ操作RabbitMQ的Demo,例子采用MVC架构,包含完整的Publish/Subscribe。详情请看:https://www.cnblogs.com/imstrive/p/11078335.html

    RabbitMQDemo_RabbitMQ发布与订阅Demo_rabbitmq_JSON_中间件_

    在发布(Publish)过程中,生产者通过RabbitMQ客户端库连接到服务器,创建一个通道(Channel),然后定义一个交换机,并将消息绑定到这个交换机上。消息内容通常包含JSON数据,这种格式易于读写,且支持复杂的数据...

    rabbitmqdemo

    除了基本的发送和接收消息之外,RabbitMQ 还支持许多高级特性,如工作队列(Work Queues)、发布/订阅模式(Publish/Subscribe)、主题路由(Topic Routing)等。在 "rabbitmqdemo" 中,你可能能够发现这些模式的...

    SpringBoot-RabbitMQ生产者和消费者.7z

    2. **Publish/Subscribe模式**:在这种模式下,生产者发布消息到一个主题,而多个消费者可以订阅该主题。每个消费者都可以接收到所有发布到该主题的消息,实现一对多的消息广播。 3. **Topic模式**:这是介于Direct...

    RabbitMQ-最完整最全教程

    RabbitMQ提供了六种工作模式:简单模式、work模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics主题模式和RPC远程调用模式。 RabbitMQ的安装及配置包括了安装说明和用户以及VirtualHosts的配置。...

    RabbitMQ实战带目录版本

    2. **发布/订阅(Publish/Subscribe)**:生产者发布消息到主题交换器,消费者订阅特定的主题。所有匹配主题的消息都会被分发到订阅的消费者。 3. **路由(Routing)**:通过路由键将消息路由到指定的队列,适用...

Global site tag (gtag.js) - Google Analytics