一、Producer端
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "hello_exchange";
String routingKey = "hello_routingKey";
channel.exchangeDeclare(exchangeName, "direct", true); //定义exchange
channel.queueBind(QUEUE_NAME, exchangeName, routingKey); //通过routingKey把queue和exchange绑定
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); //发布消息
流控
RabbitMQ 服务端有自我保护机制——流控,流控分两种:
1. 全局流控
当内存/磁盘使用量达到流控阀值,会触发消息发送全局流控。发生流控后该实例所有队列都无法投递消息只能消费消息;
2. 队列流控
队列由于生产速率远远超过消费速率,同时造成该队列消息堆积严重会发生队列级别流控。发生流控后当前队列将无法投递消息,只能消费消息。
当堆积的消息数量下降后,队列流控状态会消失,消息投递将恢复正常。
所以用户当收到流控报警后要及时查看实例消费者消费能力,通过增加消费者,或者清理可丢弃消息数据等措施尽快减少消息堆积,同时应用需要对因为流控消息投递失败的情况进行处理,消息先放在其他介质或者等待重试等机制避免消息因为投递失败后消息丢失。
二、Consumer端
- Push模式:客户端被动收数据,Broker会把message轮流发给不同的Consumer,客户端接受消息不可控。即使没有ack,也会继续轮流发送。客户端默认1000大小的队列VariableLinkedBlockingQueue收取消息,然后Runtime.getRuntime().availableProcessors() * 2大小的固定大小ThreadPoolExecutor消费消息。
- Pull模式:客户端主动收数据,客户端接受消息可控。无论有没有ack,还是会继续收到message。
- 相同点:都是长链接
Push模式
即Consumer订阅queue,当有message发布到queue时,会马上push到Consumer。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [y] Received '" + message + "'");
this.getChannel().basicAck(envelope.getDeliveryTag(), false); //手动ack
}
};
channel.basicConsume(QUEUE_NAME, false, consumer); //第二个参数设置是否自动ack
Pull模式
即Consumer主动去queue拉取message。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
while (true) {
GetResponse response = channel.basicGet(QUEUE_NAME, false); //第二个参数设置是否自动ack
if (response == null) {
// No message retrieved.
} else {
byte[] body = response.getBody();
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
long deliveryTag = response.getEnvelope().getDeliveryTag();
channel.basicAck(deliveryTag, false); // acknowledge receipt of the message
}
}
分享到:
相关推荐
7. **消费者(Consumer)与生产者(Producer)** 生产者负责将消息发布到交换机,消费者则从队列中接收消息。源码中会揭示它们如何建立连接、发送和接收消息,以及如何处理消费确认机制。 8. **客户端API** 提供的`...
在RabbitMQ中,发送端(Producer)负责生产消息并将其发布到交换机,而接收端(Consumer)则从队列中获取并消费这些消息。这两个文件很可能是Java程序,分别展示了如何使用RabbitMQ Java客户端库来实现消息的发送和...
标题 "Producer_rabbitmq_" 暗示了这个项目是关于使用 .NET Core 平台与 RabbitMQ 进行交互的生产者应用。RabbitMQ 是一个流行的消息队列系统,用于实现分布式系统中的异步任务处理和解耦组件。 描述 ".NET Core ...
4. **生产者(Producer)**:生产者是发送消息到RabbitMQ的应用。 5. **消费者(Consumer)**:消费者是从RabbitMQ接收并处理消息的应用。 **二、Erlang与RabbitMQ** Erlang是一种并发性极强的编程语言,RabbitMQ...
- **消费者(Consumer)**:从RabbitMQ队列中接收并处理消息的应用。 **进一步操作:** - **配置RabbitMQ**:可以通过修改`rabbitmq.config`文件来调整服务器配置,如设置内存限制、日志级别等。 - **管理插件**:...
本文档为 RabbitMq 使用手册,介绍了 RabbitMq 的应用场景和开发指导。RabbitMq 是一个由 Erlang 开发的 AMQP(Advanced Message Queue)流行的开源消息队列系统。RabbitMq 的结构图如下: RabbitMq 几个概念说明:...
* Producer:生产者,负责发送消息到 RabbitMQ 服务器。 * Consumer:消费者,负责从 RabbitMQ 服务器接收消息。 * Queue:队列,存储生产者发送的消息。 * Exchange:交换机,负责将生产者的消息路由到相应的队列。...
RabbitMQ是一种广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,允许应用程序之间进行异步通信。RabbitMQ在分布式系统、微服务架构以及需要高可用性和可扩展性的环境中...
RabbitMQ服务器3.10.5是一款广泛使用的开源消息代理和队列服务器,它基于高级消息队列协议(AMQP)实现。这个版本的RabbitMQ提供了稳定且高效的中间件服务,允许分布式系统中的应用程序进行异步通信,确保数据可靠...
3. **内存管理**: 监控和调整RabbitMQ的内存使用,避免因内存过高导致的性能下降或服务中断。合理设置`vm_memory_high_watermark`配置项可以预防这种情况。 4. **持久化策略**: 选择适当的持久化策略平衡性能和数据...
2. **Consumer**: 创建连接和通道,使用`basicConsume`方法订阅队列,监听消息。 项目中的配置文件(如`application.properties`或`application.yml`)可能会包含RabbitMQ服务器的URL、用户名、密码等信息,供Java...
**RabbitMQ与Erlang开发应用** 在IT行业中,消息队列系统是分布式系统间进行异步通信的重要工具,而RabbitMQ作为一款...安装和配置RabbitMQ需要对这两个方面有基本了解,而实际使用则需要深入掌握其工作原理和API。
1. 引言 在软件开发中,消息队列(Message Queue,MQ)扮演着重要的角色,它通过引入异步处理机制,提升了系统的响应速度和整体吞吐量。...正确配置和使用RabbitMQ,能够有效优化系统架构,提高系统的稳定性和效率。
了解并熟练掌握这些基本概念和操作,能够帮助你有效地在.NET环境中利用RabbitMQ实现可靠的消息传递和异步任务处理。在实际项目中,还可以根据需求配置高级特性,如死信队列、延迟队列、优先级队列等,以满足不同场景...
本示例提供的"(最新)rabbitmq生产者和消费者demo,主题交换机"是关于如何在RabbitMQ中设置和使用主题交换机(Topic Exchange)的实例,它涵盖了生产者(Producer)和消费者(Consumer)的实现。 首先,让我们详细...
RabbitMQ 和消息传递系统通常会使用特定的术语来描述其中的关键组件。 - **生产者(Producer)**:生产者是指发送消息的程序或应用。在RabbitMQ 的图形表示中,我们通常用 "P" 来标记它。 - **队列(Queue)**:队列是...
在“springboot+rabbitmq源码”项目中,我们可以看到两个关键部分:mq_producer和mq_consumer,这分别代表了生产者和消费者。在消息队列系统中,生产者是发送消息的应用,而消费者则是接收和处理这些消息的应用。 ...
在RabbitMQ中,队列(Queue)是存储消息的地方,生产者(Producer)发送消息到队列,消费者(Consumer)从队列中接收并处理消息。绑定(Binding)则是定义消息从交换机(Exchange)到队列的路由规则。 在Java中,...
《深入理解RabbitMQ 3.7.14:基于rabbitmq-server_3.7.14.exe的安装与应用》 RabbitMQ是全球最流行的...无论是初学者还是经验丰富的开发者,都应该掌握其基本使用和配置,以便在实际项目中发挥出RabbitMQ的最大价值。
- **生产者(Producer)**: 生产者是创建和发送消息到RabbitMQ的组件。 - **消费者(Consumer)**: 消费者是从RabbitMQ接收并处理消息的应用程序部分。 - **通道(Channel)**: 通道是RabbitMQ通信的轻量级机制,...