`
sillycat
  • 浏览: 2539626 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

RabbitMQ(4)Java Client - Hello World-Work Queue

 
阅读更多
RabbitMQ(4)Java Client - Hello World-Work Queue

RabbitMQ is a message broker, accepts and forwards messages.

1. RabbitMQ jargon
Producing - A program that sends messages is a producer.
Queue - A queue is the name for a mailbox. It can store as many messages as you like.
Consuming - A consumer is a program that mostly waits to receive messages.

2. Simple Java "Hello World"
RabbitMQ speaks AMQP, which is an open, general-purpose protocol for messaging.

Test Producer
package com.sillycat.easytalker.rabbitmq.hello;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TestProducer {
private final static String QUEUE_NAME = "hello";
private final static String SERVER_HOST = "localhost";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World! Woo!2";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}

Test Consumer
package com.sillycat.easytalker.rabbitmq.hello;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class TestConsumer {
private final static String QUEUE_NAME = "hello";
private final static String SERVER_HOST = "localhost";
public static void main(String[] args) throws IOException,
ShutdownSignalException, ConsumerCancelledException,
InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(SERVER_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("
  • Waiting for messages. To exit press CTRL+C");
  • QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
    Thread.sleep(5000);
    }
    }
    }

    3. Simple Java Work Queues
    We will create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.

    Work Queues(aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete.

    This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.

    My NewTask.java Class is mostly the same as before, but we send a lot of tasks this time.
    package com.sillycat.easytalker.rabbitmq.workqueue;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;

    public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    private final static String SERVER_HOST = "localhost";

    public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(SERVER_HOST);

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = "job";
    for(int i = 0;i<10;i++){
    message = "job" + i;
    channel.basicPublish("", TASK_QUEUE_NAME,
    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    }
    channel.close();
    connection.close();
    }
    }

    The worker.java class, we can run 2 instances of this class to see, the log messsage as follow:
  • Waiting for messages. To exit press CTRL+C
  • [x] Received 'job0'
    [x] Done
    [x] Received 'job2'
    [x] Done
    [x] Received 'job4'
    [x] Done
    [x] Received 'job6'
    [x] Done
    [x] Received 'job8'
    [x] Done

    Another Instance:
  • Waiting for messages. To exit press CTRL+C
  • [x] Received 'job1'
    [x] Done
    [x] Received 'job3'
    [x] Done
    [x] Received 'job5'
    [x] Done
    [x] Received 'job7'
    [x] Done
    [x] Received 'job9'
    [x] Done

    package com.sillycat.easytalker.rabbitmq.workqueue;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    public class Worker {
    private static final String TASK_QUEUE_NAME = "task_queue";
    private final static String SERVER_HOST = "localhost";
    public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(SERVER_HOST);

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    System.out.println("
  • Waiting for messages. To exit press CTRL+C");

  • channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
    doWork(message);
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    }

    private static void doWork(String task) throws InterruptedException {
    for (char ch : task.toCharArray()) {
    if (ch == '.')
    Thread.sleep(5000);
    }
    }
    }

    Message Acknowledgment
    If a worker dies, we'd like the task to be delivered to another worker.
    RabbitMQ supports message acknowledgements. An ack is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it. (ack acknowledgement charactor)

    If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will redeliver it to another consumer.

    Message acknowledgments are turned on by default. autoAck=true flag will turned them off.
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

    Message Durability
    We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if RabbitMQ server stops.

    When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. We need to mark both the queue and messages as durable.
    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);

    Set the message to persistent
    channel.basicPublish("", TASK_QUEUE_NAME,
    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in a transaction.

    Fair dispatch
    In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);

    references:
    http://www.rabbitmq.com/java-client.html
    https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/java
    http://www.rabbitmq.com/api-guide.html
    http://www.rabbitmq.com/getstarted.html
    http://www.rabbitmq.com/tutorials/tutorial-two-java.html
    分享到:
    评论

    相关推荐

      rabbitmq-java-client-bin-3.3.4.zip

      在"rabbitmq-java-client-bin-3.3.4.zip"这个压缩包中,包含的是RabbitMQ的Java客户端库,这是与RabbitMQ服务器通信的一个关键组件。RabbitMQ提供了多种语言的客户端,Java客户端则是针对Java开发者设计的,使得Java...

      rabbitmq-java-client-bin-3.3.4

      在“rabbitmq-java-client-bin-3.3.4”这个压缩包中,包含了该版本的Java客户端库及相关文档,为Java应用提供了可靠的异步通信支持。 首先,RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议的开源...

      rabbitmq-java (2).zip

      在这个名为"rabbitmq-java (2).zip"的压缩包中,我们可以看到几个关键文件,它们构成了一个使用Java与RabbitMQ交互的项目。 1. `rabbitmq-java.iml`:这是IntelliJ IDEA项目文件,包含了项目的模块设置和依赖关系。...

      rabbitmq-java-client-bin-2.7.0.zip

      在这个"rabbitmq-java-client-bin-2.7.0.zip"压缩包中,我们主要关注的是RabbitMQ的Java客户端库。 RabbitMQ Java客户端是Java开发者与RabbitMQ服务器进行交互的主要工具,允许程序发送和接收消息。2.7.0是这个...

      rabbitmq-java-client-3.4.1.zip

      在这个"rabbitmq-java-client-3.4.1.zip"压缩包中,包含的是RabbitMQ Java客户端库的3.4.1版本,这个版本的客户端可以让你在Java应用中轻松地与RabbitMQ服务器进行交互。 **RabbitMQ核心概念:** 1. **消息**:是...

      rabbitmq-java-client-bin-3.0.4.zip

      在这个场景中,"rabbitmq-java-client-bin-3.0.4.zip"是一个包含RabbitMQ Java客户端库的压缩包,适用于Android应用和后台服务器之间的通信。 首先,我们来了解一下RabbitMQ Java客户端。这个客户端库允许Java...

      RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包

      RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包都在这里面,主要报卡一下软件:socat-1.7.3.2.tar.gz、rabbitmq-server-3.6.12-1.el6.noarch.rpm、rabbitmq-release-signing-key.asc、otp_...

      rabbitmq-java-client-master.zip

      6. **Java客户端使用**:使用"rabbitmq-java-client"时,开发者需要创建ConnectionFactory,建立与RabbitMQ服务器的连接,然后创建Channel并声明Exchange和Queue,最后绑定Queue到Exchange,从而完成消息的发布和...

      rabbitMQ实战java版-rabbitMQ-demo.zip

      《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

      rabbitmq-client-1.3.0.jar

      本文将详细探讨RabbitMQ的客户端库——rabbitmq-client-1.3.0.jar,以及它在Java应用程序中的应用。 首先,`rabbitmq-client-1.3.0.jar`是RabbitMQ官方提供的Java客户端库,用于与RabbitMQ服务器进行通信。这个版本...

      rabbitmq-dotnet-client-3.5.0

      《RabbitMQ Dotnet客户端3.5.0详解》 在分布式系统中,消息队列作为重要的组件之一,被广泛用于解耦系统、提高可扩展性和处理异步任务。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定、高效和易用性受到了...

      rabbitmq-java-client-javadoc-2.7.0.zip

      《RabbitMQ Java客户端2.7.0版API文档详解》 RabbitMQ Java客户端库是用于Java开发者与RabbitMQ消息代理进行交互的核心工具。RabbitMQ是一种开源的消息代理和队列服务器,广泛应用于分布式系统中的异步处理、任务...

      rabbitmq-dotnet-client-3.6.4-dotnet-4.6.1.rar

      RabbitMQ-dotnet-client-3.6.4-dotnet-4.6.1.rar是一个包含RabbitMQ .NET客户端库的压缩包,适用于.NET Framework 4.6.1环境。这个压缩包提供了一个演示如何在WCF(Windows Communication Foundation)服务中使用...

      Java Rabbitmq-client

      Java 客户端库 RabbitMQ 遵循AMQP协议,那是一个开放的,并且通用的消息协议。java Android RabbitMQ可以用来发送和接收消息

      RabbitMQ Stream 教程 - Hello World!.pdf

      RabbitMQ Stream 教程 - "Hello World!

      rabbitmq-client.jar

      rabbitmq的javaClient库,导入到项目中便可使用

      java队列源码-rabbitmq-repository:RabbitMQ消息队列学习的源码记录

      java 队列源码 #rabbitMQ repository 主要记录个人学习reabbit的相关demo ...rabbitmq-spring-work-queue spring boot使用rabbitmq的队列示例 rabbitmq-spring-fanout spring boot使用的rabbitmq的发布订阅示例 rabb

      rabbitmq-server-generic-unix-3.5.7.tar.rar下载,rabbitmq安装包

      在您提供的资源中,“rabbitmq-server-generic-unix-3.5.7.tar.rar”是一个针对Linux平台的RabbitMQ服务器的离线安装包。这个版本为3.5.7,您需要在Windows环境下解压后再用于Linux系统。下面将详细介绍RabbitMQ的...

      rabbitmq-java-client-2.7.0.zip

      《RabbitMQ Java客户端2.7.0详解》 RabbitMQ Java客户端2.7.0是用于与RabbitMQ消息代理进行交互的Java库,它提供了丰富的API,使得Java开发者能够轻松地在应用程序中集成消息队列功能。RabbitMQ作为一款开源的消息...

      spring-rabbitmq-helloworld

      spring和rabbitmq整合的helloworld

    Global site tag (gtag.js) - Google Analytics