`

rabbitmq实现高吞吐量的rpc调用

阅读更多
rabbitmq实现rpc调用基本思路:
客户端(client):客户端发起rpc调用,这当成一个消息,发送到rabbitmq服务器。这个消息会携带两个特殊(额外)的信息,一个是调用序号,一个是回调队列名称。调用序号需要服务端原样返回,而回调队列名称是用于服务端将结果放入这个队列中,以便客户端取回结果。
服务端(service):服务端接收到了一个rpc调用后,执行调用代码,将结果返回到指定的回调队列中去。
这里我们可以约定一个数据模型,见代码:
public class RpcInvokeModel implements Serializable {
          //真实传输的数据
private Object target;
          //调用序号
private Long invokeFlag;
          //回调队列名称
        private String callBackQueue;
}
客户端将所有的调用封装成这个模型,然后发送出去。服务端解析成这个模型,当然,你可以再这个模型中加入别的参数(如方法名称等等)。
客户端代码:
package com.pdy.rabbitmq.rpc;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Represents a connection with a queue
*
* @author syntx
*
*/
public abstract class ClientEndPoint {

protected Channel producerChannel;
protected Channel consumerChannel;
protected Connection connection;
protected String endPointName;

public ClientEndPoint(String endpointName) throws IOException,
TimeoutException {
this.endPointName = endpointName;

// Create a connection factory
ConnectionFactory factory = new ConnectionFactory();

// hostname of your rabbitmq server
// factory.setVirtualHost("pdy");
factory.setHost("localhost");
// factory.setUsername("pdy");
// factory.setPassword("pdy");
// factory.setVirtualHost("pdy11");
// getting a connection
connection = factory.newConnection();
// creating a channel
producerChannel = connection.createChannel();
consumerChannel = connection.createChannel();

producerChannel.queueDeclare(endpointName, false, false, false, null);
}

/**
* 关闭channel和connection。并非必须,因为隐含是自动调用的。
*
* @throws IOException
* @throws TimeoutException
*/
public void close() throws IOException, TimeoutException {
this.producerChannel.close();
this.consumerChannel.close();
this.connection.close();
}
}

package com.pdy.rabbitmq.rpc;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

public class Client extends ClientEndPoint implements Consumer, Runnable {

public Client(String endpointName) throws IOException, TimeoutException {
super(endpointName);
DeclareOk ok = super.consumerChannel.queueDeclare();
callBackQueue = ok.getQueue();
super.consumerChannel.basicConsume(callBackQueue, true, this);
}

@Override
public void handleConsumeOk(String consumerTag) {

}

@Override
public void handleCancelOk(String consumerTag) {

}

@Override
public void handleCancel(String consumerTag) throws IOException {

}

@Override
public void handleDelivery(String arg0, Envelope arg1,
BasicProperties arg2, byte[] arg3) throws IOException {

RpcInvokeModel invokeModel = SerializationUtils.deserialize(arg3);

synchronized (callResult) {

callResult.put(invokeModel.getInvokeFlag(), invokeModel.getObj());
callResult.notifyAll();
}

}

@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {

}

@Override
public void handleRecoverOk(String consumerTag) {

}

public void start() throws IOException {

Random random = new Random();
for (int i = 0; i < 10; i++) {

Double[] arr = new Double[10];

for (int j = 0; j < arr.length; j++) {
arr[j] = random.nextDouble();
}

System.out.println("调用rpc服务:" + Arrays.toString(arr));

RpcFuture<Double[]> result = invokeSort(arr);
Double[] obj = result.get();
System.out.println("调用rpc服务响应的结果:" + Arrays.toString(obj));
}
}

private RpcFuture<Double[]> invokeSort(Double[] arr) throws IOException {

RpcInvokeModel mo = new RpcInvokeModel();
mo.setInvokeFlag(++invokeFlag);
mo.setObj(arr);
byte[] body = SerializationUtils.serialize(mo);

BasicProperties basicProperties = new BasicProperties().builder()
.replyTo(callBackQueue).build();
super.producerChannel.basicPublish("", super.endPointName,
basicProperties, body);

return new RpcFuture<>(this, mo.getInvokeFlag());
}

private final String callBackQueue;
private final Map<Long, Object> callResult = new HashMap<>();
private volatile long invokeFlag = 0;

public Object getResultByFlagKey(Long flagKey) {

Object result = null;
while (true) {
synchronized (callResult) {
result = callResult.remove(flagKey);
if (result == null) {
try {
callResult.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
return result;
}
}
}
}

@Override
public void run() {

try {
start();
} catch (IOException e) {
e.printStackTrace();
}
}
}

package com.pdy.rabbitmq.rpc;

public class RpcFuture<T> {

private final Client client;
private final Long flagKey;

public RpcFuture(Client client, Long flagKey) {

this.client = client;
this.flagKey = flagKey;
}

public T get() {

return (T) client.getResultByFlagKey(flagKey);
}
}


服务端代码:
package com.pdy.rabbitmq.rpc;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Represents a connection with a queue
*
* @author syntx
*
*/
public abstract class ServiceEndPoint {

protected Channel producerChannel;
protected Channel consumerChannel;
protected Connection connection;
protected String endPointName;

public ServiceEndPoint(String endpointName) throws IOException,
TimeoutException {
this.endPointName = endpointName;

// Create a connection factory
ConnectionFactory factory = new ConnectionFactory();

// hostname of your rabbitmq server
// factory.setVirtualHost("pdy");
factory.setHost("localhost");
// factory.setUsername("pdy");
// factory.setPassword("pdy");
// factory.setVirtualHost("pdy11");
// getting a connection
connection = factory.newConnection();
// creating a channel
producerChannel = connection.createChannel();
consumerChannel = connection.createChannel();

consumerChannel.queueDeclare(endpointName, false, false, false, null);
}

/**
* 关闭channel和connection。并非必须,因为隐含是自动调用的。
*
* @throws IOException
* @throws TimeoutException
*/
public void close() throws IOException, TimeoutException {
this.producerChannel.close();
this.consumerChannel.close();
this.connection.close();
}
}

package com.pdy.rabbitmq.rpc;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

public class Service extends ServiceEndPoint implements Consumer {

public Service(String endpointName) throws IOException, TimeoutException {
super(endpointName);
}

public void start() throws IOException {

super.consumerChannel.basicConsume(super.endPointName, true, this);
}

@Override
public void handleConsumeOk(String consumerTag) {

}

@Override
public void handleCancelOk(String consumerTag) {

}

@Override
public void handleCancel(String consumerTag) throws IOException {

}

@Override
public void handleDelivery(String arg0, Envelope arg1,
BasicProperties arg2, byte[] arg3) throws IOException {

RpcInvokeModel invokeModel = SerializationUtils.deserialize(arg3);
Double[] obj = (Double[]) invokeModel.getObj();

Arrays.sort(obj);

invokeModel.setObj(obj);

byte[] body = SerializationUtils.serialize(invokeModel);

String routingKey = arg2.getReplyTo();
super.producerChannel.basicPublish("", routingKey, null, body);
}

@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {

}

@Override
public void handleRecoverOk(String consumerTag) {

}
}

测试代码:
package test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.pdy.rabbitmq.rpc.Client;
import com.pdy.rabbitmq.rpc.Service;

public class RpcTest {

public static void main(String[] args) throws IOException, TimeoutException {

String queue = "pdyRpc";
client(queue);
service(queue);
}

private static void service(String queue) throws IOException,
TimeoutException {

Service service = new Service(queue);
service.start();
}

private static void client(String queue) throws IOException,
TimeoutException {

Client client = new Client(queue);
new Thread(client).start();
}
}

代码还不满足线程安全,但是改成线程安全是很容易的事情。

0
0
分享到:
评论

相关推荐

    RabbitMQ 讲义.pdf

    其中,ActiveMQ基于JMS协议,而ZeroMQ是基于C语言开发,RabbitMQ基于AMQP协议并使用Erlang语言编写,RocketMQ是阿里巴巴基于JMS协议开发的产品,Kafka是一个分布式消息系统,有高吞吐量的特点。 RabbitMQ是一个基于...

    RabbitMQ.pptx介绍RabbitMQ具体事项

    比如ActiveMQ以其成熟和丰富的文档受到青睐,RabbitMQ凭借其基于Erlang的高效并发性能和丰富的管理界面脱颖而出,RocketMQ在分布式架构下表现出极高的可用性,而Kafka则专为大数据处理设计,具有高吞吐量和低延迟。...

    rabbitmq和erl集合最新版.zip

    1. **并发性**:Erlang的轻量级进程和强大的并发机制使得RabbitMQ能处理大量并发连接和高吞吐量的消息传输。 2. **容错性**:Erlang的分布式特性和错误恢复机制使得RabbitMQ具有很高的可用性和故障恢复能力。 3. *...

    rabbitmq面试题.pdf

    - **工作原理:** 节点间共享队列数据,实现高可用和负载均衡。 **14. RabbitMQ的镜像队列(Mirrored Queues)是什么?** - 镜像队列是指在集群内为每个队列创建一个副本,提高队列的可用性和容错能力。 **15. ...

    RabbitMQ入门操作手册.pdf

    Kafka以其高性能、高吞吐量和大规模消息堆积能力而著名,适用于实时处理和大数据分析。RocketMQ源自Metaq,提供严格的顺序消息、多种消息拉取模式和事务消息等功能,适合大规模分布式系统。RabbitMQ则基于AMQP,支持...

    RabbitMQ-最完整最全教程

    RabbitMQ提供了六种工作模式:简单模式、work模式、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics主题模式和RPC远程调用模式。 RabbitMQ的安装及配置包括了安装说明和用户以及VirtualHosts的配置。...

    jaffa-rpc-library::rocket:高性能RPC库,用于使用ZeroMQKafkaHTTP1.11.1RabbitMQgRPC的Java Spring应用程序之间的通信

    Jaffa RPC库 该库提供Java Spring应用程序之间的RPC通信。 主要特点: Apache ZooKeeper(带...gRPC(使用TLSv1.2) 吞吐量极高 RabbitMQ(使用TLS 1.2的登录名/密码) 低延迟 高通量 坚持不懈 支持2种序列化协议:

    cloud 分布式微服务 服务互相注册调用 服务调用方式

    3. Message Queue(MQ):例如RabbitMQ、Kafka等,服务之间通过消息队列异步通信,解耦了调用关系,提高系统吞吐量和响应速度,但可能会引入延迟。 4. Service Mesh:如Istio、Linkerd等,是一种更高级的服务间通信...

    rabbitmq的接口函数说明,api参数使用说明

    增加这个值可以提高吞吐量,但可能会增加延迟。 - `heartbeat`: 客户端发送心跳的时间间隔。如果不设置,则RabbitMQ服务器默认每60秒检查一次连接。 - `sasl_method`: 鉴权方法,如AMQP_SASL_METHOD_PLAIN表示使用...

    14章MQ大牛成长课-从0到1手写分布式消息队列中间件

    AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。 二、MQ的主要功能 RabbitMQ的主要功能包括实现应用程序的异步和解耦,同时也能起到消息缓冲和消息分发...

    基于微服务的网上商城系统的设计与实现

    - 使用JMeter等工具模拟大量并发请求,测试系统的响应时间和吞吐量。 - 根据测试结果调整服务配置、优化代码逻辑等。 3. **安全性测试**: - 检测是否存在SQL注入、XSS攻击等安全漏洞。 - 加密敏感信息,如用户...

    2020年腾讯Java高级笔试面试题.pdf

    6. 消息中间件的区别和特性:文档提到了ActiveMQ、RabbitMQ、RocketMQ和Kafka等消息中间件,列举了它们的开发语言、单机吞吐量、时效性、可用性以及功能特性。这部分内容考察了应聘者对不同消息中间件应用场景和技术...

    消息中间件的设计

    通过异步处理,可以在不影响系统响应时间的前提下,提高消息的吞吐量。 6. **流量控制与公平调度**:为了防止下游系统过载,消息队列需要具备流量控制功能,合理地调整消息的推送速率。同时,还需要实现公平调度...

    基于SpringBoot+Zookeeper+Dubbo打造分布式高并发商品秒杀系统.zip

    4. **异步处理**:对于非实时性的操作,如订单创建、库存扣减,可以采用消息队列(如RabbitMQ)进行异步处理,提高系统吞吐量。 5. **数据库优化**:采用读写分离、分库分表等手段,提高数据库的并发处理能力。 6....

    实现消息推送核心搭建(升级版)

    10. **性能优化**:针对高并发场景,可能需要考虑负载均衡、缓存策略、批量推送等优化手段,提升系统的吞吐量和响应速度。 以上就是基于Dubbo的“dubbo-demo-consumer”项目实现消息推送服务的一些核心技术和流程。...

    PHP高级编程之消息队列原理与实现方法详解

    生产者可以快速地发送消息,而不必等待消费者处理,从而提高吞吐量。 - **负载均衡**:消息队列可以帮助平衡系统负载,通过分配任务到多个工作线程或节点,避免单点过载。 - **容错性**:如果消费者出现问题,消息...

    西电分布式计算课程(PPT总结版)笔记

    - **通信技术的重要性:** 在分布式计算领域,节点之间的高效通信是实现高性能计算的核心。文档重点介绍了几种通信技术: - **底层通信技术:** 包括TCP/UDP这样的点对点通信技术。 - **并发服务技术:** 如多线程...

    SpringCloud 22道面试题和答案.docx

    负载均衡是将工作负载均匀分配到多个计算资源,以优化资源使用、提高吞吐量、减少响应时间并确保无单一故障点,提高系统的可靠性和可用性。这通常涉及到专门的软件或硬件设备。在微服务架构中,负载均衡器能够有效地...

    互联网分布式项目源码V4.zip

    本项目可能采用了RabbitMQ或者Kafka等MQ服务,允许不同服务间异步通信,提高了系统的响应速度和整体吞吐量。 CAS(Central Authentication Service)是耶鲁大学开源的单点登录(Single Sign-On)系统,用于统一管理...

    58到家分布式服务框架.zip

    通过消息队列,服务可以异步地处理请求,提高系统的响应速度和吞吐量。同时,它也能缓冲高峰期的请求,避免系统因瞬间高流量而崩溃。 七、监控与日志 有效的监控和日志记录是保障系统健康运行的重要环节。58到家...

Global site tag (gtag.js) - Google Analytics