前一阵子写SEDA异步框架的时候,使用了rabbitmq作为分布式消息队列(见前文),把一些学习官网教程的笔记简单备忘。示例都来自官网
Part 2 Work Queues
1、round-robin dispatchering
缺陷:存在不能准确负载均衡的弊端
2、fair dispatch --> 针对管道
采用channel.basicQos(prefetchCount); 缺陷:可能带来队列塞
3、message acknowledge --> 针对消费者
boolean autoAck = false;channel.basicConsume("hello", autoAck, consumer);//默认是true,别用反了
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 记得手动加上发送ack。不然server会认为client没收到
}
4、Message durability --> 针对队列
采用boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);
5、persistent --> 针对发布者
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
example:
public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //... }
Part 3 Publish/Subscribe
1、Exchanges(类似block)
type:direct, topic, headers, fanout(类似广播)
Declared way: channel.exchangeDeclare("logs", "fanout");
2、Temporary queue
Non-durable, exclusive, autodelete
String queueName = channel.queueDeclare().getQueue();
3、Bindings
channel.queueBind(queueName, "logs", "");
example:
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... } public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
Part 4 Routing
1、Direct exchange
type:direct, topic, headers, fanout(类似广播)
Declared way: channel.exchangeDeclare("logs", "fanout");
2、Multiple bindings
example:
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //.. } public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
1、Topic exchange
比direct exchange方式好的地方是有通配符的概念在里面
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
Delimited by dots
Routing_key up to the limit of 255 bytes
example:
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... } public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for(String bindingKey : argv){ channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
集群方案:
一般采用镜像队列,内存节点作为主服务器+磁盘节点做冗余。
相关推荐
rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...
**RabbitMQ学习笔记与软件插件详解** RabbitMQ是一种广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,提供可靠的消息传递服务。在分布式系统中,RabbitMQ扮演着数据交换中心的角色,...
在这个“rabbitmq学习笔记.zip”压缩包中,我们可以期待找到一系列关于RabbitMQ的核心概念、安装教程、使用方法以及常见问题的详细说明。 首先,RabbitMQ的基本概念包括生产者(Producer)、消费者(Consumer)、...
RabbitMq是实现AMQP(高级消息队列协议)的消息中间件的一种,与Springboot整合后使用方便简单,功能强大。本文还列举了Springboot整合RabbitMq使用和原版对比、及一些坑,简单易懂。
### RabbitMQ 学习笔记知识点总结 #### 一、RabbitMQ 历史与背景 RabbitMQ 是一个基于 AMQP(高级消息队列协议)的开源消息中间件,由 Erlang 语言编写而成。AMQP 的出现填补了异步消息处理领域的标准化空白,特别...
**RabbitMQ学习笔记** RabbitMQ是一款广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,能够实现在分布式系统中可靠的消息传递。RabbitMQ的核心功能是作为一个消息代理,允许应用程序...
RabbitMQ学习笔记:Connections、Channels、Queues之state状态连接、信道、队列状态如下:GitHub地址:https://gi
**正文** ...通过以上的学习,你应该对RabbitMQ有了基本的理解,并掌握了在Java中使用RabbitMQ的基本方法。然而,实际应用中,还需要根据具体业务场景灵活运用和调整策略,以充分发挥RabbitMQ的优势。
在本篇学习笔记中,我们将首先了解RabbitMQ的安装过程。 1. **Erlang的安装**: Erlang是RabbitMQ的基础,因为RabbitMQ是用Erlang编写的。安装Erlang可以通过以下命令完成: ```bash sudo apt-get install ...
RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记
The default behaviour for RabbitMQ when a maximum queue length or size is set an
结合狂神RabbitMQ笔记,外加自己查阅资料
rabbitMQ学习的笔记和学习过程中写的代码
【尚硅谷RabbitMQ pdf笔记】是一份详细阐述RabbitMQ技术的高质量学习资料,针对想要深入了解消息队列系统特别是RabbitMQ的开发者提供了一条清晰的学习路径。这份笔记不仅覆盖了RabbitMQ的基础概念,还深入探讨了其在...
**RabbitMQ配置详解** RabbitMQ是一款基于AMQP(Advanced Message Queuing Protocol)...从安装、启动、管理到配置核心组件,都需要深入学习和实践。通过熟练掌握这些知识点,你将能够构建出健壮、高效的分布式系统。
【RabbitMQ学习讲义】 RabbitMQ是一个开源的消息队列系统,它基于高级消息队列协议(AMQP)实现,旨在提供可靠的、跨平台的消息传递。AMQP是一种应用程序层的开放标准,允许不同的系统、语言和应用之间进行无缝的...
综上所述,通过本篇笔记的学习,读者可以全面掌握RabbitMQ的基本使用方法以及一些高级特性,这对于实际项目中的消息处理需求提供了强有力的支持。无论是简单的消息传输还是复杂的分布式系统架构设计,RabbitMQ都能...
初步学习rabbitmq,对其下载配置,学习使用进行一个粗粒度的记载
**RabbitMQ基础** RabbitMQ是一个开源的消息代理和队列服务器,主要用于在分布式系统中进行消息传递。它实现了Advanced Message Queuing Protocol (AMQP),一...通过持续学习和实践,你可以更好地掌握这个强大的工具。
【标题】:深入理解RabbitMQ及其面试关键点 【正文】: RabbitMQ作为一款广泛应用的消息中间件,它的核心价值在于实现消息队列的功能,有效地处理系统中的高并发和异步任务,从而提高系统的稳定性和响应速度。本文...