`

基于 RabbitMQ 的消息发布与订阅

阅读更多

 pom.xml

了解springcloud架构可以加求求:三五三六二四七二五九

<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>4.1.0</version>
</dependency>

 RabbitMQ 连接配置工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ChannelUtils {

    public static Channel getChannelInstance(String connectionDescription) {
        try {
            ConnectionFactory connectionFactory = getConnectionFactory();
            // 创建连接
            Connection connection = connectionFactory.newConnection(connectionDescription);
            return connection.createChannel();
        } catch (Exception e) {
            throw new RuntimeException("获取Channel连接失败");
        }
    }

    /**
     * 功能描述:
     * <连接配置>
     *
     *
     * @return com.rabbitmq.client.ConnectionFactory
     * @author zhoulipu
     * @date   2019/7/31 18:02
     */
    private static ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 配置连接信息
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 失败重连模式:网络异常自动重新连接
        connectionFactory.setAutomaticRecoveryEnabled(true);
        // 失败重连模式:每10秒重试连接
        connectionFactory.setNetworkRecoveryInterval(10000);
        // 失败重连模式:重新声明交换器,队列等信息
        connectionFactory.setTopologyRecoveryEnabled(true);
        return connectionFactory;
    }

}

 

生产者发布消息

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Scanner;

public class Producer {

    /**
     * 交换机名称(需与消费者保持一致)
     */
    private static final String pbxName = "PBX";

    /**
     * 路由密匙(需与消费者保持一致)
     */
    private static final String routingKey = "routingKey";

    public static void main(String[] args) throws IOException {
        // 获取连接
        Channel channel = ChannelUtils.getChannelInstance("队列消息生产者");
        // 生产者配置
        AMQP.BasicProperties basicProperties = producerConfig(channel, pbxName);
        while (true) {
            System.out.print("请输入即将发布的消息:");
            Scanner str = new Scanner(System.in);
            // 发布消息
            publish(channel, basicProperties, pbxName, routingKey, str.next());
            System.out.println("消息已发布,请查看订阅者状态");
        }
    }

    /**
     * 功能描述:
     * <生产者发布消息>
     *
     * @param channel         1
     * @param basicProperties 2
     * @param pbxName         3
     * @param routingKey      4
     * @param msg             5
     * @return void
     * @author zhoulipu
     * @date 2019/7/31 17:52
     */
    public static void publish(Channel channel, AMQP.BasicProperties basicProperties, String pbxName, String routingKey, String msg) throws IOException {
        // 发布消息(交换机名, 路由关键字, 是否为强制性, 消息属性, 消息体);
        channel.basicPublish(pbxName, routingKey, false, basicProperties, msg.getBytes());
    }

    /**
     * 功能描述:
     * <生产者配置>
     *
     * @param channel 1
     * @param pbxName 2
     * @return com.rabbitmq.client.AMQP.BasicProperties
     * @author zhoulipu
     * @date 2019/7/31 17:52
     */
    public static AMQP.BasicProperties producerConfig(Channel channel, String pbxName) throws IOException {
        // 声明交换机 (交换机名称, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机(客户机直接发送消息), 队列中的消息自动删除配置);
        channel.exchangeDeclare(pbxName, BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
        // 消息属性
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                .deliveryMode(2) // 设置消息是否持久化,1: 非持久化 2:持久化
                .contentType("UTF-8")
                .contentEncoding("UTF-8")
                .headers(new HashMap<>())
                .build();
        return basicProperties;
    }

}

 

消费者订阅消息

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class Consumer {

    /**
     * 交换机名称(需与消费者保持一致)
     */
    private static final String pbxName = "PBX";

    /**
     * 路由密匙(需与消费者保持一致)
     */
    private static final String routingKey = "routingKey";


    public static void main(String[] args) throws IOException {
        // 队列名称(多个消费者需使用不同队列名称)
        String queueName = "queueName";
        // 获取连接
        Channel channel = ChannelUtils.getChannelInstance("队列消息消费者");
        // 消费者配置(交换机、路由、队列)
        AMQP.Queue.DeclareOk declareOk = consumerConfig(channel, pbxName, routingKey, queueName);
        // 订阅消息
        subscribe(channel, declareOk, "消费者名称");
    }

    /**
     * 功能描述:
     * <消费者订阅消息>
     *
     * @param channel     1
     * @param declareOk   2
     * @param consumerTag 3
     * @return void
     * @author zhoulipu
     * @date 2019/7/31 17:51
     */
    public static void subscribe(Channel channel, AMQP.Queue.DeclareOk declareOk, String consumerTag) throws IOException {
        // 消费者订阅消息 监听队列 (队列名, 是否自动应答(与消息可靠有关 后续会介绍), 消费者标签, 消费者)
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("envelope:" + envelope.toString());
                System.out.println("properties:" + properties.toString());
                System.out.println("消息内容:" + new String(body));
            }
        };
        // 回调(队列名,是否自动确认消息,消费者)
        channel.basicConsume(declareOk.getQueue(), true, consumerTag, defaultConsumer);
    }

    /**
     * 功能描述:
     * <消费者配置>
     *
     * @param channel    1
     * @param pbxName    2
     * @param routingKey 3
     * @param queueName  4
     * @return com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @author zhoulipu
     * @date 2019/7/31 17:51
     */
    public static AMQP.Queue.DeclareOk consumerConfig(Channel channel, String pbxName, String routingKey, String queueName) throws IOException {
        // 消息过期配置(文末对参数有简述)
        Map<String, Object> arguments = new HashMap<String, Object>();
        // 声明队列 (队列名, 是否持久化, 是否排他, 是否自动删除, 队列中的消息自动删除配置);
        AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(queueName, true, false, false, arguments);
        // 将队列Binding到交换机上 (队列名, 交换机名, 路由关键字);
        channel.queueBind(declareOk.getQueue(), pbxName, routingKey);
        return declareOk;
    }
    
}

 

分享到:
评论

相关推荐

    rabbitmq发布订阅

    【标题】:“rabbitmq发布订阅”是分布式消息传递中的一个重要概念,它允许生产者发送消息到RabbitMQ服务器,而多个消费者可以订阅这些消息并进行处理。RabbitMQ是一个开源的消息代理和队列服务器,使用AMQP...

    RabbitMQ 发布者订阅者

    本文将深入探讨基于RabbitMQ实现的发布者订阅者(Publisher-Subscriber)模型,以及如何通过它来优化系统架构。 发布者订阅者模型是消息队列中的一种通信模式,其中发布者发送消息到一个主题,而多个订阅者可以监听...

    RabbitMQDemo_RabbitMQ发布与订阅Demo_rabbitmq_JSON_中间件_

    总结来说,"RabbitMQDemo_RabbitMQ发布与订阅Demo_rabbitmq_JSON_中间件_"是一个演示项目,其核心在于利用RabbitMQ作为中间件,通过发布者发送JSON格式的消息到队列,由订阅者从队列中接收并处理这些消息。...

    基于RabbitMQ的消息路由分发实例

    本实例将深入探讨基于RabbitMQ的消息路由分发功能,帮助你更好地理解和应用这一关键特性。 首先,理解RabbitMQ的基本概念是至关重要的。RabbitMQ是一个实现了Advanced Message Queuing Protocol(AMQP)的开源消息...

    (源码)基于RabbitMQ的消息队列系统.zip

    # 基于RabbitMQ的消息队列系统 ## 项目简介 本项目是一个基于RabbitMQ的消息队列系统,主要用于实现跨进程的通信机制,用于上下游传递信息。RabbitMQ作为消息中间件,主要作用是系统之间的信息传递进行解耦,并...

    rabbitMQ发布和订阅共7页.pdf.zip

    **RabbitMQ发布与订阅模式详解** RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步任务处理、解耦和扩展性提升。在这个名为“rabbitMQ...

    rabbitMQ 消息队列 Demo

    这是RabbitMQ入门的经典示例,它展示了最基础的消息发布与消费过程。生产者发送一个简单的"Hello, World!"消息到RabbitMQ服务器,然后消费者从队列中取出并打印这个消息。这个例子帮助我们理解RabbitMQ的基本工作...

    RabbitMQ消息中间件技术精讲

    RabbitMQ支持多种消息传递模式,包括简单模式、工作队列模式、发布/订阅模式、路由模式、主题模式等,适用于各种复杂的应用场景。 #### 二、RabbitMQ的工作原理 RabbitMQ作为一款消息中间件,其核心组件主要包括...

    RabbitMQ消息中间件视频教程

    它支持多种消息发布订阅模式,包括简单模式(Simple)、工作队列模式(Work Queues)、发布/订阅模式(Publish/Subscribe)、路由模式(Routing)、RPC模式(Remote Procedure Call)、Fanout模式等。RabbitMQ具有高...

    RabbitMQ消息中间件技术精讲.txt

    RabbitMQ支持多种消息传递模式,包括简单模式、工作队列模式、发布/订阅模式、路由模式、主题模式等,适用于各种复杂的应用场景。 #### 二、RabbitMQ的基本概念 1. **Exchange(交换器)**:交换器用于接收生产者...

    C# RabbitMQ 主题订阅的源码

    主题订阅模式是RabbitMQ提供的一种发布/订阅模型,其中消息基于特定的路由键(topic)进行发送和接收。路由键类似于正则表达式,允许接收者根据需要过滤消息。 ### 4. 创建RabbitMQ连接 在C#中,我们首先需要创建一...

    Spring boot整合消息队列RabbitMQ实现四种消息模式(适合新手或者开发人员了解学习RabbitMQ机制)

    本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...

    RabbitMQClient_订阅测试.zip_rabbitmq

    在实际使用中,RabbitMQ的订阅模式是基于发布/订阅(Publish/Subscribe)模型,其中生产者发送消息到一个交换机,然后由交换机根据预定义的路由规则将消息分发到一个或多个队列。消费者订阅这些队列,当有新消息到达...

    20.消息中间件之RabbitMQ入门讲解

    RabbitMQ是一款开源的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效地传递消息。在这个“20.消息中间件之RabbitMQ入门讲解”的主题中,我们将深入理解RabbitMQ的核心...

    经典安全消息队列工具rabbitMQ rabbitmq-server-3.7.9

    1. **事件通知**:在分布式系统中,事件通知是一种通信方式,它允许组件之间通过发布和订阅模式进行异步通信。RabbitMQ作为消息队列服务,可以作为事件驱动架构的核心,当一个应用产生事件时,它会将事件封装成消息...

    RabbitMQ与SpringMVC集成

    在提供的`spring.amqp.stocks.rar`文件中,可能包含了一个示例应用,展示了如何在SpringMVC项目中配置和使用RabbitMQ进行股票数据的发布和订阅。用户可以通过解压文件,查看源代码和文档(如`安装说明.docx`),了解...

    springboot与rabbitmq消息队列的整合

    在IT行业中,SpringBoot和RabbitMQ是两个非常重要的组件,它们在构建高效、可扩展的应用...在实际项目中,我们需要根据业务需求选择合适的消息模式,如简单模式、工作队列模式、发布/订阅模式等,以达到最佳的效果。

    基于springboot+maven的rabbitmq项目demo

    在RabbitMQ的发布/订阅模式下,生产者发布消息到一个交换机,交换机再根据预定义的路由规则将消息分发到一个或多个队列。消费者则订阅这些队列,接收到消息。这种模式适用于一对多的关系,比如广播通知或者日志记录...

    rabbitmq+vue+stomp.zip

    通过以上步骤,你可以构建一个基于RabbitMQ、Vue.js和STOMP的实时消息推送系统。这个系统具有良好的可扩展性和健壮性,可以适应各种实时通信需求。同时,Vue.js的组件化和RabbitMQ的灵活性为项目的维护和升级提供了...

Global site tag (gtag.js) - Google Analytics