- 浏览: 2539632 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
RabbitMQ(5)Java Client - Publish/Subscribe
We will deliver the messages to multiple consumers. This pattern is known as "publish/subscribe".
Exchanges
The Producer will not directly contact to the queues. We will have a exchange between them. And the exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
There are few exchange types available: direct, topic, headers and fanout.
channel.exchangeDeclare("logs", "fanout");
Fanout just broadcasts all the messages it receives to all the queues it knows.
We can use this command to see all the exchanges running on our server
>sudo sbin/rabbitmqctl list_exchanges
Nameless exchange
Before, we send our message using a default exchange, which we identify by the empty string("").
channel.basicPublish("", "hello", null, message.getBytes());
Temporary queues
Before we have "hello" and "task_queue". Being able to name a queue was crucial for us. Giving a queue a name is important when you want to share the queue between producers and consumers.
In the Java client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name:
String queueName = channel.queueDeclare().getQueue();
Bindings
The relationship between exchange and a queue is called a binding.
channel.queueBind(queueName, "logs", "");
>sudo sbin/rabbitmqctl list_bindings
Putting it all together
Emit the message
package com.sillycat.easytalker.rabbitmq.publish;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "error debugy inform warning log!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
Receiving the log messages
package com.sillycat.easytalker.rabbitmq.publish;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
Queue belongs to the consumer, binding with the exchange.
references:
http://www.rabbitmq.com/tutorials/tutorial-three-java.html
We will deliver the messages to multiple consumers. This pattern is known as "publish/subscribe".
Exchanges
The Producer will not directly contact to the queues. We will have a exchange between them. And the exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
There are few exchange types available: direct, topic, headers and fanout.
channel.exchangeDeclare("logs", "fanout");
Fanout just broadcasts all the messages it receives to all the queues it knows.
We can use this command to see all the exchanges running on our server
>sudo sbin/rabbitmqctl list_exchanges
Nameless exchange
Before, we send our message using a default exchange, which we identify by the empty string("").
channel.basicPublish("", "hello", null, message.getBytes());
Temporary queues
Before we have "hello" and "task_queue". Being able to name a queue was crucial for us. Giving a queue a name is important when you want to share the queue between producers and consumers.
In the Java client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name:
String queueName = channel.queueDeclare().getQueue();
Bindings
The relationship between exchange and a queue is called a binding.
channel.queueBind(queueName, "logs", "");
>sudo sbin/rabbitmqctl list_bindings
Putting it all together
Emit the message
package com.sillycat.easytalker.rabbitmq.publish;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "error debugy inform warning log!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
Receiving the log messages
package com.sillycat.easytalker.rabbitmq.publish;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
Queue belongs to the consumer, binding with the exchange.
references:
http://www.rabbitmq.com/tutorials/tutorial-three-java.html
发表评论
-
Update Site will come soon
2021-06-02 04:10 1672I am still keep notes my tech n ... -
Hadoop Docker 2019 Version 3.2.1
2019-12-10 07:39 288Hadoop Docker 2019 Version 3.2. ... -
Nginx and Proxy 2019(1)Nginx Enable Lua and Parse JSON
2019-12-03 04:17 437Nginx and Proxy 2019(1)Nginx En ... -
Data Solution 2019(13)Docker Zeppelin Notebook and Memory Configuration
2019-11-09 07:15 279Data Solution 2019(13)Docker Ze ... -
Data Solution 2019(10)Spark Cluster Solution with Zeppelin
2019-10-29 08:37 243Data Solution 2019(10)Spark Clu ... -
AMAZON Kinesis Firehose 2019(1)Firehose Buffer to S3
2019-10-01 10:15 313AMAZON Kinesis Firehose 2019(1) ... -
Rancher and k8s 2019(3)Clean Installation on CentOS7
2019-09-19 23:25 301Rancher and k8s 2019(3)Clean In ... -
Pacemaker 2019(1)Introduction and Installation on CentOS7
2019-09-11 05:48 333Pacemaker 2019(1)Introduction a ... -
Crontab-UI installation and Introduction
2019-08-30 05:54 442Crontab-UI installation and Int ... -
Spiderkeeper 2019(1)Installation and Introduction
2019-08-29 06:49 493Spiderkeeper 2019(1)Installatio ... -
Supervisor 2019(2)Ubuntu and Multiple Services
2019-08-19 10:53 363Supervisor 2019(2)Ubuntu and Mu ... -
Supervisor 2019(1)CentOS 7
2019-08-19 09:33 321Supervisor 2019(1)CentOS 7 Ins ... -
Redis Cluster 2019(3)Redis Cluster on CentOS
2019-08-17 04:07 365Redis Cluster 2019(3)Redis Clus ... -
Amazon Lambda and Version Limit
2019-08-02 01:42 432Amazon Lambda and Version Limit ... -
MySQL HA Solution 2019(1)Master Slave on MySQL 5.7
2019-07-27 22:26 507MySQL HA Solution 2019(1)Master ... -
RabbitMQ Cluster 2019(2)Cluster HA and Proxy
2019-07-11 12:41 456RabbitMQ Cluster 2019(2)Cluster ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:35 315Running Zeppelin with Nginx Aut ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:34 316Running Zeppelin with Nginx Aut ... -
ElasticSearch(3)Version Upgrade and Cluster
2019-05-20 05:00 320ElasticSearch(3)Version Upgrade ... -
Jetty Server and Cookie Domain Name
2019-04-28 23:59 389Jetty Server and Cookie Domain ...
相关推荐
5. **RabbitMQ工作模式**:RabbitMQ支持发布/订阅(Publish/Subscribe)、点对点(Point-to-Point)、路由(Routing)、主题(Topic)等多种消息模式,满足不同应用场景的需求。 6. **Java客户端使用**:使用...
在`rabbitmq-java-client-2.7.0`这个版本中,你可以找到RabbitMQ Java客户端的API文档和库文件,帮助开发者更好地理解和使用这个客户端。这个版本可能已经较旧,但其基本概念和使用方式仍然适用于最新的RabbitMQ ...
文章目录rabbitmq7种实现方式搭建maven项目引入依赖创建连接简单队列消息生产者消息消费者work queues 工作队列生产者消费者能者多劳(公平分发):消费能力强则消费更多消息Publish/Subscribe 发布订阅模式生产者...
RabbitMQ 支持多种消息发布模式,包括简单模式(Simple)、工作队列模式(Work Queues)、发布/订阅模式(Publish/Subscribe)、路由模式(Routing)、主题模式(Topics)等。 #### 二、安装RabbitMQ ##### 2.1 ...
client.publish(exchange, routingKey, message.getBytes()); ``` 4. **消费消息**:设置回调函数以接收并处理消息。 ```java client.getQueueConsumer(queueName).subscribe(message -> { System.out.println(...
- **Publish/Subscribe (Pub/Sub)**: 发布订阅模式,生产者发布消息,多个消费者订阅。 #### 五、RabbitMQ安装与配置 - **安装文档**: [http://www.rabbitmq.com/getstarted.html]...
在实际使用中,RabbitMQ的订阅模式是基于发布/订阅(Publish/Subscribe)模型,其中生产者发送消息到一个交换机,然后由交换机根据预定义的路由规则将消息分发到一个或多个队列。消费者订阅这些队列,当有新消息到达...
1. **Client Libraries**:项目可能包含了多种编程语言(如Java, Python, .NET等)的客户端库示例,用于连接RabbitMQ broker,发布和接收消息。 2. **Basic Publish/Consume**:基础的发布/消费示例,展示如何创建一...
- Publish/Subscribe:发布/订阅模式。 - Routing:路由模式。 - Topics:主题模式。 - Headers:头部模式。 **2.3 导入Demo工程** - **结构**: - `mq-demo`:父工程,管理依赖。 - `publisher`:消息发送...
在 "rabbitmqdemo" 中的基础版示例中,你可以看到如何使用 RabbitMQ 的 Java 客户端库(rabbitmq-client)创建生产者和消费者。生产者会创建消息并发送到指定的交换器,消费者则订阅感兴趣的队列,等待接收消息。...
3. **Publish/Subscribe(发布/订阅)**:生产者发布消息到主题(Topic),消费者订阅主题并接收消息。这种模式允许一对多的通信。 4. **Routing(路由选择)**:通过路由键将消息路由到特定的队列,支持简单的匹配...
RabbitMQ遵循发布/订阅(Publish/Subscribe)、工作队列(Work Queues)、直接交换(Direct Exchange)、主题交换(Topic Exchange)和头部交换(Header Exchange)等多种消息交换模式。C++客户端库为这些模式提供了相应的API...
1. **发布/订阅模式(Publish/Subscribe)** 在这种模式下,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅该主题,接收到所有发布到该主题的消息。这种模式适用于一对多的...
**RabbitMQ官方七个例子详解** ...在Java项目中,可以利用RabbitMQ的Java客户端库`rabbitmq-client`来实现这些功能。为了运行这些示例,你需要下载并安装RabbitMQ服务器,然后运行提供的Java代码,以便观察其工作原理。
在"JMS sub/pub实现聊天系统"中,我们主要探讨的是如何利用JMS的发布/订阅(Publish/Subscribe)模型来构建一个聊天系统。 在JMS中,有两种消息传递模型:点对点(Point-to-Point)和发布/订阅(Publish/Subscribe...
对于Java开发者,这些示例使用了RabbitMQ的Java客户端库,包括`com.rabbitmq.client`包下的API。 理解并实践这些示例有助于深入掌握RabbitMQ的用法,无论是在微服务架构中作为服务间通信的桥梁,还是在大型应用中...
- 使用`MqttClient`的`publish()`方法将消息发送到指定的主题。 3. **订阅主题**: - 使用`subscribe()`方法订阅感兴趣的主题,可以同时订阅多个主题。 - 参数可以包含主题和对应的QoS级别,用于决定接收到消息...
在这个名为“RabbitMQTrial.zip”的压缩包中,包含了两个示例项目,分别展示了RabbitMQ的工作者模式(Work Queue)和发布订阅模式(Publish/Subscribe),以及一个解决方案文件(RabbitMQTrial.sln),便于开发者...
- **发布/订阅模式**(Publish/Subscribe, Pub/Sub): - 特点:发布者将消息发送到主题,所有订阅该主题的消费者都能接收到消息。 - 场景示例:新闻更新通知,多个用户订阅同一新闻频道。 **常见消息队列产品** ...
- 对于不同MQ服务,通常都有官方或社区提供的API和客户端库,如RabbitMQ的amqp-client,Kafka的Java和Python客户端等。 5. **消息获取方式**: - Pull模式:消费者主动从队列中拉取消息,适用于消费者有固定调度...