1.simple
producer --》 queue --》 consumer 生产后直接消费
2.work-queues
proudcer --》 queue --》 consumer 将消息发送到一个队列中,由队列的消费者依次消费
--》 consumer
--》 consumer
3.广播模式
producer --》 exchange --》 queue --》 consumer 生产者将消息发送到消息中转器,并有
--》 queue --》 consumer 消息中转器转到一个或多个队列,消费
4.direct模式
producer --》 exchange --》 queue --》 consumer 中转器中在绑定队列与订阅的主题进行
--》 queue --》 consumer 连接,将消息放到属于的队列中
5.topic
producer --》 exchange --》 queue --》 consumer 中转器中在绑定队列与订阅的主题进行
--》 queue --》 consumer 连接,将消息放到属于的队列中,正则
demo
package RabbitMQ;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
* Created by antong on 15/12/29.
*/
public class RabbitMQDemo {
private static final String QUEUE_NAME = "test";
private static final String EXCHANGE = "exchange_name";
private static final String[] SEVERITIES = {"black","blue","red"};
private static ConnectionFactory factory = new ConnectionFactory();
private RabbitMQDemo(){}
static {
factory.setHost("localhost");
}
public static Connection getConnection() throws IOException, TimeoutException {
return factory.newConnection();
}
public static void close(Channel channel,Connection connection) throws IOException, TimeoutException {
channel.close();
connection.close();
}
//发送队列 work-queues
public static void sendQueue() throws IOException, TimeoutException {
Connection connection = getConnection();
Channel channel = connection.createChannel();
//String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
for(int i = 0;i < 10;i++) {
//String exchange(“”), String routingKey(队列名称), BasicProperties props, byte[] body
channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
System.out.println(" [x] Sent '" + message + "'" + i);
}
close(channel, connection);
}
//接受队列 work-queues
public static void receiveQueue() throws IOException, TimeoutException {
Connection connection = getConnection();
Channel channel = connection.createChannel();
//String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
//广播模式
public static void sendExchange() throws IOException, TimeoutException {
Connection connection = getConnection();
Channel channel = connection.createChannel();
//String exchange, String type(队列模式)(fanout,direct,topic)
channel.exchangeDeclare(EXCHANGE, "fanout");
String message = "Hello World!";
for(int i = 0;i < 10;i++) {
//String exchange, String routingKey(“”), BasicProperties props, byte[] body
channel.basicPublish(EXCHANGE, "", null, (message+i).getBytes());
System.out.println(" [x] Sent '" + message + "'" + i);
}
close(channel, connection);
}
//广播模式
public static void receiveExchange() throws IOException, TimeoutException {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, "fanout");
String queueName = channel.queueDeclare().getQueue();
//String queue, String exchange, String routingKey(“”)
channel.queueBind(queueName, EXCHANGE, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C"+queueName);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
//直连模式
public static void sendDirect() throws IOException, TimeoutException {
Connection connection = getConnection();
Channel channel = connection.createChannel();
//String exchange, String type(队列模式)(fanout,direct,topic)
channel.exchangeDeclare(EXCHANGE, "direct");
String message = "Hello World!";
for(int i = 0;i < 10;i++) {
//String exchange, String routingKey(主题), BasicProperties props, byte[] body
channel.basicPublish(EXCHANGE, getSeverity(), null, (message+getSeverity()).getBytes());
System.out.println(" [x] Sent '" + message + "'" + i);
}
close(channel, connection);
}
//直连模式
public static void receiveDirect() throws IOException, TimeoutException {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, "direct");
String queueName = channel.queueDeclare().getQueue();
//String queue, String exchange, String routingKey(主题)
channel.queueBind(queueName, EXCHANGE, getSeverity());
System.out.println(" [*] Waiting for messages. To exit press CTRL+C"+queueName);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
private static String getSeverity()
{
Random random = new Random();
int ranVal = random.nextInt(3);
return SEVERITIES[ranVal];
}
}
分享到:
相关推荐
`rabbitmq-c`是RabbitMQ的一个C语言客户端库,它使得在C程序中与RabbitMQ服务器进行交互变得更加简单。本文将详细介绍如何使用CMake编译`rabbitmq-c-master`源码,并讨论相关知识点。 首先,我们需要了解CMake,这...
标题 "kettle rabbitmq 插件开发" 涉及的是如何在 Pentaho Kettle(也称为 Spoon)中创建和使用 RabbitMQ 插件。Kettle 是一个开源的数据集成工具,它允许用户进行数据抽取、转换和加载(ETL)操作。RabbitMQ 是一个...
RabbitMQ服务器3.10.5是一款广泛使用的开源消息代理和队列服务器,它基于高级消息队列协议(AMQP)实现。这个版本的RabbitMQ提供了稳定且高效的中间件服务,允许分布式系统中的应用程序进行异步通信,确保数据可靠...
在这个"麒麟v10系统Rabbitmq3.6.10安装包"中,我们将探讨如何在麒麟v10环境下安装和配置RabbitMQ 3.6.10版本。 首先,安装RabbitMQ前需要确保系统满足必要的依赖条件。麒麟v10内核版本为4.19.90-17.ky10.x86_64,这...
RabbitMQ是一个开源的消息代理和队列服务器,广泛用于分布式系统中的消息传递。它基于AMQP(Advanced Message Queuing Protocol)标准,允许应用程序之间异步通信,并提供了高可用性、可扩展性和容错性。RabbitMQ的...
标题中的“flink-sql集成rabbitmq”指的是将Apache Flink的数据流处理能力与RabbitMQ消息队列系统相结合,实现数据的实时处理和传输。Flink是一个开源的流处理框架,提供低延迟、高吞吐量的数据处理,而RabbitMQ是一...
**RabbitMQ实战指南** RabbitMQ是一款广泛应用的开源消息队列系统,它基于Advanced Message Queuing Protocol(AMQP)标准,提供高可用性、可靠性和可扩展性。本实战指南将带你深入理解RabbitMQ的核心概念、安装与...
在本教程中,我们将深入探讨如何在 CentOS 7 操作系统上安装 RabbitMQ,这是一个流行的开源消息代理,基于AMQP(Advanced Message Queuing Protocol)协议。RabbitMQ 使用 Erlang 语言开发,它提供了一个可靠的平台...
RabbitMQ是一款开源的消息队列服务软件,它实现了高级消息队列协议(AMQP),以高性能、健壮和可伸缩性闻名,主要由Erlang语言编写。Erlang是一种适合于构建并发处理能力强、高可用性系统的编程语言,这些特点使得...
【标题】:“TP6使用RabbitMQ” 在PHP框架ThinkPHP6(简称TP6)中集成RabbitMQ是一项常见的任务,用于实现异步处理、消息队列和分布式系统的通信。RabbitMQ是一个开源的消息代理和队列服务器,它遵循AMQP(Advanced...
RabbitMQ是一个开源的消息中间件,它基于Advanced Message Queuing Protocol (AMQP)标准实现,用于在分布式系统中高效地传递消息。RabbitMQ的安装过程在Linux环境下需要依赖于Erlang,而Erlang自身又依赖于某些库,...
《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...
**RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...
为了能够有效地监控 RabbitMQ 的性能和状态,Prometheus 提供了一个名为 `rabbitmq_exporter` 的工具。然而,在某些情况下,官方网站可能不直接提供这个插件,这时我们需要从第三方源获取,例如在本例中提到的 `...
在Linux环境中,RabbitMQ是一种广泛使用的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议实现。本篇文章将详细讲解如何在Linux上安装RabbitMQ,包括必要的依赖软件Erlang和RabbitMQ服务器...
RabbitMQ是一个基于Erlang语言开发的消息中间件,它遵循AMQP(Advanced Message Queuing Protocol)协议,广泛用于分布式系统中的异步处理和解耦。在这个“RabbitMQ Java测试客户端”项目中,我们可以看到它包含了...
在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,提供了高效、可靠的异步通信能力。在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要...
RabbitMQ安装程序Windows版 注意: 以管理员账号进行安装,否则可能会导致无法使用。 安装RabbitMQ之前需要提前安装相对应的Erlang依赖版本,推荐Erlang-25.3版本。 安装路径只能包含 ASCII 字符,强烈建议路径的...
【RabbitMQ性能测试报告】 本测试报告详细记录了对RabbitMQ的性能评估,包括在单机模式和集群模式下的压力和稳定性测试。RabbitMQ是业界广泛使用的开源消息代理,它基于AMQP(Advanced Message Queuing Protocol)...
在本资源中,我们有两个安装包:RabbitMQ 3.12.10和Erlang 26.0.2,这两个是RabbitMQ运行所必需的组件。 Erlang是一种函数式编程语言,以其并发能力、容错性和热代码升级功能而闻名。在RabbitMQ中,Erlang作为基础...