Thrift-client作为服务消费端,由于thrift使用socket通讯,因此它需要面对几个问题:
1) client端需要知道server端的IP + port,如果是分布式部署,还需要知道所有server的IP + port列表.
2) client为了提升性能,不可能只使用一个socket来处理并发请求,当然也不能每个请求都创建一个socket;我们需要使用连接池方案.
3) 对于java开发工程师而言,基于spring配置thrift服务,可以提供很多的便利.
4) 基于zookeeper配置管理,那么client端就不需要"硬编码"的配置server的ip + port,可以使用zookeeper来推送每个service的服务地址.
5) 因为thrift-client端不使用连接池的话,将不能有效的提高并发能力,本文重点描述看如何使用thrift-client连接池。
1. pom.xml
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>3.0.7.RELEASE</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.5</version> <!--<exclusions>--> <!--<exclusion>--> <!--<groupId>log4j</groupId>--> <!--<artifactId>log4j</artifactId>--> <!--</exclusion>--> <!--</exclusions>--> </dependency> <!-- <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.4</version> </dependency> --> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency> </dependencies>
2. spring-thrift-client.xml
其中zookeeper作为可选项,开发者也可以通过制定serverAddress的方式指定server的地址.
<!-- fixedAddress --> <!-- <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory"> <property name="service" value="com.demo.service.UserService"></property> <property name="serverAddress" value="127.0.0.1:9090:2"></property> <property name="maxActive" value="5"></property> <property name="idleTime" value="10000"></property> </bean> --> <!-- zookeeper --> <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close"> <property name="connectString" value="127.0.0.1:2181"></property> <property name="namespace" value="demo/thrift-service"></property> </bean> <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close"> <property name="service" value="com.demo.service.UserService"></property> <property name="maxActive" value="5"></property> <property name="idleTime" value="1800000"></property> <property name="addressProvider"> <bean class="com.demo.thrift.support.impl.DynamicAddressProvider"> <property name="configPath" value="UserServiceImpl"></property> <property name="zookeeper" ref="thriftZookeeper"></property> </bean> </property> </bean>
3. ThriftServiceClientProxyFactory.java
因为我们要在client端使用连接池方案,那么就需要对client的方法调用过程,进行代理,这个类,就是维护了一个"Client"代理类,并在方法调用时,从"对象池"中取出一个"Client"对象,并在方法实际调用结束后归还给"对象池".
@SuppressWarnings("rawtypes") public class ThriftServiceClientProxyFactory implements FactoryBean,InitializingBean { private String service; private String serverAddress; private Integer maxActive = 32;//最大活跃连接数 ////ms,default 3 min,链接空闲时间 //-1,关闭空闲检测 private Integer idleTime = 180000; private ThriftServerAddressProvider addressProvider; private Object proxyClient; public void setMaxActive(Integer maxActive) { this.maxActive = maxActive; } public void setIdleTime(Integer idleTime) { this.idleTime = idleTime; } public void setService(String service) { this.service = service; } public void setServerAddress(String serverAddress) { this.serverAddress = serverAddress; } public void setAddressProvider(ThriftServerAddressProvider addressProvider) { this.addressProvider = addressProvider; } private Class objectClass; private GenericObjectPool<TServiceClient> pool; private PoolOperationCallBack callback = new PoolOperationCallBack() { @Override public void make(TServiceClient client) { System.out.println("create"); } @Override public void destroy(TServiceClient client) { System.out.println("destroy"); } }; @Override public void afterPropertiesSet() throws Exception { if(serverAddress != null){ addressProvider = new FixedAddressProvider(serverAddress); } ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); //加载Iface接口 objectClass = classLoader.loadClass(service + "$Iface"); //加载Client.Factory类 Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>)classLoader.loadClass(service + "$Client$Factory"); TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance(); ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory,callback); GenericObjectPool.Config poolConfig = new GenericObjectPool.Config(); poolConfig.maxActive = maxActive; poolConfig.minIdle = 0; poolConfig.minEvictableIdleTimeMillis = idleTime; poolConfig.timeBetweenEvictionRunsMillis = idleTime/2L; pool = new GenericObjectPool<TServiceClient>(clientPool,poolConfig); proxyClient = Proxy.newProxyInstance(classLoader,new Class[]{objectClass},new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // TServiceClient client = pool.borrowObject(); try{ return method.invoke(client, args); }catch(Exception e){ throw e; }finally{ pool.returnObject(client); } } }); } @Override public Object getObject() throws Exception { return proxyClient; } @Override public Class<?> getObjectType() { return objectClass; } @Override public boolean isSingleton() { return true; //To change body of implemented methods use File | Settings | File Templates. } public void close(){ if(addressProvider != null){ addressProvider.close(); } } }
4. ThriftClientPoolFactory.java
"Client"对象池,对象池中是已经实例化的Client对象,Client对象负责与Thrift server通信.
/** * 连接池,thrift-client for spring */ public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{ private final ThriftServerAddressProvider addressProvider; private final TServiceClientFactory<TServiceClient> clientFactory; private PoolOperationCallBack callback; protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception { this.addressProvider = addressProvider; this.clientFactory = clientFactory; } protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception { this.addressProvider = addressProvider; this.clientFactory = clientFactory; this.callback = callback; } @Override public TServiceClient makeObject() throws Exception { InetSocketAddress address = addressProvider.selector(); TSocket tsocket = new TSocket(address.getHostName(),address.getPort()); TProtocol protocol = new TBinaryProtocol(tsocket); TServiceClient client = this.clientFactory.getClient(protocol); tsocket.open(); if(callback != null){ try{ callback.make(client); }catch(Exception e){ // } } return client; } public void destroyObject(TServiceClient client) throws Exception { if(callback != null){ try{ callback.destroy(client); }catch(Exception e){ // } } TTransport pin = client.getInputProtocol().getTransport(); pin.close(); } public boolean validateObject(TServiceClient client) { TTransport pin = client.getInputProtocol().getTransport(); return pin.isOpen(); } static interface PoolOperationCallBack { //销毁client之前执行 void destroy(TServiceClient client); //创建成功是执行 void make(TServiceClient client); } }
5. DynamicAddressProvider.java
将zookeeper作为server地址的提供者,这样客户端就不需要再配置文件中指定一堆ip + port,而且当server服务有更新时,也不需要client端重新配置.
/** * 可以动态获取address地址,方案设计参考 * 1) 可以间歇性的调用一个web-service来获取地址 * 2) 可以使用事件监听机制,被动的接收消息,来获取最新的地址(比如基于MQ,nio等) * 3) 可以基于zookeeper-watcher机制,获取最新地址 * <p/> * 本实例,使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发 * 如下实现,仅供参考 */ public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean { private String configPath; private PathChildrenCache cachedPath; private CuratorFramework zookeeper; //用来保存当前provider所接触过的地址记录 //当zookeeper集群故障时,可以使用trace中地址,作为"备份" private Set<String> trace = new HashSet<String>(); private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>(); private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>(); private Object lock = new Object(); private static final Integer DEFAULT_PRIORITY = 1; public void setConfigPath(String configPath) { this.configPath = configPath; } public void setZookeeper(CuratorFramework zookeeper) { this.zookeeper = zookeeper; } @Override public void afterPropertiesSet() throws Exception { //如果zk尚未启动,则启动 if(zookeeper.getState() == CuratorFrameworkState.LATENT){ zookeeper.start(); } buildPathChildrenCache(zookeeper, configPath, true); cachedPath.start(StartMode.POST_INITIALIZED_EVENT); } private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception { cachedPath = new PathChildrenCache(client, path, cacheData); cachedPath.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { PathChildrenCacheEvent.Type eventType = event.getType(); switch (eventType) { // case CONNECTION_RECONNECTED: // // break; case CONNECTION_SUSPENDED: case CONNECTION_LOST: System.out.println("Connection error,waiting..."); return; default: // } //任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法. cachedPath.rebuild(); rebuild(); } protected void rebuild() throws Exception { List<ChildData> children = cachedPath.getCurrentData(); if (children == null || children.isEmpty()) { //有可能所有的thrift server都与zookeeper断开了链接 //但是,有可能,thrift client与thrift server之间的网络是良好的 //因此此处是否需要清空container,是需要多方面考虑的. container.clear(); System.out.println("thrift server-cluster error...."); return; } List<InetSocketAddress> current = new ArrayList<InetSocketAddress>(); for (ChildData data : children) { String address = new String(data.getData(), "utf-8"); current.addAll(transfer(address)); trace.add(address); } Collections.shuffle(current); synchronized (lock) { container.clear(); container.addAll(current); inner.clear(); inner.addAll(current); } } }); } private List<InetSocketAddress> transfer(String address){ String[] hostname = address.split(":"); Integer priority = DEFAULT_PRIORITY; if (hostname.length == 3) { priority = Integer.valueOf(hostname[2]); } String ip = hostname[0]; Integer port = Integer.valueOf(hostname[1]); List<InetSocketAddress> result = new ArrayList<InetSocketAddress>(); for (int i = 0; i < priority; i++) { result.add(new InetSocketAddress(ip, port)); } return result; } @Override public List<InetSocketAddress> getAll() { return Collections.unmodifiableList(container); } @Override public synchronized InetSocketAddress selector() { if (inner.isEmpty()) { if(!container.isEmpty()){ inner.addAll(container); }else if(!trace.isEmpty()){ synchronized (lock) { for(String hostname : trace){ container.addAll(transfer(hostname)); } Collections.shuffle(container); inner.addAll(container); } } } return inner.poll();//null } @Override public void close() { try { cachedPath.close(); zookeeper.close(); } catch (Exception e) { // } } }
到此为止,我们的Thrift基本上就可以顺利运行起来了.更多代码,参见附件.
Thrift-server端开发与配置,参见[Thrift-server]
相关推荐
spring-cloud-starter-thrift简介spring-cloud-starter-thrift提供Spring Cloud对可伸缩的跨语言服务调用框架Apache Thrift的封装和集成。spring-cloud-starter-thrift包括客户端spring-cloud-starter-thrift-client...
本篇文章将探讨如何在Spring框架中集成Thrift,构建一个Server和Client,以便在分布式系统中实现高效的数据通信。 首先,我们需要了解Thrift的基本原理。Thrift通过定义服务接口和数据结构的IDL(接口定义语言)...
4. 客户端代码:使用Thrift生成的客户端接口与服务端通信。 5. 运行脚本:启动服务端和客户端的命令。 通过这个示例,你可以学习到Thrift的基本使用,包括服务定义、编译、服务端和客户端的实现,以及如何进行跨...
在本篇小讲中,我们将探讨如何将Thrift与Spring框架结合,以便于构建微服务架构。 首先,让我们了解Thrift的基本工作原理。Thrift IDL(接口定义语言)允许开发者声明服务方法和数据类型,类似于Java中的接口或C++...
### Spring与Hadoop集成知识点详解 #### 一、Spring与Hadoop集成概述 Spring与Hadoop集成是指在Spring框架中引入Hadoop的功能,利用Spring强大的依赖注入和面向切面编程能力来简化Hadoop应用程序的开发过程。通过...
海豚基于spring boot支持thrift序列化的http的微服务框架特征支持thrift序列化的http协议兼容spring ... EnableTHttpInject之后,就可以这样使用http的thriftclient @THttpInject ( backupServers = { " localhost:909
将Thrift服务集成到Spring、Docker或其他微服务架构中,以实现更复杂的系统设计。 总之,Java通过Thrift实现通信是一个涉及服务定义、代码生成、服务处理程序实现、客户端调用等多个环节的过程。Thrift的优势在于...
此框架旨在简化在 Hadoop 生态系统中的开发工作,提供了一种更加面向 Spring 的方式来处理 MapReduce 任务、HDFS 文件系统操作以及与 HBase 和 Hive 等数据存储系统的集成。 #### 二、Spring 和 Hadoop ##### 2.1 ...
项目基于 jdk1.8 采用 Spring Boot 框架 集成了分布式任务调度框架 Quartz ,任务存储于数据库。...Thrift JOB 接口调用实现了 Thrift client 池化管理。 集成了 Spring data redis,提供缓存服务。
### Hector Client Guide 知识点解析 #### 一、引言 Hector 是一个 Java 客户端库,用于简化 Apache Cassandra 的使用。它提供了一系列高级功能,如连接池管理、故障检测与恢复、基本负载均衡等,这些功能在原生的 ...
在SpringBootComposite中,ThriftServer可能是服务提供方,对外提供接口,而ThriftClient则作为服务消费方,通过调用服务器提供的接口来完成业务逻辑。 在实际项目中,`ThriftServerApplication`会先启动,注册自身...
- **RPC框架**:支持Thrift Client、Thrift Service、Dubbo Provider、Dubbo Consumer、gRPC等。 - **消息队列**:支持ActiveMQ、RabbitMQ、Kafka等。 - **数据库**:支持MySQL、Oracle、MS SQL Server(使用jTDS...