`
grzrt
  • 浏览: 188847 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Thrift连接池实现

 
阅读更多

简介

Thrift是Facebook的核心框架之一,使不同的开发语言开发的系统可以通过该框架实现彼此的通信,类似于webservice,但是Thrift提供了近乎变态的效率和开发的方便性,是webservice所不能比拟的。给分布式开发带来了极大的方便。但是这柄利器也有一些不完美。



问题

首先文档相当的少,只有一个wiki网站提供相应的帮助。这对于Thrift的推广极为不利。

其次框架本身实现有一些缺陷,就Thrift的java部分来说,没有提供连接池的支持,对RPC的调用效率有所影响。

对于文档稀少的问题,只能是通过一些Thrift的开发者和使用者多供献一些自己的心得来解决。这得需要一个过程。而连接池的问题的解决则可以快速一些。

提到池一般做过Java开发的肯定会想到ObjectPool,Apache Commons项目确实给我们的开发得来了很大的便利性,其中的pool项目正是我们实现thrift连接池的基础,当然也少不了神器spring framework。



实现

一,定义thrift连接池接口
ConnectionProvider
/*
* @(#)ConnectionProvider.java 0.1 05/11/17
*
* Copyright 2010 QISI, Inc. All rights reserved.
* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*/
package com.qidea.thrift.pool;
import org.apache.thrift.transport.TSocket;
/**
*
* @author sunwei
* @version 2010-8-6
* @since JDK1.5
*/
public interface ConnectionProvider
{
/**
* 取链接池中的一个链接
*
* @return
*/
public TSocket getConnection();
/**
* 返回链接
*
* @param socket
*/
public void returnCon(TSocket socket);
}
复制代码

二,实现连接池
GenericConnectionProvider
/*
* @(#)DefaultConnectionProviderImpl.java 0.1 05/11/17
*
* Copyright 2010 QISI, Inc. All rights reserved.
* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*/
package com.qidea.thrift.pool;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.transport.TSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
/**
*
* @author sunwei
* @version 2010-8-10
* @since JDK1.5
*/
public class GenericConnectionProvider implements ConnectionProvider,
InitializingBean, DisposableBean
{
public static final Logger logger = LoggerFactory
.getLogger(GenericConnectionProvider.class);
/** 服务的IP地址 */
private String serviceIP;
/** 服务的端口 */
private int servicePort;
/** 连接超时配置 */
private int conTimeOut;
/** 可以从缓存池中分配对象的最大数量 */
private int maxActive = GenericObjectPool.DEFAULT_MAX_ACTIVE;
/** 缓存池中最大空闲对象数量 */
private int maxIdle = GenericObjectPool.DEFAULT_MAX_IDLE;
/** 缓存池中最小空闲对象数量 */
private int minIdle = GenericObjectPool.DEFAULT_MIN_IDLE;
/** 阻塞的最大数量 */
private long maxWait = GenericObjectPool.DEFAULT_MAX_WAIT;
/** 从缓存池中分配对象,是否执行PoolableObjectFactory.validateObject方法 */
private boolean testOnBorrow = GenericObjectPool.DEFAULT_TEST_ON_BORROW;
private boolean testOnReturn = GenericObjectPool.DEFAULT_TEST_ON_RETURN;
private boolean testWhileIdle = GenericObjectPool.DEFAULT_TEST_WHILE_IDLE;
/** 对象缓存池 */
private ObjectPool objectPool = null;
/**
*
*/
@Override
public void afterPropertiesSet() throws Exception
{
// 对象池
objectPool = new GenericObjectPool();
//
((GenericObjectPool) objectPool).setMaxActive(maxActive);
((GenericObjectPool) objectPool).setMaxIdle(maxIdle);
((GenericObjectPool) objectPool).setMinIdle(minIdle);
((GenericObjectPool) objectPool).setMaxWait(maxWait);
((GenericObjectPool) objectPool).setTestOnBorrow(testOnBorrow);
((GenericObjectPool) objectPool).setTestOnReturn(testOnReturn);
((GenericObjectPool) objectPool).setTestWhileIdle(testWhileIdle);
((GenericObjectPool) objectPool)
.setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK);
// 设置factory
ThriftPoolableObjectFactory thriftPoolableObjectFactory = new ThriftPoolableObjectFactory(
serviceIP, servicePort, conTimeOut);
objectPool.setFactory(thriftPoolableObjectFactory);
}
@Override
public void destroy()
{
try
{
objectPool.close();
}
catch (Exception e)
{
throw new RuntimeException("erorr destroy()", e);
}
}
@Override
public TSocket getConnection()
{
try
{
TSocket socket = (TSocket) objectPool.borrowObject();
return socket;
}
catch (Exception e)
{
throw new RuntimeException("error getConnection()", e);
}
}
@Override
public void returnCon(TSocket socket)
{
try
{
objectPool.returnObject(socket);
}
catch (Exception e)
{
throw new RuntimeException("error returnCon()", e);
}
}
public String getServiceIP()
{
return serviceIP;
}
public void setServiceIP(String serviceIP)
{
this.serviceIP = serviceIP;
}
public int getServicePort()
{
return servicePort;
}
public void setServicePort(int servicePort)
{
this.servicePort = servicePort;
}
public int getConTimeOut()
{
return conTimeOut;
}
public void setConTimeOut(int conTimeOut)
{
this.conTimeOut = conTimeOut;
}
public int getMaxActive()
{
return maxActive;
}
public void setMaxActive(int maxActive)
{
this.maxActive = maxActive;
}
public int getMaxIdle()
{
return maxIdle;
}
public void setMaxIdle(int maxIdle)
{
this.maxIdle = maxIdle;
}
public int getMinIdle()
{
return minIdle;
}
public void setMinIdle(int minIdle)
{
this.minIdle = minIdle;
}
public long getMaxWait()
{
return maxWait;
}
public void setMaxWait(long maxWait)
{
this.maxWait = maxWait;
}
public boolean isTestOnBorrow()
{
return testOnBorrow;
}
public void setTestOnBorrow(boolean testOnBorrow)
{
this.testOnBorrow = testOnBorrow;
}
public boolean isTestOnReturn()
{
return testOnReturn;
}
public void setTestOnReturn(boolean testOnReturn)
{
this.testOnReturn = testOnReturn;
}
public boolean isTestWhileIdle()
{
return testWhileIdle;
}
public void setTestWhileIdle(boolean testWhileIdle)
{
this.testWhileIdle = testWhileIdle;
}
public ObjectPool getObjectPool()
{
return objectPool;
}
public void setObjectPool(ObjectPool objectPool)
{
this.objectPool = objectPool;
}
}
复制代码
ThriftPoolableObjectFactory
/*
* @(#)ThriftPoolableObjectFactory.java 0.1 05/11/17
*
* Copyright 2010 QISI, Inc. All rights reserved.
* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*/
package com.qidea.thrift.pool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author sunwei
* @version 2010-8-10
* @since JDK1.5
*/
public class ThriftPoolableObjectFactory implements PoolableObjectFactory
{
/** 日志记录器 */
public static final Logger logger = LoggerFactory
.getLogger(ThriftPoolableObjectFactory.class);
/** 服务的IP */
private String serviceIP;
/** 服务的端口 */
private int servicePort;
/** 超时设置 */
private int timeOut;
/**
*
* @param serviceIP
* @param servicePort
* @param timeOut
*/
public ThriftPoolableObjectFactory(String serviceIP, int servicePort,
int timeOut)
{
this.serviceIP = serviceIP;
this.servicePort = servicePort;
this.timeOut = timeOut;
}
@Override
public void destroyObject(Object arg0) throws Exception
{
if (arg0 instanceof TSocket)
{
TSocket socket = (TSocket) arg0;
if (socket.isOpen())
{
socket.close();
}
}
}
/**
*
*/
@Override
public Object makeObject() throws Exception
{
try
{
TTransport transport = new TSocket(this.serviceIP,
this.servicePort, this.timeOut);
transport.open();
return transport;
}
catch (Exception e)
{
logger.error("error ThriftPoolableObjectFactory()", e);
throw new RuntimeException(e);
}
}
@Override
public boolean validateObject(Object arg0)
{
try
{
if (arg0 instanceof TSocket)
{
TSocket thriftSocket = (TSocket) arg0;
if (thriftSocket.isOpen())
{
return true;
}
else
{
return false;
}
}
else
{
return false;
}
}
catch (Exception e)
{
return false;
}
}
@Override
public void passivateObject(Object arg0) throws Exception
{
// DO NOTHING
}
@Override
public void activateObject(Object arg0) throws Exception
{
// DO NOTHING
}
public String getServiceIP()
{
return serviceIP;
}
public void setServiceIP(String serviceIP)
{
this.serviceIP = serviceIP;
}
public int getServicePort()
{
return servicePort;
}
public void setServicePort(int servicePort)
{
this.servicePort = servicePort;
}
public int getTimeOut()
{
return timeOut;
}
public void setTimeOut(int timeOut)
{
this.timeOut = timeOut;
}
}
复制代码

  三,定义连接的管理类 

ConnectionManager
/*
* @(#)ConnectionManager.java 0.1 05/11/17
*
* Copyright 2010 QISI, Inc. All rights reserved.
* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*/
package com.qidea.thrift.pool;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.thrift.transport.TSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author sunwei
* @version 2010-8-10
* @since JDK1.5
*/
public class ConnectionManager implements MethodInterceptor
{
/** 日志记录器 */
public Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
/** 保存local对象 */
ThreadLocal<TSocket> socketThreadSafe = new ThreadLocal<TSocket>();
/** 连接提供池 */
public ConnectionProvider connectionProvider;
@Override
public Object invoke(MethodInvocation arg0) throws Throwable
{
TSocket socket = null;
try
{
socket = connectionProvider.getConnection();
socketThreadSafe.set(socket);
Object ret = arg0.proceed();
return ret;
}
catch (Exception e)
{
logger.error("error ConnectionManager.invoke()", e);
throw new Exception(e);
}
finally
{
connectionProvider.returnCon(socket);
socketThreadSafe.remove();
}
}
/**
* 取socket
*
* @return
*/
public TSocket getSocket()
{
return socketThreadSafe.get();
}
public ConnectionProvider getConnectionProvider()
{
return connectionProvider;
}
public void setConnectionProvider(ConnectionProvider connectionProvider)
{
this.connectionProvider = connectionProvider;
}
}
复制代码

四,定义spring配置,对受管的bean提供thrift连接
Thrift连接池spring配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:flex="http://www.springframework.org/schema/flex" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/flex http://www.springframework.org/schema/flex/spring-flex-1.0.xsd">

<!-- thrift连接池配置 -->
<bean id="connectionProvider" class="com.qidea.thrift.pool.GenericConnectionProvider">
<property name="serviceIP" value="localhost" />
<property name="servicePort" value="9090" />
<property name="maxActive" value="10" />
<property name="maxIdle" value="10" />
<property name="testOnBorrow" value="true" />
<property name="testOnReturn" value="true" />
<property name="testWhileIdle" value="true" />
<property name="conTimeOut" value="2000" />
</bean>
<!-- thrift连接管理配置 -->
<bean id="connectionManager" class="com.qidea.thrift.pool.ConnectionManager">
<property name="connectionProvider" ref="connectionProvider" />
</bean>
<!-- 客户端接口配置 -->
<bean class="com.qidea.pushserver.rpc.client.PushServiceClient">
<property name="connectionManager" ref="connectionManager" />
</bean>
<!-- thrift连接AOP配置 -->
<aop:config proxy-target-class="true">
<aop:pointcut id="clientMethods"
expression="execution(* com.qidea.pushserver.rpc.client.*.*(..))" />
<aop:advisor advice-ref="connectionManager" pointcut-ref="clientMethods" />
</aop:config>
</beans>
复制代码

五,使用连接池
PushRPCClient
/*
* @(#)PushRPCClient.java 0.1 05/11/17
*
* Copyright 2010 QISI, Inc. All rights reserved.
* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*/
package com.qidea.pushserver.rpc;
import java.util.ArrayList;
import java.util.List;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.qidea.pushserver.ServiceException;
import com.qidea.thrift.pool.ConnectionManager;
/**
*
* @author sunwei
* @version 2010-8-11
* @since JDK1.5
*/
public class PushRPCClient
{
public static Logger logger = LoggerFactory.getLogger(PushRPCClient.class);
private ConnectionManager connectionManager;
/**
* 取在线玩家列表
*
* @param roleIdList
* @return
* @throws ServiceException
*/
public List<Long> getOnLineRoleIdList(List<Long> roleIdList)
{
TProtocol protocol = new TBinaryProtocol(connectionManager.getSocket());
PushRPCService.Client client = new PushRPCService.Client(protocol);
try
{
List<Long> onLineIdList = client.getOnLineRoleIdList(roleIdList);
return onLineIdList;
}
catch (TException e)
{
logger.error("error getOnLineRoleIdList()", e);
}
return new ArrayList<Long>();
}
/**
* 解散联盟
*
* @param allianceId
*/
public void dismissAlliance(long allianceId)
{
TProtocol protocol = new TBinaryProtocol(connectionManager.getSocket());
PushRPCService.Client client = new PushRPCService.Client(protocol);
try
{
client.dismissAlliance(allianceId);
}
catch (TException e)
{
logger.error("error dismissAlliance()", e);
}
}
/**
* 加入联盟
*
* @param roleId
* @param allianceId
*/
public void joinAlliance(long roleId, long allianceId)
{
TProtocol protocol = new TBinaryProtocol(connectionManager.getSocket());
PushRPCService.Client client = new PushRPCService.Client(protocol);
try
{
client.joinAlliance(roleId, allianceId);
}
catch (TException e)
{
logger.error("error joinAlliance()", e);
}
}
/**
* 解散联盟
*
* @param roleId
* @param allianceId
*/
public void getOutAlliance(long roleId, long allianceId)
{
TProtocol protocol = new TBinaryProtocol(connectionManager.getSocket());
PushRPCService.Client client = new PushRPCService.Client(protocol);
try
{
client.getOutAlliance(roleId, allianceId);
}
catch (Exception e)
{
logger.error("error getOutAlliance()", e);
}
}
public ConnectionManager getConnectionManager()
{
return connectionManager;
}
public void setConnectionManager(ConnectionManager connectionManager)
{
this.connectionManager = connectionManager;
}
}

分享到:
评论

相关推荐

    thriftpool:Thrift RPC连接池,java实现,基于apache commons pool

    标题中的“thriftpool”是一个专门为Thrift RPC(Remote Procedure Call)服务设计的连接池实现,它基于Apache Commons Pool库。Apache Thrift是一种软件框架,用于构建跨语言的服务,它允许不同编程语言之间高效地...

    基于thrift的rpc框架,在thrift基础上增加负载均衡,连接池,性能监控.zip

    Java中有很多成熟的连接池实现,如HikariCP、Apache Commons Pool等,这些库可以集成到Thrift客户端,确保高效地复用连接,提高服务性能。 最后,性能监控对于理解系统的运行状态至关重要。在Thrift服务中,我们...

    thrift服务集成spring及连接池

    【Thrift服务集成Spring及连接池】的知识点详解 Thrift是一个开源的跨语言服务框架,由Facebook在2007年创建并贡献给了Apache基金会。它的主要目标是解决系统间的大数据量通信问题,同时支持多语言环境下的跨平台...

    thrift example

    4. **Conf**:这通常指的是配置文件或配置管理,它包含了运行Thrift服务器所需的参数,如端口号、服务名、连接池大小等。良好的配置管理使得调整服务行为和适应不同环境变得容易。 5. **Async Logger**:异步日志...

    thrift-connection-pool

    使用 Apache commons-pool2 的 Thrift 连接池的实现。 此实现创建 TSocket 的实例并将它们池化。 以下代码片段显示了如何创建 TSocketProvider 的实例 GenericObjectPoolConfig config = new ...

    thrift 教程

    5. **连接池与连接重试**:在“thrifttest”这个demo中,可能包含了一个实现连接池和连接重试机制的示例。连接池用于复用已建立的连接,减少新连接创建的开销,提高系统性能。而连接重试机制则是在网络不稳定时,为...

    阿里云thrift2连接hbasedemo.zip

    这些是项目中的C#源代码文件,分别实现了HBase数据解析器、工作信息类、Thrift连接池、字节转换工具、HBase表的接口以及Thrift配置类。它们是与HBase交互的核心组件。 8. **Program.cs**: 这是.NET程序的主入口点...

    thrift_connector:节俭的客户,利用连接池

    简单的连接池可以节俭。 thrift_connector既可以用于本机thrift客户端,也可以用于thriftpy客户端,但用法差别不大。 可以在示例以及以下各节中找到示例。 这个怎么运作 它维护一个连接池,并且将进行预检查(调用...

    Laravel开发-thrift-hbase-laravel

    而Thrift则是一个跨语言的服务框架,最初由Facebook开发,现在是Apache的一个顶级项目。...在实际应用中,可能还需要处理更多的细节,如错误处理、连接池管理等,但上述步骤构成了基础的集成流程。

    采用java操作thrift代码示例

    在实际项目中,可能需要处理更复杂的场景,如异常处理、连接池管理、多线程等。Thrift还提供了多种传输方式(如HTTP、HTTPS、NIO等)和协议(如Binary、Compact等),可以根据具体需求进行选择。 在Java中操作...

    thrift-0.11.0

    在实际开发中,Thrift常用于构建分布式系统中的服务通信,如数据库连接池、缓存服务、消息队列等。它的跨语言特性使得不同团队可以使用自己喜欢的编程语言进行开发,而无需担心通信问题。此外,Thrift的高效和轻量级...

    Thrift双向通讯java代码

    在实际应用中,你可能还需要处理线程池、异步调用、异常处理、连接池等高级特性。Thrift还支持HTTP、HTTPS、Zookeeper等多种传输协议和调度策略,可以根据具体需求进行选择和配置。 标签"Thrift java"表明我们专注...

    Thrift结构分析及增加取客户端IP功能实现

    分析Thrift的结构动机是为了实现服务端能取到客户端的IP,因此需要对它的结构、调用流程有些了解。... thrift对网络连接没有使用内存池,最直接简单的性能优化是绑定Google gperftools中的TCMalloc。

    基于thrift实现的php和java的rpc框架.zip

    此外,还可以通过调整缓冲区大小、连接池等方式进一步优化性能。 9. **测试与调试**:在实际部署前,需要对PHP客户端和服务端Java服务进行充分的测试,确保所有功能正常工作。Thrift提供了丰富的工具和文档来帮助...

    Thrift实战案例

    - **优化实践**:通过阅读源码,可以发现并优化性能瓶颈,如缓存策略、连接池管理等。 9. **集成到Maven项目** - **添加依赖**:在Maven的pom.xml文件中添加Thrift的Maven插件和相应的库依赖。 - **编译与构建**...

    thrift-zookeeper-rpc

    2.客户端使用连接池对服务调用进行管理,提升性能,这里我们使用Apache Commons项目commons-pool,可以大大减少代码的复杂度。 3.关于Failover/LoadBalance,由于zookeeper的watcher,当服务端不可用是及时通知...

    竞赛资料源码- 基于thrift的rpc框架,在thrift基础上增加负载均衡,连接池,性能监控,通过动.zip

    【目标受众】: 本项目适合IT相关专业各种计算机技术的源代码和项目资料,如计科、人工智能、通信工程、自动化和电子信息等的在校学生、老师或者企业员工下载使用。 也适合小白学习进阶,可以用作比赛项目、可以进行...

    VS2017_Hbase thrift C++接口测试

    为了确保良好的性能和健壮性,还应考虑异步调用、连接池和重试策略。 在运行测试时,确保HBase服务已经启动,并且Thrift服务器也在运行。测试应覆盖基本的CRUD操作,以及更复杂的情景,如扫描、过滤等。这有助于...

    thrift的各种服务和各种源代码

    thrift连接池 2.thrift-service工程 基于thrift的微服务框架 thrift不仅支持tcp/ip协议的rpc调用,也支持http协议的rest服务调用,同一个项目中甚至可同时支持这二种方式 2.1支持rpc调用 支持常规的tcp/ip协议的...

    java大数据作业_8Redis、Thrift、Hadoop2

    这段代码创建了一个最大连接数为100的连接池,并且设置了最大空闲连接数和最小空闲连接数。 #### 7. 使用jedis向队列b发布值a 使用Jedis发布消息到Redis的队列中可以通过以下方式实现: ```java import redis....

Global site tag (gtag.js) - Google Analytics