`
yugouai
  • 浏览: 494979 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

rabbitmq入门-发布与订阅

 
阅读更多

发布/订阅:分发一个消息给多个消费者(consumers)接收一个生产者生产的消息

 

交换器(Exchanges)

rabbitmq完整的消息模型

 

  • 发布者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

  RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

发布者(producer)只需要把消息发送给一个交换器(exchange)。交换器非常简单,它一边从发布者方接收消息,一边把消息推入队列。交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过exchange type来定义的。

 

有几个可供选择的交换器类型:directtopicheaders和 fanout

 

创建一个fanout类型的交换器,命名为logs

 

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

 fanout交换器很简单,从名字上就能猜测出来,它把消息发送给它所知道的所有队列。

 

 

 

交换器列表

rabbitmqctl能够列出服务器上所有的交换器:

rabbitmqctl list_exchanges

Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
...done.

 这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。

 

匿名的交换器

 

我们对交换器一无所知,但仍然能够发送消息到队列中。因为使用了命名为空字符串("")默认的交换器。

 

之前是如何发布一则消息:

 

 

channel.basicPublish("", "hello", null, message.getBytes());

 

 

 

exchange参数就是交换器的名称。空字符串代表默认或者匿名交换器:消息将会根据指定的routing_key分发到指定的队列。

 

发送消息到一个具名交换器了:

 

channel.basicPublish( "logs", "", null, message.getBytes());

 

 

临时队列

一个消息同时分发给多个消费者,需要指定exchange名称,另外,当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们只要在调用queue_declare方法的时候,不提供queue参数就可以了:

 

String queueName = channel.queueDeclare().getQueue();

 

 

绑定

 

已经创建了一个fanout类型的交换器和一个队列。现在我们需要告诉交换器如何发送消息给我们的队列。交换器和队列之间的关系我们称之为绑定(binding)

 

channel.queueBind(queueName, "logs", "");

 logs交换器将会把消息添加到我们的队列中logs交换器将会把消息添加到我们的队列中

 

使用rabbitmqctl list_bindings队列出所有存在的绑定

 

生产者代码

public class EmitLog {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws java.io.IOException {

		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		String message = getMessage(argv);

		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");

		channel.close();
		connection.close();
	}

	private static String getMessage(String[] strings){
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }
    
    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

 消费者代码

public class ReceiveLogs {
	private final static String EXCHANGE_NAME = "logs";
	
	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
	}
}

 查看banding的exchange

Listing bindings ...
        exchange        amq.gen-zKK_8tEJV79W5r1twM87lg  queue   amq.gen-zKK_8
V79W5r1twM87lg  []
        exchange        hello   queue   hello   []
logs    exchange        amq.gen-zKK_8tEJV79W5r1twM87lg  queue           []
...done.

 

 

分享到:
评论

相关推荐

    RabbitMQ入门-实战-RabbitMQ.zip

    **RabbitMQ 入门与实战** RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息队列系统,它被广泛应用于分布式系统中的消息传递和任务调度。RabbitMQ 提供了高可用性、可扩展性和可靠性...

    5-2 RabbitMQ入门 - EMOS小程序1

    1. docker load &lt; rabbitmq.tar.gz 1. 简单模式 3. 发布/订阅模式 4. 路由模式 5. 主题模式

    RabbitMQ入门小Dome ------&amp;gt; RabbitMQDome.zip

    最近整理学习的RabbitMQ入门Dome,文件是一个普通java项目导入完成后在lib文件夹中amqp-client-5.2.0.jar,slf4j-api-1.7.25.jar添加进去即可,里面有5个dome分是 dome1 : 简单队列,dome2 :work模式,dome3 : 订阅...

    RabbitMQ-Day1-Code

    RabbitMQ-Day1-Code 是一个学习RabbitMQ基础的项目,包含了入门第一天的所有编程示例,主要分为两个部分:`rabbitmq-producer` 和 `rabbitmq-consumer`。这两个部分分别对应了消息队列中的生产者和消费者角色。 ...

    RabbitMQ-Pub-Sub-Sample:RabbitMQ入门

    在这个“RabbitMQ-Pub-Sub-Sample”项目中,我们将深入探讨RabbitMQ的发布/订阅模式以及如何在C#环境中进行集成和测试。 1. **RabbitMQ基本概念**: - **节点(Node)**:RabbitMQ服务器的实例。 - **交换机...

    rabbitMQ代码案例 简单入门

    3. **消息发布/订阅模式**:理解生产者如何发布消息到交换器,以及消费者如何订阅队列来获取消息。 4. **工作队列模式**:学习如何使用RabbitMQ实现任务调度,通过多个消费者并行处理任务以提高效率。 5. **路由与...

    20.消息中间件之RabbitMQ入门讲解

    消息中间件之RabbitMQ入门讲解”的主题中,我们将深入理解RabbitMQ的核心概念,如何通过控制台进行管理,以及如何在Spring Cloud框架下创建消息生产者和消费者。 首先,让我们了解RabbitMQ的基本概念。RabbitMQ的...

    RabbitMQ从入门到放弃

    ### RabbitMQ从入门到放弃——理解消息队列与RabbitMQ #### 消息队列简介 消息队列(Message Queue, MQ)作为一种重要的中间件技术,它提供了应用程序间的一种通信方式,通过写入和读取出入列队的消息来进行通信,...

    Rabbitmq笔记+入门教程+示例

    ### RabbitMQ基础知识与应用 #### 一、RabbitMQ简介 RabbitMQ 是一款开源的消息代理软件,也是 AMQP(Advanced Message Queuing Protocol)标准的一个实现。它支持多种消息传递模式,包括点对点(Direct)、发布/...

    RabbitMQ 入门教程(JAVA)

    ### RabbitMQ 入门教程(JAVA) #### 一、RabbitMQ 概述 RabbitMQ 是一个消息中间件,其主要功能是接收...接下来,你可以继续探索更多高级特性,如持久化消息、发布/订阅模式、事务处理等,以满足更复杂的业务需求。

    RabbitMQ.pdf-详情

    3. **发布/订阅**:多个生产者和消费者,每个消费者可以订阅多个主题,广播模式。 4. **路由**:根据路由键将消息发送给匹配的消费者。 5. **主题**:类似于路由,但路由键可以是模式,允许更灵活的匹配规则。 6. **...

    RabbitMQ入门操作手册.pdf

    【RabbitMQ入门操作手册】提供了全面的RabbitMQ学习指南,从基础概念到实际操作,帮助初学者快速掌握这个强大的消息队列系统。RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)的开源消息代理,其核心...

    RabbitMQ入门教程.docx

    ### RabbitMQ入门知识点详解 #### 一、RabbitMQ简介 RabbitMQ是一款开源的消息中间件,基于Erlang语言开发而成。它支持多种消息发布/订阅模式,并且能够跨多平台运行。作为消息中间件,RabbitMQ的核心功能是接受、...

    RabbitMQ中文文档-.rar

    "发布/订阅"模式在RabbitMQ中是另一种重要的通信方式。在这个模式下,生产者发布消息到一个主题,而多个订阅者可以监听该主题并接收到消息。这种一对多的关系使得发布/订阅模式适用于广播或通知场景。 "路由"章节...

    rabbitmq-tutorial-php-demo:RabbitMQ官方中文入门教程(PHP版)演示源码-php

    "rabbitmq-tutorial-php-demo" 指的是一个基于PHP的RabbitMQ入门教程的演示项目。RabbitMQ是一个流行的开源消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议来实现高效、可靠的消息传递...

    RabbitMQ入门代码

    在这个“RabbitMQ入门代码”中,我们将深入探讨如何使用Java来与RabbitMQ进行交互,包括队列持久化、消息持久化、Direct交换机、Fanout交换机和Topic交换机的基础测试代码。 首先,让我们从基础开始,了解如何在...

    RabbitMQ_Project.zip

    通常,开发者会使用RabbitMQ的Java客户端库(`com.rabbitmq:amqp-client`)来创建连接、通道、发布/订阅消息等。`pom.xml`文件是Maven项目的配置文件,用于管理依赖项。`src`目录下应该包含了Java源代码,`target`...

    第一节-rabbitmq安装以及入门.pdf

    RabbitMQ是一种开源的消息代理和队列服务器,它允许不同的应用程序之间通过AMQP协议...掌握了这些基础知识点之后,用户可以进一步深入学习RabbitMQ的高级特性,如发布订阅模式、消息确认机制、死信队列和消息持久化等。

    RabbitMq开发入门.docx

    RabbitMQ 开发入门 RabbitMQ 是一个基于 AMQP 协议的消息队列中间件,广泛应用于分布式系统中实现异步通信、解耦合、流量控制等功能。以下是 RabbitMQ 开发入门的一些重要知识点: 1. 连接配置 在使用 RabbitMQ ...

    RabbitMQ入门视频

    5. **发布/订阅模式**:这是RabbitMQ中的一种常见模式,生产者发布消息到一个交换机,而多个消费者可以订阅这些消息。这种模式常用于广播消息或者需要多点接收的情况。 6. **工作队列模式**:在工作队列模式下,...

Global site tag (gtag.js) - Google Analytics