注册中心代码使用 zookeeper 实现,我们通过图片来看看我们注册中心的架构。
首先说明, zookeeper 的实现思路和代码是参考架构探险这本书上的,另外在 github 和我前面配置文件中的 zookeeper 服务器是用的1个月免费适用的阿里云,大家也可以用它当测试用。
不多说,一次性给出注册中心全部代码。
客户端对应的注册中心接口
public interface RegisterCenter4Consumer {
/**
* 消费端初始化服务提供者信息本地缓存
*/
public void initProviderMap();
/**
* 消费端获取服务提供者信息
* @return
*/
public Map<String,List<ServiceProvider>> getServiceMetaDataMap4Consumer();
/**
* 消费端将消费者信息注册到 zookeeper 对应的节点下
* @param invokers
*/
public void registerConsumer(final List<ServiceConsumer> invokers);
}
服务端对应的注册中心接口
public interface RegisterCenter4Provider {
/**
* 服务端将服务提供者信息注册到 zookeeper 对应的节点下
* @param serivceList
*/
public void registerProvider(final List<ServiceProvider> serivceList);
/**
* 服务端获取服务提供者信息
* @return key:服务提供者接口 value:服务提供者服务方法列表
*/
public Map<String, List<ServiceProvider>> getProviderService();
}
注册中心实现类:
public class ZookeeperRegisterCenter implements RegisterCenter4Provider, RegisterCenter4Consumer {
private static ZookeeperRegisterCenter registerCenter = new ZookeeperRegisterCenter();
private ZookeeperRegisterCenter(){};
public static ZookeeperRegisterCenter getInstance(){
return registerCenter;
}
//服务提供者列表,key:服务提供者接口,value:服务提供者服务方法列表
private static final Map<String,List<ServiceProvider>> providerServiceMap = new ConcurrentHashMap<>();
//服务端 zookeeper 元信息,选择服务(第一次从zookeeper 拉取,后续由zookeeper监听机制主动更新)
private static final Map<String,List<ServiceProvider>> serviceData4Consumer = new ConcurrentHashMap<>();
//从配置文件中获取 zookeeper 服务地址列表
private static String ZK_SERIVCE = Configuration.getInstance().getAddress();
//从配置文件中获取 zookeeper 会话超时时间配置
private static int ZK_SESSION_TIME_OUT = 5000;
//从配置文件中获取 zookeeper 连接超时事件配置
private static int ZK_CONNECTION_TIME_OUT = 5000;
private static String ROOT_PATH = "/rpc_register";
public static String PROVIDER_TYPE = "/provider";
public static String CONSUMER_TYPE = "/consumer";
private static volatile ZkClient zkClient = null;
@Override
public void initProviderMap() {
if(serviceData4Consumer.isEmpty()){
serviceData4Consumer.putAll(fetchOrUpdateServiceMetaData());
}
}
@Override
public Map<String, List<ServiceProvider>> getServiceMetaDataMap4Consumer() {
return serviceData4Consumer;
}
@Override
public void registerConsumer(List<ServiceConsumer> consumers) {
if(consumers == null || consumers.size() == 0){
return;
}
//连接 zookeeper ,注册服务
synchronized (ZookeeperRegisterCenter.class){
if(zkClient == null){
zkClient = new ZkClient(ZK_SERIVCE,ZK_SESSION_TIME_OUT,ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
}
//创建 zookeeper 命名空间
boolean exist = zkClient.exists(ROOT_PATH);
if(!exist){
zkClient.createPersistent(ROOT_PATH,true);
}
//创建服务提供者节点
exist = zkClient.exists((ROOT_PATH));
if(!exist){
zkClient.createPersistent(ROOT_PATH);
}
for(int i = 0; i< consumers.size();i++) {
ServiceConsumer consumer = consumers.get(i);
//创建服务消费者节点
String serviceNode = consumer.getConsumer().getName();
String servicePath = ROOT_PATH + CONSUMER_TYPE + "/" + serviceNode;
exist = zkClient.exists(servicePath);
System.out.println("exist:" + exist);
System.out.println("servicePath:" + servicePath);
if (!exist) {
zkClient.createPersistent(servicePath, true);
}
//创建当前服务器节点
InetAddress addr = null;
try {
addr = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
String ip = addr.getHostAddress();
String currentServiceIpNode = servicePath + "/" + ip;
exist = zkClient.exists(currentServiceIpNode);
if (!exist) {
zkClient.createEphemeral(currentServiceIpNode);
}
}
}
}
@Override
public void registerProvider(List<ServiceProvider> serivceList) {
if(serivceList == null || serivceList.size() == 0){
return;
}
//连接 zookeeper,注册服务,加锁,将所有需要注册的服务放到providerServiceMap里面
synchronized (ZookeeperRegisterCenter.class){
for(ServiceProvider provider:serivceList){
//获取接口名称
String serviceItfKey = provider.getProvider().getName();
//先从当前服务提供者的集合里面获取
List<ServiceProvider> providers = providerServiceMap.get(serviceItfKey);
if(providers == null){
providers = new ArrayList<>();
}
providers.add(provider);
providerServiceMap.put(serviceItfKey,providers);
}
if(zkClient == null){
zkClient = new ZkClient(ZK_SERIVCE,ZK_SESSION_TIME_OUT,ZK_CONNECTION_TIME_OUT,new SerializableSerializer());
}
//创建当前应用 zookeeper 命名空间
boolean exist = zkClient.exists(ROOT_PATH);
if(!exist){
zkClient.createPersistent(ROOT_PATH,true);
}
//服务提供者节点
exist = zkClient.exists((ROOT_PATH));
if(!exist){
zkClient.createPersistent(ROOT_PATH);
}
for(Map.Entry<String,List<ServiceProvider>> entry:providerServiceMap.entrySet()){
//创建服务提供者节点
String serviceNode = entry.getKey();
String servicePath = ROOT_PATH +PROVIDER_TYPE +"/" + serviceNode;
exist = zkClient.exists(servicePath);
if(!exist){
zkClient.createPersistent(servicePath,true);
}
//创建当前服务器节点,这里是注册时使用,一个接口对应的ServiceProvider 只有一个
int serverPort = entry.getValue().get(0).getPort();
InetAddress addr = null;
try {
addr = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
String ip = addr.getHostAddress();
String impl = (String)entry.getValue().get(0).getServiceObject();
String serviceIpNode = servicePath +"/" + ip + "|" + serverPort + "|" + impl;
System.out.println("serviceIpNode:" + serviceIpNode);
exist = zkClient.exists(serviceIpNode);
if(!exist){
//创建临时节点
zkClient.createEphemeral(serviceIpNode);
}
//监听注册服务的变化,同时更新数据到本地缓存
zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> list) throws Exception {
if(list == null){
list = new ArrayList<>();
}
//存活的服务 IP 列表
List<String> activeServiceIpList = new ArrayList<>();
for(String input:list){
String ip = StringUtils.split(input, "|").get(0);
activeServiceIpList.add(ip);
}
refreshActivityService(activeServiceIpList);
}
});
}
}
}
/**
*
* 在某个服务端获取自己暴露的服务
*/
@Override
public Map<String, List<ServiceProvider>> getProviderService() {
return providerServiceMap;
}
//利用ZK自动刷新当前存活的服务提供者列表数据
private void refreshActivityService(List<String> serviceIpList) {
if (serviceIpList == null||serviceIpList.isEmpty()) {
serviceIpList = new ArrayList<>();
}
Map<String, List<ServiceProvider>> currentServiceMetaDataMap = new HashMap<>();
for (Map.Entry<String, List<ServiceProvider>> entry : providerServiceMap.entrySet()) {
String key = entry.getKey();
List<ServiceProvider> providerServices = entry.getValue();
List<ServiceProvider> serviceMetaDataModelList = currentServiceMetaDataMap.get(key);
if (serviceMetaDataModelList == null) {
serviceMetaDataModelList = new ArrayList<>();
}
for (ServiceProvider serviceMetaData : providerServices) {
if (serviceIpList.contains(serviceMetaData.getIp())) {
serviceMetaDataModelList.add(serviceMetaData);
}
}
currentServiceMetaDataMap.put(key, serviceMetaDataModelList);
}
providerServiceMap.clear();
providerServiceMap.putAll(currentServiceMetaDataMap);
}
private void refreshServiceMetaDataMap(List<String> serviceIpList) {
if (serviceIpList == null) {
serviceIpList = new ArrayList<>();
}
Map<String, List<ServiceProvider>> currentServiceMetaDataMap = new HashMap<>();
for (Map.Entry<String, List<ServiceProvider>> entry : serviceData4Consumer.entrySet()) {
String serviceItfKey = entry.getKey();
List<ServiceProvider> serviceList = entry.getValue();
List<ServiceProvider> providerServiceList = currentServiceMetaDataMap.get(serviceItfKey);
if (providerServiceList == null) {
providerServiceList = new ArrayList<>();
}
for (ServiceProvider serviceMetaData : serviceList) {
if (serviceIpList.contains(serviceMetaData.getIp())) {
providerServiceList.add(serviceMetaData);
}
}
currentServiceMetaDataMap.put(serviceItfKey, providerServiceList);
}
serviceData4Consumer.clear();
serviceData4Consumer.putAll(currentServiceMetaDataMap);
}
private Map<String, List<ServiceProvider>> fetchOrUpdateServiceMetaData() {
final Map<String, List<ServiceProvider>> providerServiceMap = new ConcurrentHashMap<>();
//连接zk
synchronized (ZookeeperRegisterCenter.class) {
if (zkClient == null) {
zkClient = new ZkClient(ZK_SERIVCE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
}
}
//从ZK获取服务提供者列表
String providePath = ROOT_PATH+PROVIDER_TYPE;
System.out.println("111111:"+providePath);
List<String> providerServices = zkClient.getChildren(providePath);
System.out.println(providerServices.toString());
for (String serviceName : providerServices) {
String servicePath = providePath +"/"+ serviceName;
System.out.println("1100:"+servicePath);
List<String> ipPathList = zkClient.getChildren(servicePath);
System.out.println("ipPathList:"+ipPathList.toString());
for (String ipPath : ipPathList) {
String serverIp = ipPath.split("\\|")[0];
String serverPort = ipPath.split("\\|")[1];
String impl = ipPath.split("\\|")[2];
List<ServiceProvider> providerServiceList = providerServiceMap.get(serviceName);
if (providerServiceList == null) {
providerServiceList = new ArrayList<>();
}
ServiceProvider providerService = new ServiceProvider();
try {
Class clazz = Class.forName(serviceName);
providerService.setProvider(clazz);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
providerService.setIp(serverIp);
providerService.setPort(Integer.parseInt(serverPort));
providerService.setServiceObject(impl);
providerService.setGroupName("");
providerServiceList.add(providerService);
providerServiceMap.put(serviceName, providerServiceList);
}
//监听注册服务的变化,同时更新数据到本地缓存
zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
if (currentChilds == null) {
currentChilds = new ArrayList<>();
}
List<String> activeServiceIpList = new ArrayList<>();
for(String input:currentChilds){
String ip = StringUtils.split(input, "|").get(0);
activeServiceIpList.add(ip);
}
refreshServiceMetaDataMap(activeServiceIpList);
}
});
}
return providerServiceMap;
}
}
写完这部分整个 rpc 框架也就实现了,测试的客户端和服务端在代码里也有,这里就不贴出来了。平时时间有限,只能下班和周末的时间来写,整个框架肯定有不足和错误的地方,也有可以改进的地方。希望大家能够不吝指教,互相进步。
我只是想将自己思考的过程给大家展示出来,希望大家一起讨论这些问题,看看还有哪些能够改进的地方。
需要改进的地方:
- 服务端的启动方式。
- 更高并发的改进。
- 服务治理。
- 监测中心。
如果您觉得对你的编程或多或少有点启发就点个赞。
最后给出源码 github 地址,源码 编码和码字不易,如果您觉得学到了东西就请在 github 上加个 star, 当然在 github 上提出问题一起改进是最好的。
相关推荐
RPC是一种远程调用的通信协议,例如dubbo、thrift等,我们在互联网高并发应用开发时候都会使用到类似的服务。本专题主要通过三个章节实现一个rpc通信的基础功能,来学习RPC服务...- 手写RPC框架第三章《RPC中间件》
在本项目中,我们将基于Netty实现一个手写的RPC框架。Netty是Java领域的一个高性能、异步事件驱动的网络应用程序框架,常用于构建高效的服务器和客户端。 首先,我们需要理解RPC框架的基本组成部分: 1. **服务...
springIOC手写框架分析springIOC手写框架分析springIOC手写框架分析springIOC手写框架分析springIOC手写框架分析springIOC手写框架分析springIOC手写框架分析springIOC手写框架分析springIOC手写框架分析springIOC...
本项目为《Netty4核心原理与手写RPC框架实战》一书的配套代码示例工程,涵盖223个文件,主要包括63个Java源文件、130个GIF图片、7个XML配置文件、4个PNG图片、4个JPG图片、3个Shell脚本、3个JavaScript文件、2个属性...
在这个Java手写RPC框架的项目中,我们主要关注两个核心部分:`rpc-server`和`rpc-client`。 ### 1. RPC服务器(rpc-server) `rpc-server`是RPC框架的提供者端,它承载了服务的实现并对外发布。以下是一些关键知识...
通过以上介绍,我们可以看到这个基于Java Socket的手写RPC框架是如何利用核心概念实现远程服务调用的。它简化了分布式系统间的通信,提高了开发效率。然而,实际生产环境中,为了提高性能、稳定性和可扩展性,我们...
RPC(Remote Procedure Call)框架是实现分布式系统的...总结来说,手写RPC框架需要考虑网络传输、数据序列化、动态代理以及服务注册与发现等多个层面。理解并实现这些组件将有助于深入掌握RPC的工作原理和设计思路。
本资源“手写RPC框架V1.zip”提供了作者手动实现的一个RPC框架的源代码,包括`rpc-server`和`rpc-client`两个部分,便于学习者深入理解RPC的工作原理。 ### RPC框架概述 RPC框架的核心目标是简化分布式系统间的通信...
下面将详细解释RPC的核心概念、工作原理以及手写RPC的基本流程。 一、RPC核心概念 1. **客户端(Client)**:发起RPC调用的一方,它通常需要知道服务接口和方法,但不需要关心服务是如何实现的。 2. **服务端...
框架是Netty,代码主要分为 provider registry protocol和consumer等。 实现本地调用LPC和远程调用RPC,对比了二者的速度。 RPC部分代码参考书籍:《Netty4核心原理与手写rpc实战》
Feign是Netflix公司开源的一款声明式、基于HTTP的RPC(远程过程调用)客户端框架,它使得编写Web服务客户端变得更加简单。Feign的设计理念是通过简单的接口定义来封装服务调用,让开发者能够专注于业务逻辑,而不是...
本文将详细介绍手写RPC框架的基本原理,并提供相关知识点。 首先,了解RPC的基本概念。RPC使得客户端可以调用服务器上的方法,而无需知道底层网络协议或细节。这个过程通常包括以下步骤: 1. **序列化与反序列化**...
在本课程"02-01-11-基于Spring JDBC手写定制自己的ORM框架1"中,我们将探讨如何利用Spring的JdbcTemplate设计理念,来构建一个自定义的ORM(对象关系映射)框架。ORM框架的主要目的是简化Java应用程序与数据库之间的...
该项目是《Netty4核心原理与手写RPC框架实战》一书的配套代码示例工程,由223个文件构成,其中Java源代码63个,GIF图像130个,XML配置7个,PNG图片4个,JPG图片4个,Shell脚本3个,JavaScript和CSS各3个,批处理文件...
该项目为基于Java语言的MyRPCFromZero RPC框架,精心设计了344个文件,涵盖179个类文件、153个Java源文件、6个XML配置文件、2个属性文件,以及必要的Git忽略和LICENSE文件等。旨在从零开始,逐步解析并实现一个易于...
通过对spring框架的理解和学习,手写一套属于自己的spring框架。这里面只是一个最简单的方式,真正的spring比这复杂的多。这里只是帮助喜欢spring的同学,对spring有个更深的理解和学习。
本教程将带你深入理解这两个概念,并通过手写一个简易的IoC和AOP框架来加深理解。 **依赖注入(IoC)** 依赖注入是Spring的核心特性之一,它允许开发者将对象的创建和管理权交给框架,从而降低组件之间的耦合度。在...
基于Python和PyTorch框架完成的一个手写数字识别实验源码(带MNIST手写数字数据集).zip 基于Python和PyTorch框架完成的一个手写数字识别实验源码(带MNIST手写数字数据集).zip 基于Python和PyTorch框架完成的一个手写...
总的来说,手写RPC框架的实践有助于深化对分布式系统、网络通信以及服务治理的理解。通过对`rpcServer`和`rpcClient`的分析,可以学习到如何构建一个基本的、可工作的RPC框架,这对于提升自己的系统设计和编程能力...