`

手写RPC框架

    博客分类:
  • RPC
rpc 
阅读更多

 

源码地址:https://github.com/ItGeneral/code-framework

微信公众号原文:https://mp.weixin.qq.com/s/z1kIqvlX92oun1Cpm71Tsw

 

RPC(Remote Procedure Call Protocol)远程过程调用协议,采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。使用RPC框架,可以让远程接口调用像调用本地方法一样方便。 下面介绍一个简单的RPC框架,让读者领略RPC框架的原理

RPC服务端

(1)新增注解@RpcExporter,接口上添加该注解,即表示为RPC服务端接口

  1. @Target(ElementType.METHOD)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RpcExporter {
  5. /**
  6. * RPC服务端处理器名称(接口名称)
  7. * @return
  8. */
  9. String value();
  10. }

(2)新增接口RpcTestService及其实现RpcTestServiceImpl

  1. public interface RpcTestService {
  2. Boolean testRpcService(String params);
  3. }
  4. @Service
  5. public class RpcTestServiceImpl implements RpcTestService{
  6. private final static Logger LOGGER = getLogger(RpcTestServiceImpl.class);
  7. @Override
  8. @RpcExporter(value = "testRpcService")
  9. public Boolean testRpcService(String params){
  10. LOGGER.info("======testRpcService====success====" + params);
  11. return true;
  12. }
  13. }

RPC客户端

(1)新增注解@RpcClient,接口方法上添加该注解,即表示为RPC客户端接口

  1. @Target(ElementType.METHOD)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RpcClient {
  5. /**
  6. * 对应的服务端处理器名称(bean名称+接口名称)
  7. * @return
  8. */
  9. String value();
  10. /**
  11. * rpc 服务端地址
  12. * @return
  13. */
  14. String rpcExportUrl();
  15. }

(2)添加@RpcInterface注解,接口类上添加此注解,表明为RPC客户端接口类

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RpcInterface {
  5. }

(3)新增RPC客户端接口

  1. @Service
  2. @RpcInterface
  3. public interface RpcTestClient {
  4. @RpcClient(value = "RpcTestServiceImpl.testRpcService", rpcExportUrl = "http://localhost:8080/rpc")
  5. Boolean testClient(String params);
  6. }

初始化RPC框架

(1)添加@EnableRPC注解,开启RPC配置

  1. @Documented
  2. @Configuration
  3. @Target(ElementType.TYPE)
  4. @Retention(RetentionPolicy.RUNTIME)
  5. @Import({RpcConfiguration.RpcPackageRegister.class, RpcConfiguration.class})
  6. public @interface EnableRPC {
  7. /**
  8. * RPC 扫描包路径
  9. * @return
  10. */
  11. String[] basePackages() default {};
  12. }
  13. public class RpcConfiguration {
  14. private static Set<String> basePackages = new LinkedHashSet<>();
  15. /**
  16. * basePackages路径下的rpc客户端接口,设置为动态代理
  17. * @return
  18. */
  19. @Bean
  20. public RpcRegister rpcRegister(){
  21. return new RpcRegister(basePackages);
  22. }
  23. public static class RpcPackageRegister implements ImportBeanDefinitionRegistrar{
  24. /**
  25. * 解析EnableRPC basePackages参数
  26. * @param metadata
  27. * @param registry
  28. */
  29. @Override
  30. public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
  31. Map<String, Object> enableMap = metadata.getAnnotationAttributes(EnableRPC.class.getName());
  32. AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(enableMap);
  33. String[] packages = annotationAttributes.getStringArray("basePackages");
  34. for (String basePackage : packages){
  35. String[] tokenize = StringUtils.tokenizeToStringArray(basePackage,
  36. ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
  37. basePackages.addAll(Arrays.asList(tokenize));
  38. }
  39. if (basePackages.isEmpty()) {
  40. basePackages.add(ClassUtils.getPackageName(metadata.getClassName()));
  41. }
  42. }
  43. }
  44. }

(2)spring bean 初始化前为所有RPC客户端接口设置动态代理,自定义invoke方法实现RPC客户端向RPC服务端请求数据的逻辑

  1. public class RpcRegister implements BeanFactoryPostProcessor {
  2. /**
  3. * rpc扫描包
  4. */
  5. private Set<String> baseScanPackages;
  6. public RpcRegister(Set<String> baseScanPackages) {
  7. this.baseScanPackages = baseScanPackages;
  8. }
  9. /**
  10. * spring容器在初始化bean之前,允许自定义某些bean
  11. * RPC客户端所有bean设置动态代理,调用时走动态代理的invoke()
  12. * @param beanFactory
  13. * @throws BeansException
  14. */
  15. @Override
  16. public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
  17. DefaultListableBeanFactory factory = (DefaultListableBeanFactory) beanFactory;
  18. for (String basePackage : baseScanPackages){
  19. try {
  20. Set<Class<?>> classSet = scanBackPackage(basePackage);
  21. for (Class<?> clazz : classSet){
  22. if (clazz.isInterface() && clazz.isAnnotationPresent(RpcInterface.class)){
  23. //生成动态代理
  24. Object proxy = RpcClientProxy.getProxy(clazz);
  25. //将bean注入到spring容器中
  26. factory.registerSingleton(clazz.getName(), proxy);
  27. }
  28. }
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. /**
  35. * 扫描包下面的所有类
  36. * @param basePackage
  37. * @return
  38. */
  39. private Set<Class<?>> scanBackPackage(String basePackage){
  40. Set<Class<?>> classSet = new HashSet<>();
  41. try{
  42. String pattern = "classpath*:" + ClassUtils.convertClassNameToResourcePath(basePackage) + "/**/*.class";
  43. ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
  44. Resource[] resources = resourcePatternResolver.getResources(pattern);
  45. MetadataReaderFactory readerFactory = new CachingMetadataReaderFactory(resourcePatternResolver);
  46. for (Resource resource : resources){
  47. if (resource.isReadable()){
  48. MetadataReader reader = readerFactory.getMetadataReader(resource);
  49. String className = reader.getClassMetadata().getClassName();
  50. classSet.add(Class.forName(className, false, Thread.currentThread().getContextClassLoader()));
  51. }
  52. }
  53. }catch (Exception e){
  54. e.printStackTrace();
  55. }
  56. return classSet;
  57. }
  58. }

(3)在spring bean初始化后,注册RPC服务端和客户端接口,将RPC服务端相关的配置、bean、接口名称等保存到内存中,以便后续解析调用接口使用

  1. /**
  2. * 将rpc客户端和服务端的注解配置信息 保存到RpcFactory中
  3. * @param bean
  4. * @param beanName
  5. */
  6. public void resolveRpc(Object bean, String beanName){
  7. Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
  8. if (methods == null){
  9. return;
  10. }
  11. for (Method method : methods){
  12. //解析RPC服务端接口注解的配置
  13. RpcExporter rpcExporter = AnnotationUtils.findAnnotation(method, RpcExporter.class);
  14. if (rpcExporter != null){
  15. RpcExporterContext exporterContext = new RpcExporterContext();
  16. exporterContext.setBean(bean);
  17. exporterContext.setMethod(method);
  18. RpcFactory.registerExporter(beanName, method.getName(), exporterContext);
  19. }
  20. //解析RPC客户端接口注解的配置
  21. RpcClient rpcClient = AnnotationUtils.findAnnotation(method, RpcClient.class);
  22. if (rpcClient != null){
  23. RpcClientContext clientContext = new RpcClientContext();
  24. clientContext.setUrl(rpcClient.rpcExportUrl());
  25. clientContext.setMethodName(rpcClient.value());
  26. RpcFactory.registerClient(beanName, method.getName(), clientContext);
  27. }
  28. }
  29. }

(4)添加Rpc拦截器,所有rpc的请求,都通过该拦截器来执行

  1. @Configuration
  2. public class WebConfiguration extends WebMvcConfigurationSupport {
  3. /**
  4. * 配置rpc过滤器
  5. * @return
  6. */
  7. @Bean
  8. public FilterRegistrationBean rpcFilter() {
  9. FilterRegistrationBean registration = new FilterRegistrationBean();
  10. registration.setFilter(new RpcFilter());
  11. registration.addUrlPatterns("/rpc");
  12. return registration;
  13. }
  14. }
  15. public class RpcFilter implements Filter {
  16. /**
  17. * 1、RPC服务端监听到特定rpc地址时,doFilter解析客户端的bean名称、接口、以及接口参数
  18. * 2、调用相应的bean接口,并得到返回结果写回response
  19. * @param req
  20. * @param resp
  21. * @param chain
  22. * @throws IOException
  23. * @throws ServletException
  24. */
  25. @Override
  26. public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException {
  27. HttpServletRequest request = (HttpServletRequest) req;
  28. HttpServletResponse response = (HttpServletResponse) resp;
  29. InputStreamReader inputStreamReader = null;
  30. BufferedReader reader = null;
  31. try{
  32. inputStreamReader = new InputStreamReader(request.getInputStream(), "UTF-8");
  33. reader = new BufferedReader(inputStreamReader);
  34. String requestBody = "";
  35. String temp ;
  36. while ((temp = reader.readLine()) != null) {
  37. requestBody += temp;
  38. }
  39. //获取请求参数
  40. RpcRequest rpcRequest = JSONObject.parseObject(requestBody, RpcRequest.class);
  41. if (StringUtils.isEmpty(rpcRequest.getMethod())){
  42. throw new RuntimeException("rpc method parameter can not be null");
  43. }
  44. //获取当前rpc请求的服务端配置信息:bean和method
  45. RpcExporterContext exporterContext = RpcFactory.getExporterMap(rpcRequest.getMethod());
  46. //通过反射调用rpc服务端具体的接口方法
  47. Object result = exporterContext.getMethod().invoke(((Class) exporterContext.getBean()).newInstance(), rpcRequest.getParams().toArray());
  48. RpcResponse rpcResponse = new RpcResponse();
  49. rpcResponse.setResult(result);
  50. //写回执行结果
  51. response.getWriter().write(JSON.toJSONString(rpcResponse));
  52. }catch (Exception e){
  53. e.printStackTrace();
  54. }finally {
  55. if (reader != null){
  56. reader.close();
  57. }
  58. if (inputStreamReader != null){
  59. inputStreamReader.close();
  60. }
  61. }
  62. }
  63. }

RPC调用链

(1)业务方调用RPC客户端接口,通过动态代理后,进入invoke方法,通过HttpClient或其他方式向RPC服务端发起请求

(2)服务端添加RPC过滤器,所有通过某个特定地址(http://xxx/rpc)来的请求,都通过这个RPC拦截器

(3)RPC拦截器中解析客户端的请求参数,包括bean名称、接口名称、接口参数等

(4)通过解析出来的请求参数,调用RPC服务端相应的接口方法,并获取到返回结果

(5)RPC服务端将请求结果返回给RPC客户端,RPC客户端接受到返回结果后,继续处理业务逻辑

添加Controller类,请求http://localhost:8080/test测试,结果返回true

  1. @RestController
  2. public class RpcController {
  3. @Autowired
  4. private RpcTestClient client;
  5. @RequestMapping(value = "/test")
  6. public Boolean test() throws NoSuchMethodException {
  7. RpcExporterContext exporterContext = new RpcExporterContext();
  8. try{
  9. Class clazz = Class.forName("com.code.framework.rpc.exporter.RpcTestServiceImpl");
  10. Method method = clazz.getDeclaredMethod("testRpcService", String.class);
  11. exporterContext.setBean(clazz);
  12. exporterContext.setMethod(method);
  13. }catch (Exception e){
  14. e.printStackTrace();
  15. }
  16. RpcFactory.registerExporter("RpcTestServiceImpl", "testRpcService", exporterContext);
  17. RpcClientContext clientContext = new RpcClientContext();
  18. //设置rpc服务端请求地址
  19. clientContext.setUrl("http://localhost:8080/rpc");
  20. //设置rpc服务端接口
  21. clientContext.setMethodName("RpcTestServiceImpl.testRpcService");
  22. RpcFactory.registerClient("RpcTestClient", "testClient", clientContext);
  23. //上述代码为手动配置rpc客户端和服务端相应的配置信息,也可以通过扫描包中rpc客户端和服务端接口方法上的注解来获取配置信息,然后将其保存到RpcFactory中,供解析调用时使用
  24. Boolean result = client.testClient("test rpc");
  25. return result;
  26. }
  27. }

 

0
0
分享到:
评论

相关推荐

    Netty4.1实战-手写RPC框架.pdf

    RPC是一种远程调用的通信协议,例如dubbo、thrift等,我们在互联网高并发应用开发时候都会使用到类似的服务。本专题主要通过三个章节实现一个rpc通信的基础功能,来学习RPC服务...- 手写RPC框架第三章《RPC中间件》

    基于netty的手写rpc框架

    在本项目中,我们将基于Netty实现一个手写的RPC框架。Netty是Java领域的一个高性能、异步事件驱动的网络应用程序框架,常用于构建高效的服务器和客户端。 首先,我们需要理解RPC框架的基本组成部分: 1. **服务...

    java 手写rpc框架 rpc-server and rpc-client

    在这个Java手写RPC框架的项目中,我们主要关注两个核心部分:`rpc-server`和`rpc-client`。 ### 1. RPC服务器(rpc-server) `rpc-server`是RPC框架的提供者端,它承载了服务的实现并对外发布。以下是一些关键知识...

    基于Netty4核心原理的Java手写RPC框架设计源码

    本项目为《Netty4核心原理与手写RPC框架实战》一书的配套代码示例工程,涵盖223个文件,主要包括63个Java源文件、130个GIF图片、7个XML配置文件、4个PNG图片、4个JPG图片、3个Shell脚本、3个JavaScript文件、2个属性...

    手写RPC框架V1.zip

    本资源“手写RPC框架V1.zip”提供了作者手动实现的一个RPC框架的源代码,包括`rpc-server`和`rpc-client`两个部分,便于学习者深入理解RPC的工作原理。 ### RPC框架概述 RPC框架的核心目标是简化分布式系统间的通信...

    手写RPC框架1

    本文将探讨手写RPC框架的一些核心概念和组件。 首先,RPC架构的核心是网络传输。在实现RPC时,我们需要设计一个能够发送网络请求的机制,将目标类、方法信息以及参数从客户端传输到服务端。常见的网络传输库有BIO...

    手写RPC框架,说明:https://blog.csdn.net/lan861698789/article/details/103837228

    本文将深入探讨手写RPC框架的核心概念和技术要点,并结合Netty作为网络通信库来实现。 首先,理解RPC框架的基本流程至关重要。当客户端发起一个RPC调用时,它会创建一个请求对象,包含调用的方法名、参数等信息,...

    手写RPC框架两个工程文件.zip

    总的来说,手写RPC框架的实践有助于深化对分布式系统、网络通信以及服务治理的理解。通过对`rpcServer`和`rpcClient`的分析,可以学习到如何构建一个基本的、可工作的RPC框架,这对于提升自己的系统设计和编程能力...

    手写RPC框架Feign

    在`monkey-feign`这个项目中,可能包含了Feign的基本示例和自定义配置,可以用来学习和实践如何使用Feign构建RPC框架。通过阅读源代码,我们可以深入了解Feign的工作原理,以及如何在实际项目中应用和扩展它。同时,...

    手写RPC框架代码(带注释)

    框架是Netty,代码主要分为 provider registry protocol和consumer等。 实现本地调用LPC和远程调用RPC,对比了二者的速度。 RPC部分代码参考书籍:《Netty4核心原理与手写rpc实战》

    基于Java的MyRPCFromZero手写RPC框架设计与实现源码解析

    该项目为基于Java语言的MyRPCFromZero RPC框架,精心设计了344个文件,涵盖179个类文件、153个Java源文件、6个XML配置文件、2个属性文件,以及必要的Git忽略和LICENSE文件等。旨在从零开始,逐步解析并实现一个易于...

    手写rpc的项目

    本文将详细介绍手写RPC框架的基本原理,并提供相关知识点。 首先,了解RPC的基本概念。RPC使得客户端可以调用服务器上的方法,而无需知道底层网络协议或细节。这个过程通常包括以下步骤: 1. **序列化与反序列化**...

    手写RPC框架实例,配合《IO流基础(二)网络IO》食用更佳

    本实例将带你深入理解RPC框架的实现,结合《IO流基础(二)网络IO》的知识,我们将探讨如何通过网络进行数据交换。 网络IO是任何涉及网络通信的程序的基础,包括RPC框架。在Java中,我们通常使用Socket API来处理...

    Netty核心原理剖析与RPC实践pd手抄

    Netty核心原理剖析与RPC实践手抄版本,基本复刻了全部内容,如有丢失请私聊

    Java使用传统socket手写的远程RPC框架调用框架

    通过以上介绍,我们可以看到这个基于Java Socket的手写RPC框架是如何利用核心概念实现远程服务调用的。它简化了分布式系统间的通信,提高了开发效率。然而,实际生产环境中,为了提高性能、稳定性和可扩展性,我们...

    基于Netty4核心原理的RPC框架设计源码工程

    该项目是《Netty4核心原理与手写RPC框架实战》一书的配套代码示例工程,由223个文件构成,其中Java源代码63个,GIF图像130个,XML配置7个,PNG图片4个,JPG图片4个,Shell脚本3个,JavaScript和CSS各3个,批处理文件...

    手写rpc rpc简单源码 rpc源码学习 rpc过程了解 rpc通信原理

    下面将详细解释RPC的核心概念、工作原理以及手写RPC的基本流程。 一、RPC核心概念 1. **客户端(Client)**:发起RPC调用的一方,它通常需要知道服务接口和方法,但不需要关心服务是如何实现的。 2. **服务端...

    .arch手写RPC.pdf

    总的来说,手写RPC框架涉及的关键步骤包括: 1. 工程结构的搭建和依赖引入。 2. 编写引导类,初始化和运行服务。 3. 实现服务注册接口,例如基于Zookeeper的服务注册。 4. 使用注解来标记服务提供者类和接口。 5. ...

Global site tag (gtag.js) - Google Analytics