RabbitMQ是一个message broker(消息代理),接收生产者产生的消息并发送给接收者,中途可以自定义路由分发,缓存,持久化消息等。下面写个简单的消息发送接收。(不同于ZMQ的消息传递,RabbitMQ在内存中维持了一个高效的队列,消费者还没启动时,消息会被存储到队列中不会丢失)
首先启动RabbitMQ服务(windows启动方法,我这里下载的最新版3.0.1做学习测试)
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.0.1\sbin>rabbitmq-service.bat start bin doc erts-5.9.3.1 lib releases usr C:\Program Files\erl5.9.3.1\erts-5.9.3.1\bin\erlsrv: Service RabbitMQ started.
生产者Send.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { public static void main(String[] args) throws IOException { // 创建一个连接连接服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //factory.setPort(1987); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个队列,可以对队列做配置,如持久化等。然后往队列发送数据 channel.queueDeclare("queue1", false, false, false, null); String message = "test message"; channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println(" [x] Sent '" + message +"'"); channel.close(); connection.close(); } }
消费者Recv.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ShutdownSignalException; public class Recv { public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { // 创建一个连接接收数据 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //factory.setPort(1987); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 等待消息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("queue1", true, consumer); while(true){ //nextDelivery阻塞直到收到下一条消息 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
运行如下:
首先添加环境变量
root # export CLASSPATH=$CLASSPATH:/path/to/rabbitmq-jar root # javac -cp .:* Recv.java Send.java
开启两个终端分别模拟生产者和消费者
root # java -cp .:* Recv [x] Received 'test message' root # java -cp .:* Send [x] Sent 'test message'
注:磁盘剩余空间需要至少1Gb,不然会提示 connection refused ,这个剩余空间大小可以配置
root # java -cp .:* Recv Exception in thread "main" java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:351) at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:213) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:200) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366) at java.net.Socket.connect(Socket.java:529) at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:445) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:504) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533) at Recv.main(Recv.java:15)
相关推荐
本示例项目——"simple-microservice",是一个精心设计的电商系统,它展示了如何将单一应用程序拆分为一组小型、独立的服务,每个服务都运行在其自己的进程中,并通过轻量级机制(如HTTP RESTful API)进行通信。...
本文主要介绍了RabbitMQ高级篇中的关键概念之一——消息可靠性及其解决方案。通过详细解析生产者确认机制的配置与实现,展示了如何确保消息在从生产者到消费者的整个传递过程中不会丢失。这对于构建可靠的消息中间件...
《恋爱交友应用——Java技术深度剖析》 在数字化时代,社交与约会已逐渐转移到线上,"Datingapp"作为一款恋爱交友应用,其背后的技术架构和实现方式无疑具有极高的研究价值。本文将深入探讨该应用如何利用Java技术...
1. **消息模型**:消息系统通常基于两种模型——点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)。点对点模型中,消息从一个生产者发送到一个特定的消费者;而发布/订阅模型下,消息发布者发送消息到主题...
Stompjs是一个JavaScript库,它提供了一个简单的文本协议——STOMP(Simple (or Streaming) Text Oriented Messaging Protocol)的实现,使得JavaScript客户端可以与各种消息代理(如ActiveMQ、RabbitMQ等)进行交互...