`
raymond.chen
  • 浏览: 1436998 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Thrift + Curator实现服务注册与发现功能

 
阅读更多

文章讲解如何使用Thrift和Curator实现自己的服务注册与发现功能,服务注册中心使用Zookeeper框架。

 

技术栈

        zookeeper-3.4.13

        thrift-0.11.0

        curator-2.13.0

 

特性

        1、服务集群部署:同一个服务可以部署到多台服务器上,注册中心维护一个服务的多份payload信息

        2、客户端软负载均衡:暂支持随机和轮询两种方式

        3、服务提供者和服务消费者可选择是否要注册到注册中心

        4、业务服务实现类自动加载和注册

        5、一个端口监听多个业务服务

 

最新源码: https://github.com/chenjuwen/thrift-microservice

 

 

功能主要包含以下关键实现点:

        扫描和加载业务服务实现类

        发布业务服务到thrift处理器

        注册业务服务到注册中心

        创建thrift服务端

        从注册中心查找业务服务

        调用业务服务接口

 

1、扫描和加载业务服务实现类

        业务服务接口通过Thrift的IDL进行描述,并使用Thrift提供的工具编译生成接口文件。

Common.thrift
namespace java com.seasy.microservice.api

struct Message {
	1: i32 type;
	2: binary data;
}

struct Response {
	1: i32 code;
	2: string message;
}


Hello.thrift
namespace java com.seasy.microservice.api

include "Common.thrift"
		
service Hello{
	string helloString(1:string param)
	Common.Response sendMessage(1:Common.Message message)
}

 

        开发业务服务接口的实现类,所有实现类统一用自定义注解类ServiceAnnotation加以标注。

@ServiceAnnotation(serviceClass=Hello.class, version="1.0.0")
public class HelloServiceImpl implements Hello.Iface{
	@Override
	public String helloString(String param) throws TException {
		return param;
	}
	
	@Override
	public Response sendMessage(Message message) throws TException {
		System.out.println(message.getType() + ", " + new String(message.getData()));
		
		Response response = new Response(0, "success");
		return response;
	}
}

 

        根据自定义注解类扫描和加载业务服务实现类。

private ConcurrentHashMap<String, ServiceInformation> loadService(String packagePath) throws Exception {
	ConcurrentHashMap<String, ServiceInformation> serviceInformationMap = new ConcurrentHashMap<String, ServiceInformation>();
	
	Reflections reflections = new Reflections(packagePath);
	
	//查找有指定注解类的服务类
	Set<Class<?>> serviceImplementClassSet = reflections.getTypesAnnotatedWith(ServiceAnnotation.class);
	for(Class<?> serviceImplementClass : serviceImplementClassSet){
		ServiceAnnotation serviceAnnotation = serviceImplementClass.getAnnotation(ServiceAnnotation.class);
		
		//服务相关信息封装在ServiceInformation类中
		ServiceInformation serviceInformation = new ServiceInformation();
		serviceInformation.setId(serviceInformation.getId());
		serviceInformation.setServiceClass(serviceAnnotation.serviceClass());
		serviceInformation.setVersion(serviceAnnotation.version());
		serviceInformation.setTimeout(serviceAnnotation.timeout());
		serviceInformation.setServiceImplementClassInstance(serviceImplementClass.newInstance());
		
		String key = serviceAnnotation.serviceClass().getName();
		serviceInformationMap.put(key, serviceInformation);
		logger.debug("Class [" + key + "] loaded!");
	}
	
	return serviceInformationMap;
}
 

2、发布业务服务到thrift处理器

        使用多路复用处理器TMultiplexedProcessor注册多个业务服务,这样通过监听一个端口即可提供多种服务。

private void buildMultiplexedProcessor(){
	multiplexedProcessor = new TMultiplexedProcessor();
	
	for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){
		String serviceClassFullname = it.next();
		logger.debug("serviceClassFullname=" + serviceClassFullname);
		
		//服务名
		String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1);
		logger.debug("serviceName=" + serviceName);
		
		Object serviceImplementClassInstance = serviceInformationMap.get(serviceClassFullname).getServiceImplementClassInstance();
		
		TProcessor serviceProcessor = createServiceProcessor(serviceClassFullname, serviceImplementClassInstance);
		if(serviceProcessor != null){
			//以接口主类的SimpleName作为服务名
			multiplexedProcessor.registerProcessor(serviceName, serviceProcessor);
			serviceInformationMap.get(serviceClassFullname).setProcessorRegistered(true);
			logger.info("Processor [" + serviceName + "] published!");
		}
	}
}

private TProcessor createServiceProcessor(String serviceClassFullname, Object serviceImplementClassInstance){
	try{
		String processorClassName = serviceClassFullname + "$Processor";
		String ifaceClassName = serviceClassFullname + "$Iface";
		
		Class<?> processorClass = Class.forName(processorClassName);
		Class<?> ifaceClass = Class.forName(ifaceClassName);
		
                //一个服务实现类对应一个Processor
		Constructor<?> constructor = processorClass.getDeclaredConstructor(new Class[]{ifaceClass});
		TProcessor processor = (TProcessor) constructor.newInstance(new Object[]{serviceImplementClassInstance});
		
		return processor;
		
	}catch(Exception ex){
		logger.error("create service[" + serviceClassFullname + "] processor error", ex);
	}
	return null;
}
 

 

3、注册业务服务到注册中心

        功能采用Zookeeper作为服务注册中心,使用Curator与Zookeeper进行通信。服务注册需要用到Curator扩展包curator-x-discovery.jar

 

        构造CuratorFramework实例

curator = CuratorFrameworkFactory.builder()
		.connectString(connectString)
		.namespace("thrift-microservice")
		.build();
curator.start();
 

        构造ServiceDiscovery实例

serviceDiscovery = ServiceDiscoveryBuilder.builder(ThriftServicePayload.class)
	.client(curator)
	.serializer(new JsonInstanceSerializer<>(ThriftServicePayload.class))
	.basePath(basePath)
	.build();
serviceDiscovery.start();
 

        将业务服务注册到Zookeeper注册中心

private void registerBusinessService(){
	for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){
		String serviceClassFullname = it.next();
		String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1);
		ServiceInformation serviceInformation = serviceInformationMap.get(serviceClassFullname);
		
		if(serviceInformation.isProcessorRegistered()){
			try{
				//构造ServiceInstance对象,该对象表示一个业务服务,用于存储业务服务相关的参数数据
				ServiceInstance<ThriftServicePayload> serviceInstance = ServiceInstance.<ThriftServicePayload>builder()
					.name(serviceName)
					.id(StringUtil.isEmpty(serviceInformation.getId()) ? serviceName : serviceInformation.getId())
					.address(EnvUtil.getLocalIp())
					.port(getPort())
					.payload(new ThriftServicePayload(serviceInformation.getVersion(), serviceInformation.getServiceClass().getName()))
					.registrationTimeUTC(System.currentTimeMillis())
					.serviceType(ServiceType.DYNAMIC)
					.build();
				
				serviceRegistry.registerService(serviceInstance);
				serviceInformation.setServiceRegistered(true);
				logger.info("Service [" + serviceName + "] registered!");
				
			}catch(Exception ex){
				logger.error("register service[" + serviceName + "] error", ex);
			}
		}
	}
}

  

4、创建thrift服务端

private void startServer()throws Exception{
	serverSocket = new TNonblockingServerSocket(getPort());
	
	TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket);
	tArgs.processor(multiplexedProcessor);
	tArgs.transportFactory(new TFramedTransport.Factory());
	tArgs.protocolFactory(new TCompactProtocol.Factory());

	tserver = new TNonblockingServer(tArgs);
	tserver.setServerEventHandler(new DefaultServerEventHandler());
	tserver.serve();
}

  

5、从注册中心查找业务服务

        根据业务服务的Client类从注册中心查找对应的服务配置信息,并根据服务配置信息实例化服务Client类的对象。

public <T> T getServiceClient(Class<T> serviceClientClass){
	try{
		String serviceClientClassName = serviceClientClass.getName();
		if(!serviceClientClassName.endsWith("$Client")){
			throw new IllegalArgumentException("serviceClientClass must be $Client class");
		}
		
		String serviceName = serviceClientClassName.replace("$Client", "");
		serviceName = serviceName.substring(serviceName.lastIndexOf(".")+1);
		
		Object object = getServiceClient(serviceName);
		if(object != null){
			return (T)object;
		}
		
	}catch(Exception ex){
		logger.error("Failed to get ServiceClient", ex);
	}
	return null;
}

public Object getServiceClient(String serviceName) {
	try{
		ServiceInstance<ThriftServicePayload> serviceInstance = queryForInstance(serviceName);
		if(serviceInstance != null){
			//服务所在机器的IP地址
			String host = serviceInstance.getAddress();
			
			//服务所在机器的监听端口
			int port = serviceInstance.getPort();
			
			ServiceClientFactory factory = null;
			ServiceClientWrapper wrapper = null;
			
			String key = host + ":" + port;
			if(serviceClientFactoryMap.containsKey(key)){
				factory = serviceClientFactoryMap.get(key);
				wrapper = factory.getServiceClientWrapper(serviceName);
			}else{
				logger.info("create ServiceClientFactory...");
				factory = ServiceClientFactory.getInstance();
				factory.setHost(host);
				factory.setPort(port);
				factory.open();
				
				serviceClientFactoryMap.put(key, factory);
			}
			
			if(wrapper == null){
				Class<?> serviceClientClass = Class.forName(serviceInstance.getPayload().getInterfaceName() + "$Client");
				
				wrapper = new ServiceClientWrapper(serviceInstance, serviceClientClass, serviceName);
				factory.addServiceClientWrapper(wrapper);
				
				return factory.getServiceClientWrapper(serviceName).getServiceClientInstanceObject();
			}else{
				return wrapper.getServiceClientInstanceObject();
			}
		}else{
			//服务不存在
			logger.error("service not found: " + serviceName);
		}
		
	}catch(Exception ex){
		logger.error("Failed to get ServiceClient", ex);
	}
	return null;
}

 

    一个ServiceClientFactory对象代表一个服务提供者,一个服务提供者可以提供多个业务服务,每个业务服务对应的客户端对象用ServiceClientWrapper实例表示。

public void open(){
	try{
		if(StringUtil.isNotEmpty(this.host) && this.port != 0){
			if(transport == null){
				transport = new TFramedTransport(new TSocket(this.host, this.port));
				transport.open();
				logger.debug("Transport opened --> " + this.host + ":" + this.port);
			}else if(!transport.isOpen()){
				transport.open();
			}
		}else{
			throw new IllegalArgumentException("parameter host or port is invalid!");
		}
	}catch(Exception ex){
		close();
		throw new RuntimeException("Failed to open service Socket", ex);
	}
}

public ServiceClientWrapper getServiceClientWrapper(String serviceName){
	return serviceClientWrapperMap.get(serviceName);
}

public void addServiceClientWrapper(ServiceClientWrapper wrapper){
	if(!serviceClientWrapperMap.containsKey(wrapper.getServiceName())){
		wrapper = instanceServiceClient(wrapper);
		serviceClientWrapperMap.put(wrapper.getServiceName(), wrapper);
	}
}

private ServiceClientWrapper instanceServiceClient(ServiceClientWrapper wrapper){
	try{
		logger.debug("instance ServiceClient: " + wrapper.getServiceClientClass().getName());
		TProtocol protocol = new TCompactProtocol(transport);
		TMultiplexedProtocol multiplexedProtocol = new TMultiplexedProtocol(protocol, wrapper.getServiceName());
		Class[] classes = new Class[]{TProtocol.class};
		Object serviceClientInstanceObject = wrapper.getServiceClientClass().getConstructor(classes).newInstance(multiplexedProtocol);
		wrapper.setServiceClientInstanceObject(serviceClientInstanceObject);
		return wrapper;
	}catch(Exception ex){
		logger.error("instance ServiceClient error", ex);
	}
	return wrapper;
}

 

public class ServiceClientWrapper {
	private ServiceInstance<ThriftServicePayload> serviceInstance;
    private Class<?> serviceClientClass;
    private String serviceName;
	private Object serviceClientInstanceObject; //服务客户端类的实例对象
    
    public ServiceClientWrapper(ServiceInstance<ThriftServicePayload> serviceInstance, Class<?> serviceClientClass, String serviceName){
    	this.serviceInstance = serviceInstance;
    	this.serviceClientClass = serviceClientClass;
    	this.serviceName = serviceName;
    }

	public ServiceInstance<ThriftServicePayload> getServiceInstance() {
		return serviceInstance;
	}

	public Class<?> getServiceClientClass() {
		return serviceClientClass;
	}

	public String getServiceName() {
		return serviceName;
	}
	
	public Object getServiceClientInstanceObject() {
		return serviceClientInstanceObject;
	}

	public void setServiceClientInstanceObject(Object serviceClientInstanceObject) {
		this.serviceClientInstanceObject = serviceClientInstanceObject;
	}
}

  

6、调用业务服务接口

        客户端获取服务Client实例对象的方式:

Hello.Client client = getServiceClient(Hello.Client.class)

或者

Object object = clientBootstrap.getServiceClient("Hello");
if(object != null){
	Hello.Client client = (Hello.Client)object;
}

 

        访问接口方法:

client.helloString("hello string")

ByteBuffer data = ByteBuffer.wrap("hello world".getBytes("UTF-8"));
Response response = client.sendMessage(new Message(1, data));
System.out.println(response.getCode() + ", " + response.getMessage());

 

 7、注册中心管理(截图)


 



 



 

 

 

 

  • 大小: 17.5 KB
  • 大小: 16.6 KB
  • 大小: 42.4 KB
分享到:
评论

相关推荐

    Apache Thrift 初学小讲(八)【zookeeper实现服务注册与发现】

    它提供了一种高可用、高性能的分布式数据管理与配置服务,常被用来实现服务注册与发现功能。ZooKeeper通过维护一个分布式的共享命名空间,让服务提供者能够注册自己的服务,同时服务消费者能够查找并连接到这些服务...

    thrift+依赖包整合

    在"thrift+依赖包整合"中,我们包含了以下关键组件: 1. **Thrift**:这是Thrift的核心库,它提供了一个编译器,可以将用户定义的IDL文件(.thrift)转换为各种目标语言的代码,如Java、Python、C++等。这个编译器...

    Thrift+SpringBoot+RocketMQ+Elasticsearch+C#

    Thrift+SpringBoot+RocketMQ+Elasticsearch+C#; 1.通过thrift的RPC实现C#与java通信, 2.阿里巴巴RocketMQ实现高并发数据消息队列 3.Elasticsearch实现对数据全文检索

    netty+thrift高并发高性能

    本文将深入探讨Netty与Thrift结合实现高并发高性能的关键技术点。 #### 二、Netty 高性能分析 ##### 2.1 RPC调用性能瓶颈分析 在传统的RPC框架中,主要存在以下三大性能瓶颈: 1. **网络传输方式**:传统的RPC...

    Thrift+Scribe分布式日志系统的构建

    2. **实现 log4j 输出到 Thrift**:编写一个 log4j Appender,它使用 Thrift 库与 Scribe 客户端进行通信,将 log4j 生成的日志转换为 `LogEntry` 对象。 3. **运行 Scribe 客户端**:启动 Scribe 客户端,确保其...

    scribe+thrift+fb303已经编译好的rpm包,直接下载安装使用

    最佳环境 CentOS 5.4 Scribe真正可用rpm安装包 apache-thrift-0.7.0-1.x86_64.rpm,fb303-0.7.0-1.x86_64.rpm,scribe-2.2-3.x86_64.rpm。无需编译,一命令安装。简单快捷,方便部署。...scribe+thrift+fb303.7z

    thrift + 服务模型实例演示(java)

    主要是对thrift0.9.0 TSimpleServer、TThreadPoolServer 、TNonblockingServer、THsHaServer等服务模型实例和AsynClient 异步客户端实例代码的演示

    thrift官方代码+与dubbo集成支持原生thrift协议

    5. **配置与注册**:确保服务提供者和消费者都正确配置了Thrift协议,并在服务注册中心进行注册和发现。 集成过程可能会遇到的问题包括兼容性问题、序列化与反序列化问题、服务端与客户端版本不一致等。为了解决...

    Thrift-server与spring集成

    当我们将Thrift与Spring集成时,我们可以利用Spring的强大功能来管理和协调Thrift服务,从而构建出高效、灵活的分布式系统。 集成Thrift和Spring的主要目的是为了利用Spring的依赖注入(DI)和面向切面编程(AOP)...

    zookeeper + thrift实现的RPC 服务治理框架演示

    服务治理框架,一般存在与RPC的上一层,用来在大量RPC服务至上,协调客户端和服务器的调用工作。这个示例工程和我的博客《架构设计:系统间通信(13)——RPC实例Apache Thrift 下篇》...

    Java Thrift demo例子

    通过这个Java Thrift Demo,我们可以深入理解Thrift如何在Java中实现RPC通信,包括服务定义、代码生成、服务端实现、客户端调用等关键步骤。这对于初学者来说,是一个很好的起点,有助于进一步学习和应用Thrift进行...

    Apache Thrift.rar+RPC+微服务+异步通信+安全认证+服务发现等

    ApacheThrift简介与安装 Thrift数据类型与IDL理解 Thrift服务定义与实现 Thrift跨语言服务开发实践 Thrift编译器与代码生成 Thrift客户端与服务器端通信 ...Thrift服务发现与注册机制 Thrift安全性与认证授权

    Thrift RPC客户端的服务化框架代码

    Thrift RPC客户端的服务化框架代码主要涉及了两个关键概念:Thrift和RPC(Remote Procedure Call,...在实际应用中,可能还需要考虑线程安全、错误处理、负载均衡、服务注册与发现等高级特性,以构建健壮的分布式系统。

    thrift源码+DEMO+简单教程

    通过阅读和运行这个DEMO,你可以更深入地理解Thrift的工作原理和使用方法,包括如何定义服务、生成代码、实现服务端、创建客户端以及进行通信。 总结来说,Thrift是一个强大的工具,它简化了分布式系统中不同语言...

    zk+thrift demo

    【描述】"供大家使用" 表明这个 demo 是为了提供给开发者们参考和学习,帮助他们理解如何在自己的项目中运用 ZooKeeper 和 Thrift,实现服务间的通信和管理。 【标签】"thrift" 指出核心关注点是 Thrift 技术。...

    thrift实现http协议案例

    这个案例中的“MyThriftWebTest”可能是一个包含整个实现的测试工程,包括Thrift服务的定义、生成的Java代码以及Servlet的实现。通过运行这个测试工程,你可以了解如何将Thrift服务部署到支持HTTP的服务器上,如...

    thrift多接口服务示例

    然后,你可以创建一个Thrift服务器实例,注册这些服务处理类,并启动服务器监听客户端请求。 4. **客户端调用**: 在客户端,Thrift会生成对应服务的客户端代理类,你可以实例化这些代理,然后像调用本地方法一样...

    Thrift源码+库文件,解压后可直接使用

    Thrift的主要目标是解决分布式系统中的通信问题,通过提供一套代码生成工具,能够将服务定义转化为多种编程语言的客户端和服务器端代码,从而实现不同语言之间的无缝通信。 Thrift的核心在于其接口描述语言(IDL,...

Global site tag (gtag.js) - Google Analytics