- 浏览: 2551981 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
RabbitMQ(4)Java Client - Hello World-Work Queue
RabbitMQ is a message broker, accepts and forwards messages.
1. RabbitMQ jargon
Producing - A program that sends messages is a producer.
Queue - A queue is the name for a mailbox. It can store as many messages as you like.
Consuming - A consumer is a program that mostly waits to receive messages.
2. Simple Java "Hello World"
RabbitMQ speaks AMQP, which is an open, general-purpose protocol for messaging.
Test Producer
package com.sillycat.easytalker.rabbitmq.hello;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TestProducer {
private final static String QUEUE_NAME = "hello";
private final static String SERVER_HOST = "localhost";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World! Woo!2";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
Test Consumer
package com.sillycat.easytalker.rabbitmq.hello;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class TestConsumer {
private final static String QUEUE_NAME = "hello";
private final static String SERVER_HOST = "localhost";
public static void main(String[] args) throws IOException,
ShutdownSignalException, ConsumerCancelledException,
InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(5000);
}
}
}
3. Simple Java Work Queues
We will create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.
Work Queues(aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete.
This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.
My NewTask.java Class is mostly the same as before, but we send a lot of tasks this time.
package com.sillycat.easytalker.rabbitmq.workqueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "job";
for(int i = 0;i<10;i++){
message = "job" + i;
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
The worker.java class, we can run 2 instances of this class to see, the log messsage as follow:
Waiting for messages. To exit press CTRL+C
[x] Received 'job0'
[x] Done
[x] Received 'job2'
[x] Done
[x] Received 'job4'
[x] Done
[x] Received 'job6'
[x] Done
[x] Received 'job8'
[x] Done
Another Instance:
Waiting for messages. To exit press CTRL+C
[x] Received 'job1'
[x] Done
[x] Received 'job3'
[x] Done
[x] Received 'job5'
[x] Done
[x] Received 'job7'
[x] Done
[x] Received 'job9'
[x] Done
package com.sillycat.easytalker.rabbitmq.workqueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.')
Thread.sleep(5000);
}
}
}
Message Acknowledgment
If a worker dies, we'd like the task to be delivered to another worker.
RabbitMQ supports message acknowledgements. An ack is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it. (ack acknowledgement charactor)
If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will redeliver it to another consumer.
Message acknowledgments are turned on by default. autoAck=true flag will turned them off.
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
Message Durability
We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. We need to mark both the queue and messages as durable.
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
Set the message to persistent
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in a transaction.
Fair dispatch
In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
references:
http://www.rabbitmq.com/java-client.html
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/java
http://www.rabbitmq.com/api-guide.html
http://www.rabbitmq.com/getstarted.html
http://www.rabbitmq.com/tutorials/tutorial-two-java.html
RabbitMQ is a message broker, accepts and forwards messages.
1. RabbitMQ jargon
Producing - A program that sends messages is a producer.
Queue - A queue is the name for a mailbox. It can store as many messages as you like.
Consuming - A consumer is a program that mostly waits to receive messages.
2. Simple Java "Hello World"
RabbitMQ speaks AMQP, which is an open, general-purpose protocol for messaging.
Test Producer
package com.sillycat.easytalker.rabbitmq.hello;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TestProducer {
private final static String QUEUE_NAME = "hello";
private final static String SERVER_HOST = "localhost";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World! Woo!2";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
Test Consumer
package com.sillycat.easytalker.rabbitmq.hello;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class TestConsumer {
private final static String QUEUE_NAME = "hello";
private final static String SERVER_HOST = "localhost";
public static void main(String[] args) throws IOException,
ShutdownSignalException, ConsumerCancelledException,
InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(5000);
}
}
}
3. Simple Java Work Queues
We will create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.
Work Queues(aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete.
This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.
My NewTask.java Class is mostly the same as before, but we send a lot of tasks this time.
package com.sillycat.easytalker.rabbitmq.workqueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "job";
for(int i = 0;i<10;i++){
message = "job" + i;
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
The worker.java class, we can run 2 instances of this class to see, the log messsage as follow:
[x] Done
[x] Received 'job2'
[x] Done
[x] Received 'job4'
[x] Done
[x] Received 'job6'
[x] Done
[x] Received 'job8'
[x] Done
Another Instance:
[x] Done
[x] Received 'job3'
[x] Done
[x] Received 'job5'
[x] Done
[x] Received 'job7'
[x] Done
[x] Received 'job9'
[x] Done
package com.sillycat.easytalker.rabbitmq.workqueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
private final static String SERVER_HOST = "localhost";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.')
Thread.sleep(5000);
}
}
}
Message Acknowledgment
If a worker dies, we'd like the task to be delivered to another worker.
RabbitMQ supports message acknowledgements. An ack is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it. (ack acknowledgement charactor)
If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will redeliver it to another consumer.
Message acknowledgments are turned on by default. autoAck=true flag will turned them off.
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
Message Durability
We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. We need to mark both the queue and messages as durable.
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
Set the message to persistent
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in a transaction.
Fair dispatch
In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
references:
http://www.rabbitmq.com/java-client.html
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/java
http://www.rabbitmq.com/api-guide.html
http://www.rabbitmq.com/getstarted.html
http://www.rabbitmq.com/tutorials/tutorial-two-java.html
发表评论
-
Update Site will come soon
2021-06-02 04:10 1678I am still keep notes my tech n ... -
Hadoop Docker 2019 Version 3.2.1
2019-12-10 07:39 294Hadoop Docker 2019 Version 3.2. ... -
Nginx and Proxy 2019(1)Nginx Enable Lua and Parse JSON
2019-12-03 04:17 449Nginx and Proxy 2019(1)Nginx En ... -
Data Solution 2019(13)Docker Zeppelin Notebook and Memory Configuration
2019-11-09 07:15 294Data Solution 2019(13)Docker Ze ... -
Data Solution 2019(10)Spark Cluster Solution with Zeppelin
2019-10-29 08:37 248Data Solution 2019(10)Spark Clu ... -
AMAZON Kinesis Firehose 2019(1)Firehose Buffer to S3
2019-10-01 10:15 322AMAZON Kinesis Firehose 2019(1) ... -
Rancher and k8s 2019(3)Clean Installation on CentOS7
2019-09-19 23:25 313Rancher and k8s 2019(3)Clean In ... -
Pacemaker 2019(1)Introduction and Installation on CentOS7
2019-09-11 05:48 343Pacemaker 2019(1)Introduction a ... -
Crontab-UI installation and Introduction
2019-08-30 05:54 455Crontab-UI installation and Int ... -
Spiderkeeper 2019(1)Installation and Introduction
2019-08-29 06:49 510Spiderkeeper 2019(1)Installatio ... -
Supervisor 2019(2)Ubuntu and Multiple Services
2019-08-19 10:53 370Supervisor 2019(2)Ubuntu and Mu ... -
Supervisor 2019(1)CentOS 7
2019-08-19 09:33 331Supervisor 2019(1)CentOS 7 Ins ... -
Redis Cluster 2019(3)Redis Cluster on CentOS
2019-08-17 04:07 373Redis Cluster 2019(3)Redis Clus ... -
Amazon Lambda and Version Limit
2019-08-02 01:42 438Amazon Lambda and Version Limit ... -
MySQL HA Solution 2019(1)Master Slave on MySQL 5.7
2019-07-27 22:26 530MySQL HA Solution 2019(1)Master ... -
RabbitMQ Cluster 2019(2)Cluster HA and Proxy
2019-07-11 12:41 464RabbitMQ Cluster 2019(2)Cluster ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:35 323Running Zeppelin with Nginx Aut ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:34 324Running Zeppelin with Nginx Aut ... -
ElasticSearch(3)Version Upgrade and Cluster
2019-05-20 05:00 329ElasticSearch(3)Version Upgrade ... -
Jetty Server and Cookie Domain Name
2019-04-28 23:59 404Jetty Server and Cookie Domain ...
相关推荐
在"rabbitmq-java-client-bin-3.3.4.zip"这个压缩包中,包含的是RabbitMQ的Java客户端库,这是与RabbitMQ服务器通信的一个关键组件。RabbitMQ提供了多种语言的客户端,Java客户端则是针对Java开发者设计的,使得Java...
在“rabbitmq-java-client-bin-3.3.4”这个压缩包中,包含了该版本的Java客户端库及相关文档,为Java应用提供了可靠的异步通信支持。 首先,RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议的开源...
在这个名为"rabbitmq-java (2).zip"的压缩包中,我们可以看到几个关键文件,它们构成了一个使用Java与RabbitMQ交互的项目。 1. `rabbitmq-java.iml`:这是IntelliJ IDEA项目文件,包含了项目的模块设置和依赖关系。...
在这个"rabbitmq-java-client-bin-2.7.0.zip"压缩包中,我们主要关注的是RabbitMQ的Java客户端库。 RabbitMQ Java客户端是Java开发者与RabbitMQ服务器进行交互的主要工具,允许程序发送和接收消息。2.7.0是这个...
在这个"rabbitmq-java-client-3.4.1.zip"压缩包中,包含的是RabbitMQ Java客户端库的3.4.1版本,这个版本的客户端可以让你在Java应用中轻松地与RabbitMQ服务器进行交互。 **RabbitMQ核心概念:** 1. **消息**:是...
在这个场景中,"rabbitmq-java-client-bin-3.0.4.zip"是一个包含RabbitMQ Java客户端库的压缩包,适用于Android应用和后台服务器之间的通信。 首先,我们来了解一下RabbitMQ Java客户端。这个客户端库允许Java...
RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包都在这里面,主要报卡一下软件:socat-1.7.3.2.tar.gz、rabbitmq-server-3.6.12-1.el6.noarch.rpm、rabbitmq-release-signing-key.asc、otp_...
6. **Java客户端使用**:使用"rabbitmq-java-client"时,开发者需要创建ConnectionFactory,建立与RabbitMQ服务器的连接,然后创建Channel并声明Exchange和Queue,最后绑定Queue到Exchange,从而完成消息的发布和...
《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...
《RabbitMQ Dotnet客户端3.5.0详解》 在分布式系统中,消息队列作为重要的组件之一,被广泛用于解耦系统、提高可扩展性和处理异步任务。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定、高效和易用性受到了...
本文将详细探讨RabbitMQ的客户端库——rabbitmq-client-1.3.0.jar,以及它在Java应用程序中的应用。 首先,`rabbitmq-client-1.3.0.jar`是RabbitMQ官方提供的Java客户端库,用于与RabbitMQ服务器进行通信。这个版本...
《RabbitMQ Java客户端2.7.0版API文档详解》 RabbitMQ Java客户端库是用于Java开发者与RabbitMQ消息代理进行交互的核心工具。RabbitMQ是一种开源的消息代理和队列服务器,广泛应用于分布式系统中的异步处理、任务...
在编译过程中,可能需要确保系统已安装必要的依赖,例如`libssl-dev`和`libcurl4-openssl-dev`,这些是`rabbitmq-c`所需的SSL和Curl库。 `rabbitmq-c`库提供了以下关键功能: - 连接管理:连接到RabbitMQ服务器,...
RabbitMQ-dotnet-client-3.6.4-dotnet-4.6.1.rar是一个包含RabbitMQ .NET客户端库的压缩包,适用于.NET Framework 4.6.1环境。这个压缩包提供了一个演示如何在WCF(Windows Communication Foundation)服务中使用...
Java 客户端库 RabbitMQ 遵循AMQP协议,那是一个开放的,并且通用的消息协议。java Android RabbitMQ可以用来发送和接收消息
RabbitMQ Stream 教程 - "Hello World!
rabbitmq的javaClient库,导入到项目中便可使用
java 队列源码 #rabbitMQ repository 主要记录个人学习reabbit的相关demo ...rabbitmq-spring-work-queue spring boot使用rabbitmq的队列示例 rabbitmq-spring-fanout spring boot使用的rabbitmq的发布订阅示例 rabb
在您提供的资源中,“rabbitmq-server-generic-unix-3.5.7.tar.rar”是一个针对Linux平台的RabbitMQ服务器的离线安装包。这个版本为3.5.7,您需要在Windows环境下解压后再用于Linux系统。下面将详细介绍RabbitMQ的...
《RabbitMQ Java客户端2.7.0详解》 RabbitMQ Java客户端2.7.0是用于与RabbitMQ消息代理进行交互的Java库,它提供了丰富的API,使得Java开发者能够轻松地在应用程序中集成消息队列功能。RabbitMQ作为一款开源的消息...