Basic RPC
As a programming convenience, the Java client API offers a class RpcClient
which uses a temporary reply queue to provide simple RPC-style communication facilities via AMQP.
The class doesn't impose any particular format on the RPC arguments and return values. It simply provides a mechanism for sending a message to a given exchange with a particular routing key, and waiting for a response on a reply queue.
实现Rpc形式的发送与接收消息
/**
* 使用RpcClient发送消息
*
* @author sunjun
* @createtime 2010-4-27 上午11:21:05
*/
public class TestSender2 extends TestBase {
/**
* 发送一个消息
*
* @throws IOException
*/
public void sendMsg() throws IOException {
// 创建一个channel
Channel channel = getChannel();
// 设置return listener,用于监听回复(发生异常时)
channel.setReturnListener(new ReturnListener() {
@Override
public void handleBasicReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.out
.println("-----------get exception reply message---------");
System.out.println("replyCode: " + replyCode);
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("ContentEncoding: "
+ properties.getContentEncoding());
System.out.println("content type: "
+ properties.getContentType());
System.out.println("Expiration: " + properties.getExpiration());
System.out.println("type: " + properties.getType());
System.out.println("reply to: " + properties.getReplyTo());
System.out.println("body: " + new String(body));
}
});
// 定义一个queue
String queue = "test.queue";
channel.queueDeclare(queue);
// 定义一个exchange
String exchange = "test.exchange";
channel.exchangeDeclare(exchange, "direct");
// bind queue到exchange上
String routingKey = "test.routingkey";
channel.queueBind(queue, exchange, routingKey);
RpcClient rpcClient = new RpcClient(channel, exchange, routingKey);
try {
// 发送消息
String msg = "hello world!";
byte[] result = rpcClient.primitiveCall(msg.getBytes());
System.out.println(new String(result));
} catch (ShutdownSignalException e) {
e.printStackTrace();
}
System.out.println("send message success.");
// close
// close(channel);
}
/**
* @param args
*/
public static void main(String[] args) {
TestSender2 testSender = new TestSender2();
try {
testSender.sendMsg();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 使用RpcServer接收消息,并发送回复消息
*
* @author sunjun
* @createtime 2010-4-27 下午03:14:31
*/
public class TestReceiver3 extends TestBase {
/**
* 接收消息
*
* @throws IOException
*/
public void receive() throws IOException {
// 获取channel
Channel channel = getChannel();
// 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
String queue = "test.queue";
channel.queueDeclare(queue);
System.out.println("receive message started.");
// 循环取消息
while (true) {
RpcServer rpcServer = new RpcServer(channel, queue) {
@Override
public byte[] handleCall(Delivery request,
BasicProperties replyProperties) {
// 接收消息
byte[] message = request.getBody();
// 发送返回消息
return super.handleCall(request, replyProperties);
}
};
}
}
public static void main(String[] args) throws IOException {
new TestReceiver3().receive();
}
}
分享到:
相关推荐
1. **生产者(Producer)**:生产者是向RabbitMQ发送消息的应用程序。在Java客户端中,生产者通过创建一个连接到RabbitMQ服务器的Channel,然后定义交换机(Exchange)和路由键(Routing Key),最后发布消息到指定的...
6. **发布/消费(Publish/Consume)**:发布者通过交换器将消息发送到队列,消费者则从队列中接收并处理消息。RabbitMQ支持同步和异步消费模式,可以根据应用需求选择合适的方式。 在“rabbitmq-java-client-bin-...
随书源码“RabbitMQ实战-随书源码”和“RabbitMQ in action 和 RabbitMQ实战-高效部署分布式消息队列”的配套代码,提供了丰富的示例,帮助读者深入理解RabbitMQ的使用和实践。 1. **RabbitMQ基础** - **AMQP协议*...
用户解压后,可以查看其中的代码、配置文件或其他文档,以了解如何设置和操作 RabbitMQ 服务器,或者如何编写客户端应用程序来发送和接收消息。 综合以上信息,我们可以知道这个压缩包旨在帮助用户学习和使用 ...
2. **创建通道**:通过`IConnection`对象的`CreateModel()`方法创建一个新的通道,通道是AMQP中的工作单元,用于发送和接收消息。 3. **声明交换机和队列**:使用`channel.ExchangeDeclare()`和`channel....
- **交换机(Exchange)**:交换机负责接收生产者发送的消息,并根据预定义的路由规则将其分发到相应的队列。 - **队列(Queue)**:队列是存储消息的地方,它是持久化的,可以有多个消费者同时监听。 - **绑定...
标题中的“rabbitmq学习10:使用spring-amqp发送消息及异步接收消息”表明了本次讨论的主题,即如何在Spring框架中利用Spring AMQP组件与RabbitMQ进行交互,实现消息的发送和异步接收。RabbitMQ是一个开源的消息代理...
- 建立通道(channel),这是RabbitMQ中发送和接收消息的实体。 - 定义交换机(exchange)和队列(queue),决定消息如何路由。 - 发布消息到交换机,RabbitMQ会根据交换机类型和绑定规则将消息放入队列。 - 如果...
在实际开发中,你可以创建一个简单的生产者示例,发送一条消息到RabbitMQ服务器,然后创建一个消费者示例来接收并处理该消息。这涉及到了连接建立、通道打开、交换机选择、队列声明、消息发送和消费等一系列步骤。 ...
这个客户端库允许Java开发者轻松地与RabbitMQ服务器进行交互,发送和接收消息。在"rabbitmq-java-client-3.0.4"目录下,通常会包含JAR文件和其他必要的资源,如文档、示例代码等。开发者可以将这些JAR文件添加到他们...
本文将围绕RabbitMQ的Java实践,通过分析"rabbitMQ-demo.zip"中的示例项目"rabbitMQ-demo-main",深入讲解RabbitMQ的核心概念、工作模式以及如何在Java环境中进行集成和应用。 一、RabbitMQ核心概念 1. **Broker**...
9. **示例代码**:使用`rabbitmq-c++`,开发者可以编写简洁的C++代码来发送和接收消息,例如: ```cpp RabbitMQConnection conn("localhost", 5672, "guest", "guest"); RabbitMQChannel channel(conn); channel....
2. **交换器(Exchange)**:负责接收生产者发送的消息,并根据预定义的路由规则将消息分发到相应的队列。 3. **队列(Queue)**:存储消息的容器,可以理解为消息的缓冲区,多个消费者可以共享同一个队列。 4. **...
2. **发布与消费消息**:生产者使用`publish`方法发布消息到交换器,消费者通过`consume`方法订阅队列并接收消息。 3. **交换器类型**:包括Direct、Fanout、Topic和Headers四种,根据业务需求选择合适的类型进行...
本文将深入探讨C#环境下如何利用RabbitMQ实现消息的发送与接收,以及相关的关键概念和技术。 首先,我们要理解RabbitMQ的基本工作原理。RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,它定义了消息...
在这个示例中,我们创建了一个名为`test_queue`的队列,并发送了一条消息"Hello, RabbitMQ!"。注意,实际应用中需要替换默认的guest用户和端口,因为它们只适用于本地开发。 最后,"ZZB_net01"可能是一个包含...
**RabbitMQ-C:C语言实现的RabbitMQ客户端库** RabbitMQ-C是一个开源的、用C语言编写的RabbitMQ客户端库...通过源代码分析、示例学习和测试运行,开发者可以深入理解RabbitMQ-C的工作原理,并在自己的项目中灵活运用。
2. **交换机(Exchange)**:负责接收生产者发送的消息,并根据预设的路由规则将其分发到不同的队列中。 3. **队列(Queue)**:存储消息的容器,多个消费者可以同时从一个队列中获取消息,但每个消息只会被一个消费者...
这个目录下的代码可能包含了如何连接到RabbitMQ服务器,创建通道(Channel),定义交换机(Exchange)和队列(Queue),并发送消息的示例。交换机是RabbitMQ中的一种路由机制,它根据预定义的规则(Binding)将消息...
RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于处理异步任务、解耦组件以及实现消息传递。本实战演练将带你深入理解RabbitMQ的核心概念...