基于netty的rpc实现过程:
消费者
实现代理接口,在组装好数据之后实例化成代理
设置好消费者的netty处理管道过滤器---尤其是异步处理
服务提供者
发布服务
设置服务提供方的netty管道过滤处理器---接受接口参数,结果处理器
消费者实例化的时之后,基于次消费者获取服务对象的代理对象,调用了服务代理的方法之后就会走invoke方法,调用封装好的消费端的netty--send将调用的服务,信息,参数信息传过去
服务提供端发布服务接口信息到一个内存(类的成员变量)中,通过netty管道中自定义的过滤器,接收消息,利用反射实例化处接口的对象,调用这个接口对象的方法
消费端和服务端交互过程:
客户端的读在自定义的管道处理中(读服务方返回的结果),写在发送请求的方法(代理中)中(写请求的参数)
服务端读(读取客户端的请求参数),调用复写的handle方法反射出服务实例调用,写(写入返回结果)都在自定义的管道处理器中
rpc的异步原理:
futuer结合thradlocal
future future在最终get的时候相当于同步,在发出请求此时是异步(主线程可继续往下),最终的get还是异步,
用future时最终的同步
例如
Object result=future.getResult(timeout, TimeUnit.MILLISECONDS);
还有一种思路就是结合threadlocal类,请求的时候直接发出,然后返回null,然后在需要的时候(主线程没有停止)通过这个线程从threadlocal中获取结果集
例如:
ResponseFuture
一个管道必加的处理器,编码,解码,读写处理器---进行消息的发送和接收
发布的时候根据端口(服务端不必写ip),监听端口的请求
请求连接需要ip+端口
发送 接收就是操作管道 通过消费管道的自定义过滤器写 继承ChannelInboundHandlerAdapter (实例化接收服务的时候连接,连接的时候就建立了管道)
连接获取管道;
public RpcConsumerImpl()
{
String ip=System.getProperty("SIP");
//String ip="127.0.0.1";
this.asyncMethods=new HashMap<String,ResponseCallbackListener>();
this.connection=new RpcNettyConnection(ip,8888);
this.connection.connect();
connection_list=new ArrayList<RpcConnection>();
int num=Runtime.getRuntime().availableProcessors()/3 -2;
for (int i = 0; i < num; i++) {
connection_list.add(new RpcNettyConnection(ip, 8888));
}
for (RpcConnection conn:connection_list)
{
conn.connect();
}
}
获取管道发送:
public Object Send(RpcRequest request,boolean async) {
if(channel==null)
channel=getChannel(inetAddr.toString());
if(channel!=null)
{
final InvokeFuture<Object> future=new InvokeFuture<Object>();
futrues.put(request.getRequestId(), future);
future.setMethod(request.getMethodName());
ChannelFuture cfuture=channel.writeAndFlush(request);
cfuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture rfuture) throws Exception {
if(!rfuture.isSuccess()){
future.setCause(rfuture.cause());
}
}
});
resultFuture=new ResultFuture<Object>(timeout);
resultFuture.setRequestId(request.getRequestId());
try
{
if(async)//异步执行的话直接返回
{
ResponseFuture.setFuture(resultFuture);
return null;
}
Object result=future.getResult(timeout, TimeUnit.MILLISECONDS);
return result;
}
catch(RuntimeException e)
{
throw e;
}
finally
{
//这个结果已经收到
if(!async)
futrues.remove(request.getRequestId());
}
}
else
{
return null;
}
}
接收端:
通过接收管道的自定义过滤器,复写监听读 继承ChannelInboundHandlerAdapter
request 实现了序列化
发布的时候根据端口(服务端不必写ip),监听端口的请求
发布接受连接请求:
@Override
public void publish() {
handlerMap.put(interfaceclazz.getName(), classimplement);
// TODO Auto-generated method stub
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// server端采用简洁的连写方式,client端才用分段普通写法。
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RpcEncoder(RpcResponse.class));
ch.pipeline().addLast(new RpcDecoder(RpcRequest.class));
// ch.pipeline().addLast(new FSTNettyEncode());
// ch.pipeline().addLast(new FSTNettyDecode());
ch.pipeline().addLast(new RpcRequestHandler(handlerMap));
}
})
.option(ChannelOption.SO_KEEPALIVE , true )
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_SNDBUF, 1024)
.option(ChannelOption.SO_RCVBUF, 2048);
ChannelFuture f = serverBootstrap.bind(8888).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
读取发送端的参数:
@Override
public void channelRead(
ChannelHandlerContext ctx, Object msg) throws Exception {
RpcRequest request=(RpcRequest)msg;
String host=ctx.channel().remoteAddress().toString();
//更新上下文
UpdateRpcContext(host,request.getContext());
//TODO 获取接口名 函数名 参数 找到实现类 反射实现
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try
{
Object result = handle(request);
if(cacheName!=null&&cacheName.equals(result))
{
response.setAppResponse(cacheVaule);
}
else
{
response.setAppResponse(ByteObjConverter.ObjectToByte(result));
cacheName=result;
cacheVaule=ByteObjConverter.ObjectToByte(result);
}
}
catch (Throwable t)
{
//response.setErrorMsg(t);
response.setExption(Tool.serialize(t));
response.setClazz(t.getClass());
}
ctx.writeAndFlush(response);
}
参考:
https://blog.csdn.net/zhujunxxxxx/article/details/48742529
https://github.com/zhujunxxxxx/
相关推荐
QiuRPC是基于Java实现的一个轻量级RPC框架,其设计目标是简单易用,易于理解和扩展。下面将从几个关键组件来分析QiuRPC的实现原理: 1. **服务接口与实现**: - QiuRPC允许开发者定义服务接口,然后在服务提供者侧...
RPC(Remote Procedure Call)是一种计算机通信协议,它允许程序在一台计算机上执行...以上是对“nfs-rpc”高性能RPC框架的一些可能实现和特性分析,具体的功能和实现细节需要参考该框架的文档或源代码进行深入学习。
综上所述,实现自定义RPC框架是一项涉及多方面技术的任务,包括网络编程、序列化、服务注册与发现、协议设计等。通过学习和实践,你可以更好地理解RPC的工作原理,以及Netty在构建高性能网络应用中的强大能力。
《SOFARPC框架v5.12.0深度解析》 SOFARPC,全称为Simplified Open For阿里的Remote Procedure Call,是阿里巴巴开源的一款高性能、轻量级的RPC框架,广泛应用于分布式系统中。SOFARPC v5.12.0作为其重要的版本迭代...
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用!...基于 Connect UDP 和自定义网络协议(简单请求响应协议) 、Reactor 网络模型(one loop per thread + 线程池) 的轻量RPC框架(源码+项目说明).zip
文章进一步分析了在ONC RPC框架中影响数据共享性能的因素,并通过实验探究了不同参数设置下的ONC RPC性能表现。这些实验结果为框架的使用者提供了重要的指导信息,帮助他们在使用RPC-DDSF框架时能够调整参数以达到...
基于 Connect UDP 和自定义网络协议、Reactor 网络模型的轻量RPC框架,是一个集成了高效 I/O 多路复用机制的项目。该框架利用 Reactor 模式处理并发 I/O,通过一个主循环监听多个文件描述符,当事件发生时调用相应的...
1、该资源内项目代码经过严格调试,下载即用确保可以运行!...基于 Connect UDP 和自定义网络协议(简单请求响应协议) 、Reactor 网络模型(one loop per thread + 线程池) 的轻量RPC框架(源码+项目说明).zip
【作品名称】:基于Java实现的基于netty轻量的高性能分布式RPC服务框架 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】...
在这个“netty的rpc协议的简单实现”项目中,我们将探讨Netty如何被用来构建RPC框架的关键组件和流程。 首先,我们需要理解RPC的基本原理。RPC的核心是封装网络通信细节,使得客户端和服务器之间通过接口进行通信,...
使用Java的虚拟线程和Netty手写一个基于自定义协议的RPC(远程过程调用)框架,框架说明请参考文章:https://blog.csdn.net/weixin_47157828/article/details/145034344
了解Trick-Rpc-Framework的实现可以帮助我们更好地理解RPC的工作原理,同时也可以为我们自定义RPC框架或优化现有框架提供参考。在实际使用中,可以根据具体需求扩展其功能,例如添加负载均衡、服务治理等特性。 ...
Phprpc框架由服务端和服务端客户端两部分组成,通过高效的序列化和反序列化机制,实现了跨进程的数据交换。它支持多种协议,如HTTP、TCP、UDP等,可以根据项目需求选择合适的通信方式。 3. **安装与配置** 安装...
【描述】提到了“tiny-rpc”项目,它是一个轻量级的RPC框架,旨在提供与Dubbo类似的功能,但更注重源码级别的理解和实现。通过深入研读Dubbo的源代码,开发者能够理解RPC背后的基本原理,包括服务发现、负载均衡、...
本项目是一个轻量级RPC框架,也是对SPI的一次实践,通过SPI可自定义扩展通信模块、服务管理中心、负载均衡算法。 对于这三个部分,本项目分别提供的默认实现是Netty、ZooKeeper、random算法;并在demo模块里面做了...
Simple Registry:实现 AbstractRegistry 接口,参考例子只是简单实现,不支持集群,可作为自定义注册中心的参考,但不适合直接用于生产环境。 Dubbo 的通信协议 1. Dubbo – 基于 TCP 的 NIO 异步传输、单连接...
在RPC框架中,服务提供者(Server)会暴露一组可调用的函数或方法,而服务消费者(Client)则可以通过网络调用这些远程服务,仿佛它们就在本地运行。Phprpc_3.0.1_Delphi 提供了高效的序列化和反序列化机制,确保...
描述中的"JSON-RPC for Java.src 2.0"再次强调了这是一个Java语言实现的JSON-RPC框架,版本号为2.0。这通常意味着它可能包含了一些新功能、改进和修复的错误,相比之前的版本更加稳定和成熟。 标签同样反映了这个...
在这个主题中,“修改phprpc源码以支持集合类的string类型的转换”涉及到对Phprpc框架的源代码进行定制化改造,以适应处理集合类中的字符串类型转换...这个过程不仅锻炼了编程技能,也深入理解了RPC框架的工作机制。