pom.xml
了解springcloud架构可以加求求:三五三六二四七二五九
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
RabbitMQ 连接配置工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ChannelUtils {
public static Channel getChannelInstance(String connectionDescription) {
try {
ConnectionFactory connectionFactory = getConnectionFactory();
// 创建连接
Connection connection = connectionFactory.newConnection(connectionDescription);
return connection.createChannel();
} catch (Exception e) {
throw new RuntimeException("获取Channel连接失败");
}
}
/**
* 功能描述:
* <连接配置>
*
*
* @return com.rabbitmq.client.ConnectionFactory
* @author zhoulipu
* @date 2019/7/31 18:02
*/
private static ConnectionFactory getConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
// 配置连接信息
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 失败重连模式:网络异常自动重新连接
connectionFactory.setAutomaticRecoveryEnabled(true);
// 失败重连模式:每10秒重试连接
connectionFactory.setNetworkRecoveryInterval(10000);
// 失败重连模式:重新声明交换器,队列等信息
connectionFactory.setTopologyRecoveryEnabled(true);
return connectionFactory;
}
}
生产者发布消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Scanner;
public class Producer {
/**
* 交换机名称(需与消费者保持一致)
*/
private static final String pbxName = "PBX";
/**
* 路由密匙(需与消费者保持一致)
*/
private static final String routingKey = "routingKey";
public static void main(String[] args) throws IOException {
// 获取连接
Channel channel = ChannelUtils.getChannelInstance("队列消息生产者");
// 生产者配置
AMQP.BasicProperties basicProperties = producerConfig(channel, pbxName);
while (true) {
System.out.print("请输入即将发布的消息:");
Scanner str = new Scanner(System.in);
// 发布消息
publish(channel, basicProperties, pbxName, routingKey, str.next());
System.out.println("消息已发布,请查看订阅者状态");
}
}
/**
* 功能描述:
* <生产者发布消息>
*
* @param channel 1
* @param basicProperties 2
* @param pbxName 3
* @param routingKey 4
* @param msg 5
* @return void
* @author zhoulipu
* @date 2019/7/31 17:52
*/
public static void publish(Channel channel, AMQP.BasicProperties basicProperties, String pbxName, String routingKey, String msg) throws IOException {
// 发布消息(交换机名, 路由关键字, 是否为强制性, 消息属性, 消息体);
channel.basicPublish(pbxName, routingKey, false, basicProperties, msg.getBytes());
}
/**
* 功能描述:
* <生产者配置>
*
* @param channel 1
* @param pbxName 2
* @return com.rabbitmq.client.AMQP.BasicProperties
* @author zhoulipu
* @date 2019/7/31 17:52
*/
public static AMQP.BasicProperties producerConfig(Channel channel, String pbxName) throws IOException {
// 声明交换机 (交换机名称, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机(客户机直接发送消息), 队列中的消息自动删除配置);
channel.exchangeDeclare(pbxName, BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
// 消息属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.deliveryMode(2) // 设置消息是否持久化,1: 非持久化 2:持久化
.contentType("UTF-8")
.contentEncoding("UTF-8")
.headers(new HashMap<>())
.build();
return basicProperties;
}
}
消费者订阅消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class Consumer {
/**
* 交换机名称(需与消费者保持一致)
*/
private static final String pbxName = "PBX";
/**
* 路由密匙(需与消费者保持一致)
*/
private static final String routingKey = "routingKey";
public static void main(String[] args) throws IOException {
// 队列名称(多个消费者需使用不同队列名称)
String queueName = "queueName";
// 获取连接
Channel channel = ChannelUtils.getChannelInstance("队列消息消费者");
// 消费者配置(交换机、路由、队列)
AMQP.Queue.DeclareOk declareOk = consumerConfig(channel, pbxName, routingKey, queueName);
// 订阅消息
subscribe(channel, declareOk, "消费者名称");
}
/**
* 功能描述:
* <消费者订阅消息>
*
* @param channel 1
* @param declareOk 2
* @param consumerTag 3
* @return void
* @author zhoulipu
* @date 2019/7/31 17:51
*/
public static void subscribe(Channel channel, AMQP.Queue.DeclareOk declareOk, String consumerTag) throws IOException {
// 消费者订阅消息 监听队列 (队列名, 是否自动应答(与消息可靠有关 后续会介绍), 消费者标签, 消费者)
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("consumerTag:" + consumerTag);
System.out.println("envelope:" + envelope.toString());
System.out.println("properties:" + properties.toString());
System.out.println("消息内容:" + new String(body));
}
};
// 回调(队列名,是否自动确认消息,消费者)
channel.basicConsume(declareOk.getQueue(), true, consumerTag, defaultConsumer);
}
/**
* 功能描述:
* <消费者配置>
*
* @param channel 1
* @param pbxName 2
* @param routingKey 3
* @param queueName 4
* @return com.rabbitmq.client.AMQP.Queue.DeclareOk
* @author zhoulipu
* @date 2019/7/31 17:51
*/
public static AMQP.Queue.DeclareOk consumerConfig(Channel channel, String pbxName, String routingKey, String queueName) throws IOException {
// 消息过期配置(文末对参数有简述)
Map<String, Object> arguments = new HashMap<String, Object>();
// 声明队列 (队列名, 是否持久化, 是否排他, 是否自动删除, 队列中的消息自动删除配置);
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(queueName, true, false, false, arguments);
// 将队列Binding到交换机上 (队列名, 交换机名, 路由关键字);
channel.queueBind(declareOk.getQueue(), pbxName, routingKey);
return declareOk;
}
}
相关推荐
【标题】:“rabbitmq发布订阅”是分布式消息传递中的一个重要概念,它允许生产者发送消息到RabbitMQ服务器,而多个消费者可以订阅这些消息并进行处理。RabbitMQ是一个开源的消息代理和队列服务器,使用AMQP...
本文将深入探讨基于RabbitMQ实现的发布者订阅者(Publisher-Subscriber)模型,以及如何通过它来优化系统架构。 发布者订阅者模型是消息队列中的一种通信模式,其中发布者发送消息到一个主题,而多个订阅者可以监听...
总结来说,"RabbitMQDemo_RabbitMQ发布与订阅Demo_rabbitmq_JSON_中间件_"是一个演示项目,其核心在于利用RabbitMQ作为中间件,通过发布者发送JSON格式的消息到队列,由订阅者从队列中接收并处理这些消息。...
本实例将深入探讨基于RabbitMQ的消息路由分发功能,帮助你更好地理解和应用这一关键特性。 首先,理解RabbitMQ的基本概念是至关重要的。RabbitMQ是一个实现了Advanced Message Queuing Protocol(AMQP)的开源消息...
**RabbitMQ发布与订阅模式详解** RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步任务处理、解耦和扩展性提升。在这个名为“rabbitMQ...
这是RabbitMQ入门的经典示例,它展示了最基础的消息发布与消费过程。生产者发送一个简单的"Hello, World!"消息到RabbitMQ服务器,然后消费者从队列中取出并打印这个消息。这个例子帮助我们理解RabbitMQ的基本工作...
RabbitMQ支持多种消息传递模式,包括简单模式、工作队列模式、发布/订阅模式、路由模式、主题模式等,适用于各种复杂的应用场景。 #### 二、RabbitMQ的工作原理 RabbitMQ作为一款消息中间件,其核心组件主要包括...
它支持多种消息发布订阅模式,包括简单模式(Simple)、工作队列模式(Work Queues)、发布/订阅模式(Publish/Subscribe)、路由模式(Routing)、RPC模式(Remote Procedure Call)、Fanout模式等。RabbitMQ具有高...
RabbitMQ支持多种消息传递模式,包括简单模式、工作队列模式、发布/订阅模式、路由模式、主题模式等,适用于各种复杂的应用场景。 #### 二、RabbitMQ的基本概念 1. **Exchange(交换器)**:交换器用于接收生产者...
主题订阅模式是RabbitMQ提供的一种发布/订阅模型,其中消息基于特定的路由键(topic)进行发送和接收。路由键类似于正则表达式,允许接收者根据需要过滤消息。 ### 4. 创建RabbitMQ连接 在C#中,我们首先需要创建一...
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
在实际使用中,RabbitMQ的订阅模式是基于发布/订阅(Publish/Subscribe)模型,其中生产者发送消息到一个交换机,然后由交换机根据预定义的路由规则将消息分发到一个或多个队列。消费者订阅这些队列,当有新消息到达...
RabbitMQ是一款开源的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效地传递消息。在这个“20.消息中间件之RabbitMQ入门讲解”的主题中,我们将深入理解RabbitMQ的核心...
1. **事件通知**:在分布式系统中,事件通知是一种通信方式,它允许组件之间通过发布和订阅模式进行异步通信。RabbitMQ作为消息队列服务,可以作为事件驱动架构的核心,当一个应用产生事件时,它会将事件封装成消息...
在提供的`spring.amqp.stocks.rar`文件中,可能包含了一个示例应用,展示了如何在SpringMVC项目中配置和使用RabbitMQ进行股票数据的发布和订阅。用户可以通过解压文件,查看源代码和文档(如`安装说明.docx`),了解...
在IT行业中,SpringBoot和RabbitMQ是两个非常重要的组件,它们在构建高效、可扩展的应用...在实际项目中,我们需要根据业务需求选择合适的消息模式,如简单模式、工作队列模式、发布/订阅模式等,以达到最佳的效果。
在RabbitMQ的发布/订阅模式下,生产者发布消息到一个交换机,交换机再根据预定义的路由规则将消息分发到一个或多个队列。消费者则订阅这些队列,接收到消息。这种模式适用于一对多的关系,比如广播通知或者日志记录...
通过以上步骤,你可以构建一个基于RabbitMQ、Vue.js和STOMP的实时消息推送系统。这个系统具有良好的可扩展性和健壮性,可以适应各种实时通信需求。同时,Vue.js的组件化和RabbitMQ的灵活性为项目的维护和升级提供了...
在RabbitMQ中,交换机负责将消息路由到与之绑定的队列,队列再连接到订阅的消费者。 四、路由(Direct Exchange) 直接交换模式根据路由键(routing key)将消息路由到匹配的队列。生产者和消费者都指定一个路由键...