文章讲解如何使用Thrift和Curator实现自己的服务注册与发现功能,服务注册中心使用Zookeeper框架。
技术栈
zookeeper-3.4.13
thrift-0.11.0
curator-2.13.0
特性
1、服务集群部署:同一个服务可以部署到多台服务器上,注册中心维护一个服务的多份payload信息
2、客户端软负载均衡:暂支持随机和轮询两种方式
3、服务提供者和服务消费者可选择是否要注册到注册中心
4、业务服务实现类自动加载和注册
5、一个端口监听多个业务服务
最新源码: https://github.com/chenjuwen/thrift-microservice
功能主要包含以下关键实现点:
扫描和加载业务服务实现类
发布业务服务到thrift处理器
注册业务服务到注册中心
创建thrift服务端
从注册中心查找业务服务
调用业务服务接口
1、扫描和加载业务服务实现类
业务服务接口通过Thrift的IDL进行描述,并使用Thrift提供的工具编译生成接口文件。
Common.thrift namespace java com.seasy.microservice.api struct Message { 1: i32 type; 2: binary data; } struct Response { 1: i32 code; 2: string message; } Hello.thrift namespace java com.seasy.microservice.api include "Common.thrift" service Hello{ string helloString(1:string param) Common.Response sendMessage(1:Common.Message message) }
开发业务服务接口的实现类,所有实现类统一用自定义注解类ServiceAnnotation加以标注。
@ServiceAnnotation(serviceClass=Hello.class, version="1.0.0") public class HelloServiceImpl implements Hello.Iface{ @Override public String helloString(String param) throws TException { return param; } @Override public Response sendMessage(Message message) throws TException { System.out.println(message.getType() + ", " + new String(message.getData())); Response response = new Response(0, "success"); return response; } }
根据自定义注解类扫描和加载业务服务实现类。
private ConcurrentHashMap<String, ServiceInformation> loadService(String packagePath) throws Exception { ConcurrentHashMap<String, ServiceInformation> serviceInformationMap = new ConcurrentHashMap<String, ServiceInformation>(); Reflections reflections = new Reflections(packagePath); //查找有指定注解类的服务类 Set<Class<?>> serviceImplementClassSet = reflections.getTypesAnnotatedWith(ServiceAnnotation.class); for(Class<?> serviceImplementClass : serviceImplementClassSet){ ServiceAnnotation serviceAnnotation = serviceImplementClass.getAnnotation(ServiceAnnotation.class); //服务相关信息封装在ServiceInformation类中 ServiceInformation serviceInformation = new ServiceInformation(); serviceInformation.setId(serviceInformation.getId()); serviceInformation.setServiceClass(serviceAnnotation.serviceClass()); serviceInformation.setVersion(serviceAnnotation.version()); serviceInformation.setTimeout(serviceAnnotation.timeout()); serviceInformation.setServiceImplementClassInstance(serviceImplementClass.newInstance()); String key = serviceAnnotation.serviceClass().getName(); serviceInformationMap.put(key, serviceInformation); logger.debug("Class [" + key + "] loaded!"); } return serviceInformationMap; }
2、发布业务服务到thrift处理器
使用多路复用处理器TMultiplexedProcessor注册多个业务服务,这样通过监听一个端口即可提供多种服务。
private void buildMultiplexedProcessor(){ multiplexedProcessor = new TMultiplexedProcessor(); for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){ String serviceClassFullname = it.next(); logger.debug("serviceClassFullname=" + serviceClassFullname); //服务名 String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1); logger.debug("serviceName=" + serviceName); Object serviceImplementClassInstance = serviceInformationMap.get(serviceClassFullname).getServiceImplementClassInstance(); TProcessor serviceProcessor = createServiceProcessor(serviceClassFullname, serviceImplementClassInstance); if(serviceProcessor != null){ //以接口主类的SimpleName作为服务名 multiplexedProcessor.registerProcessor(serviceName, serviceProcessor); serviceInformationMap.get(serviceClassFullname).setProcessorRegistered(true); logger.info("Processor [" + serviceName + "] published!"); } } } private TProcessor createServiceProcessor(String serviceClassFullname, Object serviceImplementClassInstance){ try{ String processorClassName = serviceClassFullname + "$Processor"; String ifaceClassName = serviceClassFullname + "$Iface"; Class<?> processorClass = Class.forName(processorClassName); Class<?> ifaceClass = Class.forName(ifaceClassName); //一个服务实现类对应一个Processor Constructor<?> constructor = processorClass.getDeclaredConstructor(new Class[]{ifaceClass}); TProcessor processor = (TProcessor) constructor.newInstance(new Object[]{serviceImplementClassInstance}); return processor; }catch(Exception ex){ logger.error("create service[" + serviceClassFullname + "] processor error", ex); } return null; }
3、注册业务服务到注册中心
功能采用Zookeeper作为服务注册中心,使用Curator与Zookeeper进行通信。服务注册需要用到Curator扩展包curator-x-discovery.jar
构造CuratorFramework实例
curator = CuratorFrameworkFactory.builder() .connectString(connectString) .namespace("thrift-microservice") .build(); curator.start();
构造ServiceDiscovery实例
serviceDiscovery = ServiceDiscoveryBuilder.builder(ThriftServicePayload.class) .client(curator) .serializer(new JsonInstanceSerializer<>(ThriftServicePayload.class)) .basePath(basePath) .build(); serviceDiscovery.start();
将业务服务注册到Zookeeper注册中心
private void registerBusinessService(){ for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){ String serviceClassFullname = it.next(); String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1); ServiceInformation serviceInformation = serviceInformationMap.get(serviceClassFullname); if(serviceInformation.isProcessorRegistered()){ try{ //构造ServiceInstance对象,该对象表示一个业务服务,用于存储业务服务相关的参数数据 ServiceInstance<ThriftServicePayload> serviceInstance = ServiceInstance.<ThriftServicePayload>builder() .name(serviceName) .id(StringUtil.isEmpty(serviceInformation.getId()) ? serviceName : serviceInformation.getId()) .address(EnvUtil.getLocalIp()) .port(getPort()) .payload(new ThriftServicePayload(serviceInformation.getVersion(), serviceInformation.getServiceClass().getName())) .registrationTimeUTC(System.currentTimeMillis()) .serviceType(ServiceType.DYNAMIC) .build(); serviceRegistry.registerService(serviceInstance); serviceInformation.setServiceRegistered(true); logger.info("Service [" + serviceName + "] registered!"); }catch(Exception ex){ logger.error("register service[" + serviceName + "] error", ex); } } } }
4、创建thrift服务端
private void startServer()throws Exception{ serverSocket = new TNonblockingServerSocket(getPort()); TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket); tArgs.processor(multiplexedProcessor); tArgs.transportFactory(new TFramedTransport.Factory()); tArgs.protocolFactory(new TCompactProtocol.Factory()); tserver = new TNonblockingServer(tArgs); tserver.setServerEventHandler(new DefaultServerEventHandler()); tserver.serve(); }
5、从注册中心查找业务服务
根据业务服务的Client类从注册中心查找对应的服务配置信息,并根据服务配置信息实例化服务Client类的对象。
public <T> T getServiceClient(Class<T> serviceClientClass){ try{ String serviceClientClassName = serviceClientClass.getName(); if(!serviceClientClassName.endsWith("$Client")){ throw new IllegalArgumentException("serviceClientClass must be $Client class"); } String serviceName = serviceClientClassName.replace("$Client", ""); serviceName = serviceName.substring(serviceName.lastIndexOf(".")+1); Object object = getServiceClient(serviceName); if(object != null){ return (T)object; } }catch(Exception ex){ logger.error("Failed to get ServiceClient", ex); } return null; } public Object getServiceClient(String serviceName) { try{ ServiceInstance<ThriftServicePayload> serviceInstance = queryForInstance(serviceName); if(serviceInstance != null){ //服务所在机器的IP地址 String host = serviceInstance.getAddress(); //服务所在机器的监听端口 int port = serviceInstance.getPort(); ServiceClientFactory factory = null; ServiceClientWrapper wrapper = null; String key = host + ":" + port; if(serviceClientFactoryMap.containsKey(key)){ factory = serviceClientFactoryMap.get(key); wrapper = factory.getServiceClientWrapper(serviceName); }else{ logger.info("create ServiceClientFactory..."); factory = ServiceClientFactory.getInstance(); factory.setHost(host); factory.setPort(port); factory.open(); serviceClientFactoryMap.put(key, factory); } if(wrapper == null){ Class<?> serviceClientClass = Class.forName(serviceInstance.getPayload().getInterfaceName() + "$Client"); wrapper = new ServiceClientWrapper(serviceInstance, serviceClientClass, serviceName); factory.addServiceClientWrapper(wrapper); return factory.getServiceClientWrapper(serviceName).getServiceClientInstanceObject(); }else{ return wrapper.getServiceClientInstanceObject(); } }else{ //服务不存在 logger.error("service not found: " + serviceName); } }catch(Exception ex){ logger.error("Failed to get ServiceClient", ex); } return null; }
一个ServiceClientFactory对象代表一个服务提供者,一个服务提供者可以提供多个业务服务,每个业务服务对应的客户端对象用ServiceClientWrapper实例表示。
public void open(){ try{ if(StringUtil.isNotEmpty(this.host) && this.port != 0){ if(transport == null){ transport = new TFramedTransport(new TSocket(this.host, this.port)); transport.open(); logger.debug("Transport opened --> " + this.host + ":" + this.port); }else if(!transport.isOpen()){ transport.open(); } }else{ throw new IllegalArgumentException("parameter host or port is invalid!"); } }catch(Exception ex){ close(); throw new RuntimeException("Failed to open service Socket", ex); } } public ServiceClientWrapper getServiceClientWrapper(String serviceName){ return serviceClientWrapperMap.get(serviceName); } public void addServiceClientWrapper(ServiceClientWrapper wrapper){ if(!serviceClientWrapperMap.containsKey(wrapper.getServiceName())){ wrapper = instanceServiceClient(wrapper); serviceClientWrapperMap.put(wrapper.getServiceName(), wrapper); } } private ServiceClientWrapper instanceServiceClient(ServiceClientWrapper wrapper){ try{ logger.debug("instance ServiceClient: " + wrapper.getServiceClientClass().getName()); TProtocol protocol = new TCompactProtocol(transport); TMultiplexedProtocol multiplexedProtocol = new TMultiplexedProtocol(protocol, wrapper.getServiceName()); Class[] classes = new Class[]{TProtocol.class}; Object serviceClientInstanceObject = wrapper.getServiceClientClass().getConstructor(classes).newInstance(multiplexedProtocol); wrapper.setServiceClientInstanceObject(serviceClientInstanceObject); return wrapper; }catch(Exception ex){ logger.error("instance ServiceClient error", ex); } return wrapper; }
public class ServiceClientWrapper { private ServiceInstance<ThriftServicePayload> serviceInstance; private Class<?> serviceClientClass; private String serviceName; private Object serviceClientInstanceObject; //服务客户端类的实例对象 public ServiceClientWrapper(ServiceInstance<ThriftServicePayload> serviceInstance, Class<?> serviceClientClass, String serviceName){ this.serviceInstance = serviceInstance; this.serviceClientClass = serviceClientClass; this.serviceName = serviceName; } public ServiceInstance<ThriftServicePayload> getServiceInstance() { return serviceInstance; } public Class<?> getServiceClientClass() { return serviceClientClass; } public String getServiceName() { return serviceName; } public Object getServiceClientInstanceObject() { return serviceClientInstanceObject; } public void setServiceClientInstanceObject(Object serviceClientInstanceObject) { this.serviceClientInstanceObject = serviceClientInstanceObject; } }
6、调用业务服务接口
客户端获取服务Client实例对象的方式:
Hello.Client client = getServiceClient(Hello.Client.class) 或者 Object object = clientBootstrap.getServiceClient("Hello"); if(object != null){ Hello.Client client = (Hello.Client)object; }
访问接口方法:
client.helloString("hello string") ByteBuffer data = ByteBuffer.wrap("hello world".getBytes("UTF-8")); Response response = client.sendMessage(new Message(1, data)); System.out.println(response.getCode() + ", " + response.getMessage());
7、注册中心管理(截图)
相关推荐
它提供了一种高可用、高性能的分布式数据管理与配置服务,常被用来实现服务注册与发现功能。ZooKeeper通过维护一个分布式的共享命名空间,让服务提供者能够注册自己的服务,同时服务消费者能够查找并连接到这些服务...
在"thrift+依赖包整合"中,我们包含了以下关键组件: 1. **Thrift**:这是Thrift的核心库,它提供了一个编译器,可以将用户定义的IDL文件(.thrift)转换为各种目标语言的代码,如Java、Python、C++等。这个编译器...
Thrift+SpringBoot+RocketMQ+Elasticsearch+C#; 1.通过thrift的RPC实现C#与java通信, 2.阿里巴巴RocketMQ实现高并发数据消息队列 3.Elasticsearch实现对数据全文检索
本文将深入探讨Netty与Thrift结合实现高并发高性能的关键技术点。 #### 二、Netty 高性能分析 ##### 2.1 RPC调用性能瓶颈分析 在传统的RPC框架中,主要存在以下三大性能瓶颈: 1. **网络传输方式**:传统的RPC...
2. **实现 log4j 输出到 Thrift**:编写一个 log4j Appender,它使用 Thrift 库与 Scribe 客户端进行通信,将 log4j 生成的日志转换为 `LogEntry` 对象。 3. **运行 Scribe 客户端**:启动 Scribe 客户端,确保其...
最佳环境 CentOS 5.4 Scribe真正可用rpm安装包 apache-thrift-0.7.0-1.x86_64.rpm,fb303-0.7.0-1.x86_64.rpm,scribe-2.2-3.x86_64.rpm。无需编译,一命令安装。简单快捷,方便部署。...scribe+thrift+fb303.7z
主要是对thrift0.9.0 TSimpleServer、TThreadPoolServer 、TNonblockingServer、THsHaServer等服务模型实例和AsynClient 异步客户端实例代码的演示
5. **配置与注册**:确保服务提供者和消费者都正确配置了Thrift协议,并在服务注册中心进行注册和发现。 集成过程可能会遇到的问题包括兼容性问题、序列化与反序列化问题、服务端与客户端版本不一致等。为了解决...
当我们将Thrift与Spring集成时,我们可以利用Spring的强大功能来管理和协调Thrift服务,从而构建出高效、灵活的分布式系统。 集成Thrift和Spring的主要目的是为了利用Spring的依赖注入(DI)和面向切面编程(AOP)...
服务治理框架,一般存在与RPC的上一层,用来在大量RPC服务至上,协调客户端和服务器的调用工作。这个示例工程和我的博客《架构设计:系统间通信(13)——RPC实例Apache Thrift 下篇》...
通过这个Java Thrift Demo,我们可以深入理解Thrift如何在Java中实现RPC通信,包括服务定义、代码生成、服务端实现、客户端调用等关键步骤。这对于初学者来说,是一个很好的起点,有助于进一步学习和应用Thrift进行...
ApacheThrift简介与安装 Thrift数据类型与IDL理解 Thrift服务定义与实现 Thrift跨语言服务开发实践 Thrift编译器与代码生成 Thrift客户端与服务器端通信 ...Thrift服务发现与注册机制 Thrift安全性与认证授权
Thrift RPC客户端的服务化框架代码主要涉及了两个关键概念:Thrift和RPC(Remote Procedure Call,...在实际应用中,可能还需要考虑线程安全、错误处理、负载均衡、服务注册与发现等高级特性,以构建健壮的分布式系统。
通过阅读和运行这个DEMO,你可以更深入地理解Thrift的工作原理和使用方法,包括如何定义服务、生成代码、实现服务端、创建客户端以及进行通信。 总结来说,Thrift是一个强大的工具,它简化了分布式系统中不同语言...
【描述】"供大家使用" 表明这个 demo 是为了提供给开发者们参考和学习,帮助他们理解如何在自己的项目中运用 ZooKeeper 和 Thrift,实现服务间的通信和管理。 【标签】"thrift" 指出核心关注点是 Thrift 技术。...
这个案例中的“MyThriftWebTest”可能是一个包含整个实现的测试工程,包括Thrift服务的定义、生成的Java代码以及Servlet的实现。通过运行这个测试工程,你可以了解如何将Thrift服务部署到支持HTTP的服务器上,如...
然后,你可以创建一个Thrift服务器实例,注册这些服务处理类,并启动服务器监听客户端请求。 4. **客户端调用**: 在客户端,Thrift会生成对应服务的客户端代理类,你可以实例化这些代理,然后像调用本地方法一样...
Thrift的主要目标是解决分布式系统中的通信问题,通过提供一套代码生成工具,能够将服务定义转化为多种编程语言的客户端和服务器端代码,从而实现不同语言之间的无缝通信。 Thrift的核心在于其接口描述语言(IDL,...