- 浏览: 2539782 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
RabbitMQ(6)Java Client - Routing
Bindings
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
channel.queueBind(queueName, EXCHANGE_NAME, "black");
Creating a binding with binding_key.
The meaning of a binding key depends on the exchange type. The fanout exchanges, which we used previously, simply ignored its value.
Direct exchange
We want to extend the log system, and allow filtering messages based on their severity. For example we may want the script which is writing log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.
We will use direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
Multiple bindings
It is perfectly legal to bind multiple queues with the same binding key. If all the queues bind with the same binding key and the message is sent with this binding key. It will work some like fanout.
Emitting logs
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
And we will send message with binding key as serverity
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
To simplify things we will assume that 'serverity' can be one of 'info', 'warning', 'error'.
Subscribing
We are going to create a new binding for each severity we're interested in.
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
All together
EmitLogDrect.java it will send the message with binding key via direct exchange
package com.sillycat.easytalker.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, "info", null, "hello info".getBytes());
channel.basicPublish(EXCHANGE_NAME, "error", null, "hello error".getBytes());
channel.basicPublish(EXCHANGE_NAME, "warning", null, "hello warning".getBytes());
channel.basicPublish(EXCHANGE_NAME, "debug", null, "hello debug".getBytes());
channel.close();
connection.close();
}
}
ReceiveLogsDirect.java it will receive only the error and warning messages.
package com.sillycat.easytalker.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "error");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message
+ "'");
}
}
}
references:
http://www.rabbitmq.com/tutorials/tutorial-four-java.html
Bindings
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
channel.queueBind(queueName, EXCHANGE_NAME, "black");
Creating a binding with binding_key.
The meaning of a binding key depends on the exchange type. The fanout exchanges, which we used previously, simply ignored its value.
Direct exchange
We want to extend the log system, and allow filtering messages based on their severity. For example we may want the script which is writing log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.
We will use direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
Multiple bindings
It is perfectly legal to bind multiple queues with the same binding key. If all the queues bind with the same binding key and the message is sent with this binding key. It will work some like fanout.
Emitting logs
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
And we will send message with binding key as serverity
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
To simplify things we will assume that 'serverity' can be one of 'info', 'warning', 'error'.
Subscribing
We are going to create a new binding for each severity we're interested in.
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
All together
EmitLogDrect.java it will send the message with binding key via direct exchange
package com.sillycat.easytalker.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, "info", null, "hello info".getBytes());
channel.basicPublish(EXCHANGE_NAME, "error", null, "hello error".getBytes());
channel.basicPublish(EXCHANGE_NAME, "warning", null, "hello warning".getBytes());
channel.basicPublish(EXCHANGE_NAME, "debug", null, "hello debug".getBytes());
channel.close();
connection.close();
}
}
ReceiveLogsDirect.java it will receive only the error and warning messages.
package com.sillycat.easytalker.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "error");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message
+ "'");
}
}
}
references:
http://www.rabbitmq.com/tutorials/tutorial-four-java.html
发表评论
-
Update Site will come soon
2021-06-02 04:10 1672I am still keep notes my tech n ... -
Hadoop Docker 2019 Version 3.2.1
2019-12-10 07:39 288Hadoop Docker 2019 Version 3.2. ... -
Nginx and Proxy 2019(1)Nginx Enable Lua and Parse JSON
2019-12-03 04:17 437Nginx and Proxy 2019(1)Nginx En ... -
Data Solution 2019(13)Docker Zeppelin Notebook and Memory Configuration
2019-11-09 07:15 279Data Solution 2019(13)Docker Ze ... -
Data Solution 2019(10)Spark Cluster Solution with Zeppelin
2019-10-29 08:37 243Data Solution 2019(10)Spark Clu ... -
AMAZON Kinesis Firehose 2019(1)Firehose Buffer to S3
2019-10-01 10:15 313AMAZON Kinesis Firehose 2019(1) ... -
Rancher and k8s 2019(3)Clean Installation on CentOS7
2019-09-19 23:25 301Rancher and k8s 2019(3)Clean In ... -
Pacemaker 2019(1)Introduction and Installation on CentOS7
2019-09-11 05:48 333Pacemaker 2019(1)Introduction a ... -
Crontab-UI installation and Introduction
2019-08-30 05:54 442Crontab-UI installation and Int ... -
Spiderkeeper 2019(1)Installation and Introduction
2019-08-29 06:49 493Spiderkeeper 2019(1)Installatio ... -
Supervisor 2019(2)Ubuntu and Multiple Services
2019-08-19 10:53 363Supervisor 2019(2)Ubuntu and Mu ... -
Supervisor 2019(1)CentOS 7
2019-08-19 09:33 321Supervisor 2019(1)CentOS 7 Ins ... -
Redis Cluster 2019(3)Redis Cluster on CentOS
2019-08-17 04:07 365Redis Cluster 2019(3)Redis Clus ... -
Amazon Lambda and Version Limit
2019-08-02 01:42 432Amazon Lambda and Version Limit ... -
MySQL HA Solution 2019(1)Master Slave on MySQL 5.7
2019-07-27 22:26 507MySQL HA Solution 2019(1)Master ... -
RabbitMQ Cluster 2019(2)Cluster HA and Proxy
2019-07-11 12:41 456RabbitMQ Cluster 2019(2)Cluster ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:35 315Running Zeppelin with Nginx Aut ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:34 316Running Zeppelin with Nginx Aut ... -
ElasticSearch(3)Version Upgrade and Cluster
2019-05-20 05:00 320ElasticSearch(3)Version Upgrade ... -
Jetty Server and Cookie Domain Name
2019-04-28 23:59 389Jetty Server and Cookie Domain ...
相关推荐
在"rabbitmq-java-client-bin-3.3.4.zip"这个压缩包中,包含的是RabbitMQ的Java客户端库,这是与RabbitMQ服务器通信的一个关键组件。RabbitMQ提供了多种语言的客户端,Java客户端则是针对Java开发者设计的,使得Java...
在“rabbitmq-java-client-bin-3.3.4”这个压缩包中,包含了该版本的Java客户端库及相关文档,为Java应用提供了可靠的异步通信支持。 首先,RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议的开源...
6. **Java客户端使用**:使用"rabbitmq-java-client"时,开发者需要创建ConnectionFactory,建立与RabbitMQ服务器的连接,然后创建Channel并声明Exchange和Queue,最后绑定Queue到Exchange,从而完成消息的发布和...
在Java项目中,我们可以使用`amqp-client`库来操作RabbitMQ。首先,需要添加依赖: ```xml <groupId>com.rabbitmq</groupId> <artifactId>amqp-client <version>5.x.x ``` 然后,建立连接、创建通道、声明...
在Java客户端中,`com.rabbitmq.client`包提供了主要的API类和接口。`ConnectionFactory`是创建连接到RabbitMQ服务器的工厂类,它允许配置诸如主机名、端口、用户认证等参数。`Connection`对象代表了与RabbitMQ...
在Java中,我们通常使用RabbitMQ的官方客户端库`com.rabbitmq:amqp-client`来与RabbitMQ服务器进行通信。这个库提供了`ConnectionFactory`、`Channel`等类,用于建立连接、创建通道并执行各种操作。 要实现动态创建...
Java开发者可以通过`amqp-client`库轻松地与RabbitMQ集成,实现高效的数据交换。在这个简单的demo中,我们学习了如何创建连接、声明交换机和队列、发送和接收消息,这些都是RabbitMQ的基础操作,为更高级的用法打下...
pdi-rabbitmq-client 水壶Java类与RabbitMQ进行交互 有几个Java类可以与RabbitMQ进行交互,但是它们对我没有用... 现在,我发表我的方法。 有两套... publish.java:发布消息的Java类 publish.ktr:一个示例。 ...
在Java开发中,我们通常使用RabbitMQ的Java客户端库`com.rabbitmq:amqp-client`来连接、发送和接收消息。下面是一些关键的API和概念: 1. **连接和通道(Connection and Channel)**:Java应用首先需要创建一个到...
首先,要使用RabbitMQ,你需要在项目中添加RabbitMQ的Java客户端库(rabbitmq-client)作为依赖。在Maven项目中,可以在pom.xml文件中添加以下依赖: ```xml <groupId>com.rabbitmq</groupId> <artifactId>amqp-...
在Java中实现RabbitMQ的主题(Topic)模式,能够让我们根据消息的类型进行多路广播,使得多个消费者可以订阅不同的消息类型,只接收自己关心的消息。下面将详细介绍RabbitMQ主题模式在Java中的实现。 ### 1. 安装与...
总的来说,"RabbitMQ HTTP API client for Java, Groovy, and other JVM l.zip" 提供了一个方便的工具,使开发人员能够在各种JVM环境中利用HTTP API与RabbitMQ进行高效且灵活的通信。无论是构建微服务架构,还是优化...
在Java中使用RabbitMQ,我们需要添加RabbitMQ的Java客户端库(`com.rabbitmq:amqp-client`)到项目依赖。然后,我们可以通过创建`ConnectionFactory`实例来连接到RabbitMQ Broker,接着创建`Connection`和`Channel`...
通常,开发者会使用RabbitMQ的Java客户端库(`com.rabbitmq:amqp-client`)来创建连接、通道、发布/订阅消息等。`pom.xml`文件是Maven项目的配置文件,用于管理依赖项。`src`目录下应该包含了Java源代码,`target`...
本文将详细讨论如何在Java环境下实现RabbitMQ的路由功能。 首先,理解RabbitMQ的基本概念至关重要。RabbitMQ中有几个关键角色:生产者(Producer)、消费者(Consumer)和交换器(Exchange)。生产者发布消息到交换...
同时,需要配置相应的环境变量和依赖库,如在Java项目中使用RabbitMQ,需添加RabbitMQ的Java客户端库(rabbitmq-client)。 2. **基本概念**:理解RabbitMQ中的基本概念,如`exchange`(交换器)、`queue`(队列)、`...
在Java项目中使用RabbitMQ,首先需要添加RabbitMQ的Java客户端库(如`rabbitmq-client.jar`)到项目依赖,并配置连接参数,如主机地址、端口、用户名和密码。 5.2 集群配置 为了提高RabbitMQ的可用性和容错性,可以...
在Java项目中使用RabbitMQ,我们需要引入`rabbitmq-client`库。如果你使用Maven,可以在pom.xml文件中添加以下依赖: ```xml <groupId>com.rabbitmq</groupId> <artifactId>amqp-client <version>5.x.y...
`rabbitmq-java-client-javadoc-2.5.1`是RabbitMQ的Java客户端库,提供了API供Java应用与RabbitMQ服务器交互。这个版本的文档包含了Javadoc,可以帮助开发者了解如何使用Java API来创建生产者、消费者、连接、通道等...
1. 客户端库:RabbitMQ提供了多种语言的客户端库,如Python的pika、Java的rabbitmq-client等,方便开发者在不同语言环境中使用。 2. 连接与通道:连接(Connection)是客户端与RabbitMQ服务器的通信链路,通道...