`
Surlymo
  • 浏览: 97204 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rabbitmq学习笔记

阅读更多

前一阵子写SEDA异步框架的时候,使用了rabbitmq作为分布式消息队列(见前文),把一些学习官网教程的笔记简单备忘。示例都来自官网

 

Part 2 Work Queues

 

1round-robin dispatchering  

缺陷:存在不能准确负载均衡的弊端

2fair dispatch  --> 针对管道

采用channel.basicQos(prefetchCount); 缺陷:可能带来队列塞

3message acknowledge  --> 针对消费者

boolean autoAck = false;channel.basicConsume("hello", autoAck, consumer);//默认是true,别用反了

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  //...     
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//
 记得手动加上发送ack。不然server会认为client没收到
}

4Message durability  --> 针对队列

采用boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);

5persistent --> 针对发布者

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

 

 

example:

 

 

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }     
  //...
}
 
public class Worker {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

    while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          String message = new String(delivery.getBody());

          System.out.println(" [x] Received '" + message + "'");  
          doWork(message);
          System.out.println(" [x] Done" );

          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
  }
  //...
}
 

Part 3 Publish/Subscribe

 

1Exchanges(类似block)

typedirect, topic, headers, fanout(类似广播)

Declared way:  channel.exchangeDeclare("logs", "fanout");

 

2Temporary queue

Non-durable, exclusive, autodelete

String queueName = channel.queueDeclare().getQueue();

 

3Bindings

channel.queueBind(queueName, "logs", "");

 

example:

 

 

public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
                  throws java.io.IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String message = getMessage(argv);
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
    //...
}
 
 
 
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
 

 

Part 4 Routing

1Direct exchange

typedirect, topic, headers, fanout(类似广播)

Declared way:  channel.exchangeDeclare("logs", "fanout");

 

2Multiple bindings

 

example:

 

 

public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv)
                  throws java.io.IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String severity = getSeverity(argv);
        String message = getMessage(argv);
        channel.basicPublish(EXCHANGE_NAME, severity, null,    message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        channel.close();
        connection.close();
    }
    //..
}
 
 
 
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }
        for(String severity : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}
 

 

Part 5 Topic

1Topic exchange

direct exchange方式好的地方是有通配符的概念在里面

* (star) can substitute for exactly one word.

# (hash) can substitute for zero or more words.

 

Delimited by dots

 

Routing_key up to the limit of 255 bytes

 

example:

 

public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv)
                  throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String routingKey = getRouting(argv);
        String message = getMessage(argv);
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        connection.close();
    }
    //...
}
 
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv)
                  throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }
       for(String bindingKey : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}
 

 

集群方案:

一般采用镜像队列,内存节点作为主服务器+磁盘节点做冗余。

 

0
0
分享到:
评论
2 楼 Surlymo 2014-03-24  
jiaofuyou 写道
想问个问题,象这种任务分发的工作队列,你举的例子是一个队列被多个消费者处理,那有没有办法,多个队列的消息由一个消费者来处理呢?


应该是可以的。 channel.queueBind(queueName, EXCHANGE_NAME, severity);绑定一下就行
1 楼 jiaofuyou 2014-03-13  
想问个问题,象这种任务分发的工作队列,你举的例子是一个队列被多个消费者处理,那有没有办法,多个队列的消息由一个消费者来处理呢?

相关推荐

    rabbitMQ学习笔记

    rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...

    rabbitmq学习笔记和软件和插件

    **RabbitMQ学习笔记与软件插件详解** RabbitMQ是一种广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,提供可靠的消息传递服务。在分布式系统中,RabbitMQ扮演着数据交换中心的角色,...

    rabbitmq学习笔记.zip

    在这个“rabbitmq学习笔记.zip”压缩包中,我们可以期待找到一系列关于RabbitMQ的核心概念、安装教程、使用方法以及常见问题的详细说明。 首先,RabbitMQ的基本概念包括生产者(Producer)、消费者(Consumer)、...

    RabbitMq学习笔记

    RabbitMq是实现AMQP(高级消息队列协议)的消息中间件的一种,与Springboot整合后使用方便简单,功能强大。本文还列举了Springboot整合RabbitMq使用和原版对比、及一些坑,简单易懂。

    RabbitMQ学习 笔记

    ### RabbitMQ 学习笔记知识点总结 #### 一、RabbitMQ 历史与背景 RabbitMQ 是一个基于 AMQP(高级消息队列协议)的开源消息中间件,由 Erlang 语言编写而成。AMQP 的出现填补了异步消息处理领域的标准化空白,特别...

    RabbitMQ学习笔记

    **RabbitMQ学习笔记** RabbitMQ是一款广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,能够实现在分布式系统中可靠的消息传递。RabbitMQ的核心功能是作为一个消息代理,允许应用程序...

    mingyang66#spring-parent#RabbitMQ学习笔记:Connections、Channels、Queue

    RabbitMQ学习笔记:Connections、Channels、Queues之state状态连接、信道、队列状态如下:GitHub地址:https://gi

    rabbitMQ 学习笔记

    **正文** ...通过以上的学习,你应该对RabbitMQ有了基本的理解,并掌握了在Java中使用RabbitMQ的基本方法。然而,实际应用中,还需要根据具体业务场景灵活运用和调整策略,以充分发挥RabbitMQ的优势。

    RabbitMq学习笔记1

    在本篇学习笔记中,我们将首先了解RabbitMQ的安装过程。 1. **Erlang的安装**: Erlang是RabbitMQ的基础,因为RabbitMQ是用Erlang编写的。安装Erlang可以通过以下命令完成: ```bash sudo apt-get install ...

    RabbitMQ.md

    RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记

    mingyang66#spring-parent#RabbitMQ学习笔记:队列容量设置(x-max-length、x-max-

    The default behaviour for RabbitMQ when a maximum queue length or size is set an

    RabbitMQ笔记.md

    结合狂神RabbitMQ笔记,外加自己查阅资料

    rabbitMQ入门笔记的mkdown笔记和源码

    rabbitMQ学习的笔记和学习过程中写的代码

    尚硅谷RabbitMQ pdf笔记(优质文档)

    【尚硅谷RabbitMQ pdf笔记】是一份详细阐述RabbitMQ技术的高质量学习资料,针对想要深入了解消息队列系统特别是RabbitMQ的开发者提供了一条清晰的学习路径。这份笔记不仅覆盖了RabbitMQ的基础概念,还深入探讨了其在...

    rabbitMQ配置笔记

    **RabbitMQ配置详解** RabbitMQ是一款基于AMQP(Advanced Message Queuing Protocol)...从安装、启动、管理到配置核心组件,都需要深入学习和实践。通过熟练掌握这些知识点,你将能够构建出健壮、高效的分布式系统。

    RabbitMQ学习讲义

    【RabbitMQ学习讲义】 RabbitMQ是一个开源的消息队列系统,它基于高级消息队列协议(AMQP)实现,旨在提供可靠的、跨平台的消息传递。AMQP是一种应用程序层的开放标准,允许不同的系统、语言和应用之间进行无缝的...

    RabbitMQ自学笔记

    综上所述,通过本篇笔记的学习,读者可以全面掌握RabbitMQ的基本使用方法以及一些高级特性,这对于实际项目中的消息处理需求提供了强有力的支持。无论是简单的消息传输还是复杂的分布式系统架构设计,RabbitMQ都能...

    RabbitMQ.xmind

    初步学习rabbitmq,对其下载配置,学习使用进行一个粗粒度的记载

    RabbitMQ笔记.zip

    **RabbitMQ基础** RabbitMQ是一个开源的消息代理和队列服务器,主要用于在分布式系统中进行消息传递。它实现了Advanced Message Queuing Protocol (AMQP),一...通过持续学习和实践,你可以更好地掌握这个强大的工具。

    rabbitmq笔记及面试要点

    【标题】:深入理解RabbitMQ及其面试关键点 【正文】: RabbitMQ作为一款广泛应用的消息中间件,它的核心价值在于实现消息队列的功能,有效地处理系统中的高并发和异步任务,从而提高系统的稳定性和响应速度。本文...

Global site tag (gtag.js) - Google Analytics