`

[rabbitmq] 安装与配置

阅读更多
RabbitMQ
-------------------------
1
下载

erlang
http://www.erlang.org/download/otp_win32_17.3.exe
http://www.erlang.org/download/otp_src_17.3.tar.gz
http://www.erlang.org/download/otp_doc_html_17.3.tar.gz
http://www.erlang.org/download/otp_doc_man_17.3.tar.gz

rabbitmq server
http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.2/rabbitmq-server-3.4.2.exe

rabbitmq java client
http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.4.2/rabbitmq-java-client-bin-3.4.2.zip
http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.4.2/rabbitmq-java-client-3.4.2.zip

2
配置

3
基本概念
http://www.rabbitmq.com/tutorials/tutorial-one-java.html

RabbitMQ 是一个消息代理,它接收生产者的消息,然后转发给消费者.同时,它路由,缓存,持久这个消息根据设定的规则.
producer 生产者,只负责发送消息.消息发送到指定队列中.
queue 队列或邮箱,它存储消息,它的大小没有限制,只要磁盘空间足够.消息只能缓存在队列中,队列属于消息代理的一部分.消息代理还包括 exchange 等.
consumer 消费者,它从队列接收消息.

AMQP:高级消息队列协议.

P(producer)   -- msg --> |Queue|Queue|Queue|  -- msg --> C(consumer)

PS:
生产者,消息代理,消费者通常不在同一机器上.

4 Hello World
----------------
我们这里使用 Java 客户端.

STEP 1:

    P(producer)   -- msg --> |HelloQueue|


public class P {

  private final static String QUEUE_NAME = "HelloQueue";

  public static void main(String[] args)
      throws Exception {
      // 建立连接
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("127.0.0.1"); 
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      // 声明队列
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      
      // 创建消息
      String message = "Hello World!";
      
      // 发布消息到 HelloQueue
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
      
      // 关闭连接
      channel.close();
      connection.close();
  }

}



STEP 2

    |HelloQueue| -- msg --> C(consumer)

public class C {

  private final static String QUEUE_NAME = "HelloQueue";

  public static void main(String[] args)
      throws Exception {
    
    // 建立连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
    // 创建消费者,处理消息
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);

    while (true) {
        // block method ? yes.
        // 阻塞直到接收到下一条消息
        QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
        String message = new String(delivery.getBody());
        System.out.println("Received : " + message);
    }
}
  



5
Work Queue

    P -- MSG -- |Q| -- MSG -- Worker1|Worker2

Work1 , Work2 轮流获取消息处理,
为避免Worker1消息单节点处理失败, 修改ACK返回时机为消息处理成功后.ACK返回,消息代理才会删除已处理的消息,否则是Worker1一接收到就删除.

    boolean autoAck = false;
    channel.basicConsume("hello", autoAck, consumer);

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    //...      
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}



为保证消息代理节点失败,导致消息丢失, 设置消息持久化参数:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

...

 channel.basicPublish( "", TASK_QUEUE_NAME, 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());


6
发布/订阅模式
一个消息多个接收者接收.
EXCHANGE: 消息交换,用于路由/过滤消息.

P -- MSG -- |X| -- |Q1| -- MSG -- C1
                -- |Q2| -- MSG -- C2


     // 交换策略扇出(fanout):消息交换到所有队列 
     channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

     String message = getMessage(argv);

     channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
     ...


     channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
     // 临时队列.随机命名的队列
     String queueName = channel.queueDeclare().getQueue();
     // 绑定交换与队列
     channel.queueBind(queueName, EXCHANGE_NAME, "");



7
路由
绑定: 一个队列对交换的消息感兴趣.
绑定KEY: 绑定时设置
路由KEY: 发布时设置 routingKey, fanout 会简单忽略这个值.
交换策略:
direct exchange: 直接交换,根据 routingKey 来选择发布到哪些队列.比fanout更加灵活.
fanout exchange: 扇出交换(全部队列都会接收这个消息)
topic exchange: routingKey用 "." 号分开.根据 routingKey 来选择发布到哪些匹配的队列, 支持模糊匹配.

     channel.exchangeDeclare(EXCHANGE_NAME, "topic");
     ...
     channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());



      channel.exchangeDeclare(EXCHANGE_NAME, "topic");
      String queueName = channel.queueDeclare().getQueue();



      for(String bindingKey : bindingKeys){
          channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
      }

      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(queueName, true, consumer);

      while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          String message = new String(delivery.getBody());
          String routingKey = delivery.getEnvelope().getRoutingKey();
          ...
        }


8
RPC 远程过程调用
通常发送方要等待接收方的处理结果.

发送方阻塞直到方法返回结, 每个客户端一个响应队列.
为了识别哪个响应属于哪个请求,要加个请求ID.

result = Client.call(request); // block until result has returned

    Client -- id:RequestId , replyTo:ReplyQueue -- HelloQueue -- Server
           |                                                       |
           -- id:RequestId , replyTo:ReplyQueue -- ReplyQueue ------


RPCServer
private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}



RPCClient
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {     
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }

    return response; 
}

public void close() throws Exception {
    connection.close();
}

分享到:
评论

相关推荐

    rabbitMQ安装与配置(分布式配置)

    RabbitMQ 安装与配置(分布式配置) RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,由 Erlang 语言开发。它支持多种语言的客户端,包括 Java、Python、Ruby 等,可以满足高并发、可扩展性的业务需求。 安装...

    RabbitMQ安装配置.docx

    RabbitMQ安装与配置详细文档

    RabbitMQ安装配置手册

    NULL 博文链接:https://zhb1208.iteye.com/blog/1320219

    rabbitmq安装配置部署文档

    二、RabbitMQ配置和部署 1. 安装RabbitMQ 执行以下命令来安装RabbitMQ: ``` rpm -ivh rabbitmq-server-3.5.7-1.noarch.rpm ``` 如果安装成功,将显示安装完成的界面。 2. 启动服务 执行以下命令来启动RabbitMQ...

    Rabbitmq 默认配置文件模板

    在RabbitMQ的运行过程中,配置文件起着至关重要的作用,它们定义了服务器的行为、策略以及与其他服务的交互方式。本文将详细介绍RabbitMQ的默认配置文件模板`rabbitmq.config.example`和`advanced.config.example`。...

    RabbitMQ安装配置1

    RabbitMQ安装配置安装erlang下载地址:http://www.erlang.org/downloads yum install ncurses-dev

    rabbitMq安装教程以及软件

    - 安装过程中,系统会自动配置RabbitMQ服务,并启动服务。 3. **验证安装**: - 打开命令行窗口,输入`rabbitmqctl status`,如果显示RabbitMQ服务器的状态信息,说明安装成功。 - 通过浏览器访问`...

    rabbitmq3.8.16-linux-centos7.x

    4. **RabbitMQ安装与配置**:学习如何在CentOS系统上安装RPM包,配置RabbitMQ服务器,包括设置用户、虚拟主机、权限等。 5. **安全设置**:由于涉及到“安全”标签,所以理解如何配置SSL/TLS加密,设置访问控制,...

    rabbitmq 3.9.3 配置文件

    rabbitmq 3.9.3 配置文件

    RabbitMQ安装配置所需软件包

    在安装和配置RabbitMQ之前,我们需要确保准备好一些必要的软件包。以下是对这些软件包的详细介绍: 1. **Erlang**: Erlang是RabbitMQ的基础,因为RabbitMQ是用Erlang编程语言编写的。Erlang是一种并发和容错能力强...

    rabbitmq配置文件 rabbitmq.config

    rabbitmq配置文件,用于rabbitmq管理

    RabbitMQ安装及配置详细文档以及程序

    按照文档,在Windows安装RabbitMQ,并配置用户和virtual Hosts 内含软件安装包和PDF

    rabbitMQ安装与使用.docx

    系统环境及所用版本与安装 在安装rabbitMQ之前,需要准备好系统环境。系统版本为CentOS 6.8,需要设置本地yum源。然后,需要安装erlang环境,因为rabbitMQ需要在erlang环境下安装与运行。最后一步是安装rabbitMQ ...

    Linux下安装RabbitMQ及相关环境配置-附件资源

    Linux下安装RabbitMQ及相关环境配置-附件资源

    RabbitMQ 安装包和开启MQTT功能

    总结来说,RabbitMQ 的安装与 MQTT 功能的开启涉及下载和安装 Erlang OTP 以及 RabbitMQ 服务器,启用 MQTT 插件,配置参数,最后验证 MQTT 连接。这是一项关键的操作,使得 RabbitMQ 能够支持物联网设备和其他 MQTT...

    精品资源-springboot-rabbitmq-master项目.zip

    RabbitMQ安装与配置 安装RabbitMQ需先安装erlang和socat 安装依赖环境 yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel

    RabbitMQ_安装配置与管理1

    本文将详细介绍如何安装配置 RabbitMQ,以及管理其核心参数。 **一、RabbitMQ 安装** 1. **Erlang 安装**:RabbitMQ 基于 Erlang 虚拟机运行,因此首先需要安装 Erlang。通常,可以从 Erlang 官网下载源码包,解压...

    RabbitMQ实战带目录版本

    ### 二、RabbitMQ安装与配置 1. **平台支持**:RabbitMQ可在多种操作系统上运行,包括Linux、Windows和macOS。安装通常通过下载预编译的二进制包或使用包管理器进行。 2. **Erlang环境**:由于RabbitMQ是用Erlang...

    rabbitmq学习资料

    【RabbitMQ 学习资料】这篇文档是针对初学者准备的,主要涵盖了RabbitMQ的基本概念、消息队列的工作原理、AMQP和JMS的区别、常见消息中间件的对比,以及RabbitMQ的安装与配置,还有其提供的Web管理界面和基本功能。...

    总结Linux系统环境初始化、系统安全加固措施和系统内核优化

    19. rabbitmq 安装与配置 20. MAVEN 安装与配置 21. 命令行录屏软件 22. nodejs 安装与配置 23. 工具命令 24. elasticsearch 安装与配置 25. 进程管理 26. 文件系统管理 27. git 初始化本地仓库 28. Nacos 安装与...

Global site tag (gtag.js) - Google Analytics