`
thrillerzw
  • 浏览: 145192 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rabbitmq

 
阅读更多

基础:

工作队列主要是为了避免资源密集型任务的立即执行,然后一直等待它执行结束。相反,我们可以安排好任务,然后在执行。我们可以将一个任务封装成一个消息,发送到队列中。由工作者在后台取出任务然后执行。当有多个工作者时,他们共同处理这些任务。

 

demo:

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 消费者从消息队列取消息
 * @author thrillerzw
 *
 */
public class Reqv {
 private final static String QUEUE_NAME = "hello";

 public static void main(String[] argv) throws InterruptedException, IOException {

  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("192.168.0.200");
  // factory.setPort(15672);
  factory.setUsername("thrillerzw");
  factory.setPassword("123456");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  //parameters for queue 'hello4' in vhost '/' not equivalent
  boolean durable=true;
  channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
  System.out.println("Waiting for messages.");
  QueueingConsumer consumer = new QueueingConsumer(channel);
  //事务方式: boolean autoAck :false,需要手工应答,mq才会认为消费成功。
  boolean autoAck=false;
  String consumerTag = channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  System.out.println("consumerTag=" + consumerTag);
        int i=0;
  while (true) {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());
   try {
   //业务
   System.out.println("Received '" + message + "'");
   Thread.sleep(100);
   if(i==2){
     throw new Exception(); 
   }
   //autoAck :false时候应答确认消息处理完成
   long deliveryTag=delivery.getEnvelope().getDeliveryTag();
   channel.basicAck(deliveryTag, false);
   System.out.println();
   } catch (Throwable e) {
    e.printStackTrace();
    //测试看是回滚到队列的末尾.
    //不断回滚可能出现死循环?
    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
   }
   i++;
  }
 }
}

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * 生产者向消息队列发送消息  
 * 防止宕机重启内存丢消息, 持久化队列。
 * channel.queueDeclare设置 boolean durable:true,channel.basicPublish消息也必须标记为MessageProperties.PERSISTENT_TEXT_PLAIN 
 * @author thrillerzw
 *
 */
public class Send {
 private final static String QUEUE_NAME = "hello";

 public static void main(String[] args) throws IOException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("192.168.0.200");
  factory.setUsername("thrillerzw");
  factory.setPassword("123456");

  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  //程序声明一个队列,如果没有会添加。持久化队列:durable:true
  channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  for(int i=0;i<10;i++){
   String message = "Hello World!"+i;
   //持久化消息:MessageProperties.PERSISTENT_TEXT_PLAIN 非持久化: MessageProperties.TEXT_PLAIN
   channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
     message.getBytes());
   System.out.println("Sent '" + message + "'");
  }
  channel.close();
  connection.close();
 }
}

package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author thrillerzw
 *
 */
public class SendLogs {
 private static final String EXCHANGE_NAME = "logs";

 public static void main(String[] argv) throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("192.168.0.200");
  factory.setUsername("thrillerzw");
  factory.setPassword("123456");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//声明Exchange
  for (int i = 0; i <= 2; i++) {
   String message = "hello word!" + i;
   channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
   System.out.println("Sent '" + message + "'");
  }
  channel.close();
  connection.close();
 }

}

package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {
 private static final String EXCHANGE_NAME = "logs";

 public static void main(String[] argv) throws Exception {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("192.168.0.200");
  factory.setUsername("thrillerzw");
  factory.setPassword("123456");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
        
  channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/*	 String queueName = "log-fb1";
  //通过channel.queueDeclare()来创建一个非持久的队列名
  channel.queueDeclare(queueName, false, false, false, null);*/
  //rabbitmq创建一个随机的auto-delete的队列,并返回名字。amq.gen-iieJpst2mQghdrXF7nKYCQ / amq.gen-iieJpst2mQghdrXF7nKYCQ
  String queueName = channel.queueDeclare().getQueue();
  channel.queueBind(queueName, EXCHANGE_NAME, "");//把Queue、Exchange绑定
  QueueingConsumer consumer = new QueueingConsumer(channel);
  channel.basicConsume(queueName, true, consumer);
  while (true) {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());
   System.out.println("Received '" + message + "'");
  }
 }
}

 

0
1
分享到:
评论

相关推荐

    rabbitmq-c-master.rar_RabbitMQ c lib_cmake编译_rabbitmq_rabbitmq-c

    `rabbitmq-c`是RabbitMQ的一个C语言客户端库,它使得在C程序中与RabbitMQ服务器进行交互变得更加简单。本文将详细介绍如何使用CMake编译`rabbitmq-c-master`源码,并讨论相关知识点。 首先,我们需要了解CMake,这...

    kettle rabbitmq 插件开发

    标题 "kettle rabbitmq 插件开发" 涉及的是如何在 Pentaho Kettle(也称为 Spoon)中创建和使用 RabbitMQ 插件。Kettle 是一个开源的数据集成工具,它允许用户进行数据抽取、转换和加载(ETL)操作。RabbitMQ 是一个...

    rabbitmq-server3.10.5

    RabbitMQ服务器3.10.5是一款广泛使用的开源消息代理和队列服务器,它基于高级消息队列协议(AMQP)实现。这个版本的RabbitMQ提供了稳定且高效的中间件服务,允许分布式系统中的应用程序进行异步通信,确保数据可靠...

    linux中rabbitmq安装包

    在Linux环境中,RabbitMQ是一种广泛使用的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现。本篇文章将详细讲解如何在Linux上安装RabbitMQ,包括必要的依赖软件Erlang和RabbitMQ服务器...

    麒麟v10系统Rabbitmq3.6.10安装包

    在这个"麒麟v10系统Rabbitmq3.6.10安装包"中,我们将探讨如何在麒麟v10环境下安装和配置RabbitMQ 3.6.10版本。 首先,安装RabbitMQ前需要确保系统满足必要的依赖条件。麒麟v10内核版本为4.19.90-17.ky10.x86_64,这...

    rabbitmq 3.10.2 window安装包

    RabbitMQ是一个开源的消息代理和队列服务器,广泛用于分布式系统中的消息传递。它基于AMQP(Advanced Message Queuing Protocol)标准,允许应用程序之间异步通信,并提供了高可用性、可扩展性和容错性。RabbitMQ的...

    RabbitMQ实战指南-rabbitmq-action.zip

    **RabbitMQ实战指南** RabbitMQ是一款广泛应用的开源消息队列系统,它基于Advanced Message Queuing Protocol(AMQP)标准,提供高可用性、可靠性和可扩展性。本实战指南将带你深入理解RabbitMQ的核心概念、安装与...

    CentOS 7 下安装RabbitMQ教程配套的离线文件

    在本教程中,我们将深入探讨如何在 CentOS 7 操作系统上安装 RabbitMQ,这是一个流行的开源消息代理,基于AMQP(Advanced Message Queuing Protocol)协议。RabbitMQ 使用 Erlang 语言开发,它提供了一个可靠的平台...

    flink-sql集成rabbitmq

    标题中的“flink-sql集成rabbitmq”指的是将Apache Flink的数据流处理能力与RabbitMQ消息队列系统相结合,实现数据的实时处理和传输。Flink是一个开源的流处理框架,提供低延迟、高吞吐量的数据处理,而RabbitMQ是一...

    RabbitMQ使用指南.pdf

    RabbitMQ是一款开源的消息队列服务软件,它实现了高级消息队列协议(AMQP),以高性能、健壮和可伸缩性闻名,主要由Erlang语言编写。Erlang是一种适合于构建并发处理能力强、高可用性系统的编程语言,这些特点使得...

    RabbitMQ Linux安装教程

    RabbitMQ是一个开源的消息中间件,它基于Advanced Message Queuing Protocol (AMQP)标准实现,用于在分布式系统中高效地传递消息。RabbitMQ的安装过程在Linux环境下需要依赖于Erlang,而Erlang自身又依赖于某些库,...

    rabbitMQ实战java版-rabbitMQ-demo.zip

    《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

    tp6使用rabbitmq

    【标题】:“TP6使用RabbitMQ” 在PHP框架ThinkPHP6(简称TP6)中集成RabbitMQ是一项常见的任务,用于实现异步处理、消息队列和分布式系统的通信。RabbitMQ是一个开源的消息代理和队列服务器,它遵循AMQP(Advanced...

    RabbitMQ-c源码

    **RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...

    rabbitmq-server-3.12.2.exe

    RabbitMQ安装程序Windows版 注意: 以管理员账号进行安装,否则可能会导致无法使用。 安装RabbitMQ之前需要提前安装相对应的Erlang依赖版本,推荐Erlang-25.3版本。 安装路径只能包含 ASCII 字符,强烈建议路径的...

    prometheus rabbitmq_exporter

    为了能够有效地监控 RabbitMQ 的性能和状态,Prometheus 提供了一个名为 `rabbitmq_exporter` 的工具。然而,在某些情况下,官方网站可能不直接提供这个插件,这时我们需要从第三方源获取,例如在本例中提到的 `...

    java rabbitmq动态注册,监听实现

    在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,提供了高效、可靠的异步通信能力。在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要...

    RabbitMQ Java测试客户端

    RabbitMQ是一个基于Erlang语言开发的消息中间件,它遵循AMQP(Advanced Message Queuing Protocol)协议,广泛用于分布式系统中的异步处理和解耦。在这个“RabbitMQ Java测试客户端”项目中,我们可以看到它包含了...

    RabbitMQ性能测试报告

    【RabbitMQ性能测试报告】 本测试报告详细记录了对RabbitMQ的性能评估,包括在单机模式和集群模式下的压力和稳定性测试。RabbitMQ是业界广泛使用的开源消息代理,它基于AMQP(Advanced Message Queuing Protocol)...

    android上RabbitMQ发送简单例子

    在Android平台上,使用RabbitMQ进行网络通信是一个高效且可靠的选择。RabbitMQ是一个开源的消息代理和队列服务器,它允许应用程序之间通过消息传递进行异步通信。在本例中,我们将探讨如何在Android上设置和使用...

Global site tag (gtag.js) - Google Analytics